4 from __future__ import division
10 from random import randint, shuffle, getrandbits
11 from socket import AF_INET, AF_INET6, inet_ntop
12 from struct import pack, unpack
16 from scapy.layers.inet import UDP, IP
17 from scapy.layers.inet6 import IPv6
18 from scapy.layers.l2 import Ether, GRE
19 from scapy.packet import Raw
21 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
22 BFDDiagCode, BFDState, BFD_vpp_echo
23 from framework import VppTestCase, VppTestRunner, running_extended_tests
25 from vpp_ip import DpoProto
26 from vpp_ip_route import VppIpRoute, VppRoutePath
27 from vpp_lo_interface import VppLoInterface
28 from vpp_papi_provider import UnexpectedApiReturnValueError, \
30 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
31 from vpp_gre_interface import VppGreInterface
32 from vpp_papi import VppEnum
# Hands out BFD authentication keys. _conf_key_ids records the conf key IDs
# already issued; create_random_key consults it so that no ID is reused.
37 class AuthKeyFactory(object):
38 """Factory class for creating auth keys with unique conf key ID"""
41 self._conf_key_ids = {}
def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
    """Build a VppBFDAuthKey with random contents and an unused conf key ID.

    :param test: test case passed through to VppBFDAuthKey
    :param auth_type: authentication type passed through to VppBFDAuthKey
    :returns: the newly constructed VppBFDAuthKey
    """
    # Keep drawing 32-bit IDs until one turns up that this factory has not
    # handed out before.
    while True:
        candidate_id = randint(0, 0xFFFFFFFF)
        if candidate_id not in self._conf_key_ids:
            break
    self._conf_key_ids[candidate_id] = 1

    # The secret is 1 to 20 random octets; the length is drawn first.
    secret_length = randint(1, 20)
    secret_octets = bytearray(randint(0, 255) for _ in range(secret_length))
    secret = scapy.compat.raw(secret_octets)

    return VppBFDAuthKey(test=test, auth_type=auth_type,
                         conf_key_id=candidate_id, key=secret)
# Test case exercising the BFD configuration API. The whole class is skipped
# unless running_extended_tests is truthy.
55 @unittest.skipUnless(running_extended_tests, "part of extended tests")
56 class BFDAPITestCase(VppTestCase):
57 """Bidirectional Forwarding Detection (BFD) - API"""
# Class-level fixture: switch BFD logging to debug level and create two
# packet-generator interfaces (pg0 and pg1 are used by the tests).
64 super(BFDAPITestCase, cls).setUpClass()
65 cls.vapi.cli("set log class bfd level debug")
67 cls.create_pg_interfaces(range(2))
68 for i in cls.pg_interfaces:
74 super(BFDAPITestCase, cls).tearDownClass()
78 def tearDownClass(cls):
79 super(BFDAPITestCase, cls).tearDownClass()
# Per-test fixture: every test starts with a fresh AuthKeyFactory.
82 super(BFDAPITestCase, self).setUp()
83 self.factory = AuthKeyFactory()
def test_add_bfd(self):
    """ create a BFD session """
    bfd_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    # Two identical add/log/remove rounds: the second round shows that a
    # session can be configured again after it has been removed.
    for _ in range(2):
        bfd_session.add_vpp_config()
        self.logger.debug("Session state is %s", bfd_session.state)
        bfd_session.remove_vpp_config()
def test_double_add(self):
    """ create the same BFD session twice (negative case) """
    duplicate = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    duplicate.add_vpp_config()

    # Adding the very same session a second time has to come back with a
    # negative API return value.
    expect_rejection = self.vapi.assert_negative_api_retval()
    with expect_rejection:
        duplicate.add_vpp_config()

    # Clean up the one instance that was accepted.
    duplicate.remove_vpp_config()
def test_add_bfd6(self):
    """ create IPv6 BFD session """
    peer_address = self.pg0.remote_ip6
    bfd_session = VppBFDUDPSession(self, self.pg0, peer_address,
                                   af=AF_INET6)
    # Two identical add/log/remove rounds, mirroring the IPv4 variant of
    # this test.
    for _ in range(2):
        bfd_session.add_vpp_config()
        self.logger.debug("Session state is %s", bfd_session.state)
        bfd_session.remove_vpp_config()
# Configures a session with explicit timing parameters, reads back its dump
# entry and compares it with the session object's attributes.
116 def test_mod_bfd(self):
117 """ modify BFD session parameters """
118 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
119 desired_min_tx=50000,
120 required_min_rx=10000,
122 session.add_vpp_config()
123 s = session.get_bfd_udp_session_dump_entry()
124 self.assert_equal(session.desired_min_tx,
126 "desired min transmit interval")
127 self.assert_equal(session.required_min_rx,
129 "required min receive interval")
130 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
# Double all three parameters, then repeat the same read-back comparison.
131 session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
132 required_min_rx=session.required_min_rx * 2,
133 detect_mult=session.detect_mult * 2)
134 s = session.get_bfd_udp_session_dump_entry()
135 self.assert_equal(session.desired_min_tx,
137 "desired min transmit interval")
138 self.assert_equal(session.required_min_rx,
140 "required min receive interval")
141 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
# Creates key_count random keys through the factory and uses
# query_vpp_config() to check whether each key is present in VPP before and
# after it is added or removed.
143 def test_add_sha1_keys(self):
144 """ add SHA1 keys """
146 keys = [self.factory.create_random_key(
147 self) for i in range(0, key_count)]
149 self.assertFalse(key.query_vpp_config())
153 self.assertTrue(key.query_vpp_config())
155 indexes = range(key_count)
160 key.remove_vpp_config()
162 for j in range(key_count):
165 self.assertFalse(key.query_vpp_config())
167 self.assertTrue(key.query_vpp_config())
168 # should be removed now
170 self.assertFalse(key.query_vpp_config())
171 # add back and remove again
175 self.assertTrue(key.query_vpp_config())
177 key.remove_vpp_config()
179 self.assertFalse(key.query_vpp_config())
# SHA1 variant of the basic add/remove test: the session is added, logged and
# removed twice, using a key obtained from the factory.
181 def test_add_bfd_sha1(self):
182 """ create a BFD session (SHA1) """
183 key = self.factory.create_random_key(self)
185 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
187 session.add_vpp_config()
188 self.logger.debug("Session state is %s", session.state)
189 session.remove_vpp_config()
190 session.add_vpp_config()
191 self.logger.debug("Session state is %s", session.state)
192 session.remove_vpp_config()
# Negative case: the second add_vpp_config() on the same session is expected
# to raise. Any Exception subclass satisfies the check.
194 def test_double_add_sha1(self):
195 """ create the same BFD session twice (negative case) (SHA1) """
196 key = self.factory.create_random_key(self)
198 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
200 session.add_vpp_config()
201 with self.assertRaises(Exception):
202 session.add_vpp_config()
def test_add_auth_nonexistent_key(self):
    """ create BFD session using non-existent SHA1 (negative case) """
    # The key object exists only on the test side; it is never pushed to
    # VPP, so configuring a session that refers to it must fail.
    unconfigured_key = self.factory.create_random_key(self)
    session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                               sha1_key=unconfigured_key)
    with self.assertRaises(Exception):
        session.add_vpp_config()
# One key is referenced by four sessions (IPv4 and IPv6 on pg0 and pg1). The
# key's use_count from the auth-keys dump is compared with
# len(sessions) - removed around the remove_vpp_config() calls.
212 def test_shared_sha1_key(self):
213 """ share single SHA1 key between multiple BFD sessions """
214 key = self.factory.create_random_key(self)
217 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
219 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
220 sha1_key=key, af=AF_INET6),
221 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
223 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
224 sha1_key=key, af=AF_INET6)]
229 e = key.get_bfd_auth_keys_dump_entry()
230 self.assert_equal(e.use_count, len(sessions) - removed,
231 "Use count for shared key")
232 s.remove_vpp_config()
234 e = key.get_bfd_auth_keys_dump_entry()
235 self.assert_equal(e.use_count, len(sessions) - removed,
236 "Use count for shared key")
# Starts from a session created without a key and then switches
# authentication on with activate_auth(). The test contains no explicit
# assertions; it passes when none of the calls raise.
238 def test_activate_auth(self):
239 """ activate SHA1 authentication """
240 key = self.factory.create_random_key(self)
242 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
243 session.add_vpp_config()
244 session.activate_auth(key)
# Activates authentication on an unauthenticated session and then turns it
# off again. The test contains no explicit assertions; it passes when none of
# the calls raise.
246 def test_deactivate_auth(self):
247 """ deactivate SHA1 authentication """
248 key = self.factory.create_random_key(self)
250 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
251 session.add_vpp_config()
252 session.activate_auth(key)
253 session.deactivate_auth()
# Adds two distinct keys to VPP, creates a session and then calls
# activate_auth() with the second key.
# NOTE(review): AuthKeyFactory.create_random_key already rejects conf key IDs
# it has issued before, so the while loop below looks redundant - confirm.
255 def test_change_key(self):
256 """ change SHA1 key """
257 key1 = self.factory.create_random_key(self)
258 key2 = self.factory.create_random_key(self)
259 while key2.conf_key_id == key1.conf_key_id:
260 key2 = self.factory.create_random_key(self)
261 key1.add_vpp_config()
262 key2.add_vpp_config()
263 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
265 session.add_vpp_config()
266 session.activate_auth(key2)
# Walks the echo-source state reported by bfd_udp_get_echo_source() through
# four stages: nothing set, loopback set without addresses, IPv4 then IPv6
# configured on the loopback, and finally the echo source deleted.
268 def test_set_del_udp_echo_source(self):
269 """ set/del udp echo source """
270 self.create_loopback_interfaces(1)
271 self.loopback0 = self.lo_interfaces[0]
272 self.loopback0.admin_up()
# Stage 1: no echo source configured yet.
273 echo_source = self.vapi.bfd_udp_get_echo_source()
274 self.assertFalse(echo_source.is_set)
275 self.assertFalse(echo_source.have_usable_ip4)
276 self.assertFalse(echo_source.have_usable_ip6)
# Stage 2: echo source points at the loopback, which has no addresses yet.
278 self.vapi.bfd_udp_set_echo_source(
279 sw_if_index=self.loopback0.sw_if_index)
280 echo_source = self.vapi.bfd_udp_get_echo_source()
281 self.assertTrue(echo_source.is_set)
282 self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
283 self.assertFalse(echo_source.have_usable_ip4)
284 self.assertFalse(echo_source.have_usable_ip6)
# Stage 3a: with IPv4 configured, the expected echo address is the loopback's
# own address with its lowest bit flipped.
286 self.loopback0.config_ip4()
287 unpacked = unpack("!L", self.loopback0.local_ip4n)
288 echo_ip4 = pack("!L", unpacked[0] ^ 1)
289 echo_source = self.vapi.bfd_udp_get_echo_source()
290 self.assertTrue(echo_source.is_set)
291 self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
292 self.assertTrue(echo_source.have_usable_ip4)
293 self.assertEqual(echo_source.ip4_addr.packed, echo_ip4)
294 self.assertFalse(echo_source.have_usable_ip6)
# Stage 3b: same check once IPv6 is configured as well.
296 self.loopback0.config_ip6()
297 unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
298 echo_ip6 = pack("!LLLL", unpacked[0], unpacked[1], unpacked[2],
300 echo_source = self.vapi.bfd_udp_get_echo_source()
301 self.assertTrue(echo_source.is_set)
302 self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
303 self.assertTrue(echo_source.have_usable_ip4)
304 self.assertEqual(echo_source.ip4_addr.packed, echo_ip4)
305 self.assertTrue(echo_source.have_usable_ip6)
306 self.assertEqual(echo_source.ip6_addr.packed, echo_ip6)
# Stage 4: after deletion everything reports unset again.
308 self.vapi.bfd_udp_del_echo_source()
309 echo_source = self.vapi.bfd_udp_get_echo_source()
310 self.assertFalse(echo_source.is_set)
311 self.assertFalse(echo_source.have_usable_ip4)
312 self.assertFalse(echo_source.have_usable_ip6)
# Test-side peer of a VPP BFD session: it holds the test's own view of the
# BFD parameters, and its methods build, send and verify BFD packets.
315 class BFDTestSession(object):
316 """ BFD session as seen from test framework side """
# Constructor arguments are stored on the instance; the remaining attributes
# start from fixed defaults (state down, no authentication, no diagnostic).
318 def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
319 bfd_key_id=None, our_seq_number=None,
320 tunnel_header=None, phy_interface=None):
323 self.sha1_key = sha1_key
324 self.bfd_key_id = bfd_key_id
325 self.interface = interface
327 self.phy_interface = phy_interface
329 self.phy_interface = self.interface
# UDP source port is drawn at random from 49152-65535.
330 self.udp_sport = randint(49152, 65535)
# The starting sequence number is random unless the caller supplied one.
331 if our_seq_number is None:
332 self.our_seq_number = randint(0, 40000000)
334 self.our_seq_number = our_seq_number
# Last sequence number received from VPP; verify_sha1_auth fills it in.
335 self.vpp_seq_number = None
336 self.my_discriminator = 0
# Intervals are presumably in microseconds (other code in this file divides
# them by USEC_IN_SEC) - TODO confirm.
337 self.desired_min_tx = 300000
338 self.required_min_rx = 300000
339 self.required_min_echo_rx = None
340 self.detect_mult = detect_mult
341 self.diag = BFDDiagCode.no_diagnostic
342 self.your_discriminator = None
343 self.state = BFDState.down
344 self.auth_type = BFDAuthType.no_auth
345 self.tunnel_header = tunnel_header
# Advances our_seq_number by one; the largest 32-bit value (0xFFFFFFFF) is
# followed by 0.
347 def inc_seq_num(self):
348 """ increment sequence number, wrapping if needed """
349 if self.our_seq_number == 0xFFFFFFFF:
350 self.our_seq_number = 0
352 self.our_seq_number += 1
# Selective setter: every keyword defaults to None, and an attribute is only
# overwritten when its argument is not None. As a consequence an attribute
# cannot be reset to None through this method.
354 def update(self, my_discriminator=None, your_discriminator=None,
355 desired_min_tx=None, required_min_rx=None,
356 required_min_echo_rx=None, detect_mult=None,
357 diag=None, state=None, auth_type=None):
358 """ update BFD parameters associated with session """
359 if my_discriminator is not None:
360 self.my_discriminator = my_discriminator
361 if your_discriminator is not None:
362 self.your_discriminator = your_discriminator
363 if required_min_rx is not None:
364 self.required_min_rx = required_min_rx
365 if required_min_echo_rx is not None:
366 self.required_min_echo_rx = required_min_echo_rx
367 if desired_min_tx is not None:
368 self.desired_min_tx = desired_min_tx
369 if detect_mult is not None:
370 self.detect_mult = detect_mult
373 if state is not None:
375 if auth_type is not None:
376 self.auth_type = auth_type
# Copies the session's current values into the BFD header fields, logging
# each value as it is set. The discriminator and interval guards below are
# truthiness checks, so a value of 0 or None leaves that field untouched.
378 def fill_packet_fields(self, packet):
379 """ set packet fields with known values in packet """
381 if self.my_discriminator:
382 self.test.logger.debug("BFD: setting packet.my_discriminator=%s",
383 self.my_discriminator)
384 bfd.my_discriminator = self.my_discriminator
385 if self.your_discriminator:
386 self.test.logger.debug("BFD: setting packet.your_discriminator=%s",
387 self.your_discriminator)
388 bfd.your_discriminator = self.your_discriminator
389 if self.required_min_rx:
390 self.test.logger.debug(
391 "BFD: setting packet.required_min_rx_interval=%s",
392 self.required_min_rx)
393 bfd.required_min_rx_interval = self.required_min_rx
394 if self.required_min_echo_rx:
395 self.test.logger.debug(
396 "BFD: setting packet.required_min_echo_rx=%s",
397 self.required_min_echo_rx)
398 bfd.required_min_echo_rx_interval = self.required_min_echo_rx
399 if self.desired_min_tx:
400 self.test.logger.debug(
401 "BFD: setting packet.desired_min_tx_interval=%s",
403 bfd.desired_min_tx_interval = self.desired_min_tx
405 self.test.logger.debug(
406 "BFD: setting packet.detect_mult=%s", self.detect_mult)
407 bfd.detect_mult = self.detect_mult
409 self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
412 self.test.logger.debug("BFD: setting packet.state=%s", self.state)
413 bfd.state = self.state
415 # this is used by a negative test-case
416 self.test.logger.debug("BFD: setting packet.auth_type=%s",
418 bfd.auth_type = self.auth_type
# Builds an Ethernet / [tunnel header] / IP-or-IPv6 / UDP / BFD packet
# addressed from the interface's remote side to its local side, then lets
# fill_packet_fields() populate the BFD header. When SHA1 authentication is in
# use, the auth section is filled in and the hash is computed over the first
# 32 bytes of the BFD layer followed by the key padded with zeros to 20 bytes.
# NOTE(review): the padding below uses a text literal ("\0") while
# verify_sha1_auth pads with a bytes literal (b"\0"); confirm this
# concatenation is valid on the interpreter in use.
420 def create_packet(self):
421 """ create a BFD packet, reflecting the current state of session """
424 bfd.auth_type = self.sha1_key.auth_type
425 bfd.auth_len = BFD.sha1_auth_len
426 bfd.auth_key_id = self.bfd_key_id
427 bfd.auth_seq_num = self.our_seq_number
428 bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
431 packet = Ether(src=self.phy_interface.remote_mac,
432 dst=self.phy_interface.local_mac)
433 if self.tunnel_header:
434 packet = packet / self.tunnel_header
435 if self.af == AF_INET6:
437 IPv6(src=self.interface.remote_ip6,
438 dst=self.interface.local_ip6,
440 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
444 IP(src=self.interface.remote_ip4,
445 dst=self.interface.local_ip4,
447 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
449 self.test.logger.debug("BFD: Creating packet")
450 self.fill_packet_fields(packet)
452 hash_material = scapy.compat.raw(
453 packet[BFD])[:32] + self.sha1_key.key + \
454 "\0" * (20 - len(self.sha1_key.key))
455 self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
456 hashlib.sha1(hash_material).hexdigest())
457 packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
# Queues a packet on an interface. interface defaults to the session's
# phy_interface; a packet is built with create_packet() when the caller does
# not supply one.
460 def send_packet(self, packet=None, interface=None):
461 """ send packet on interface, creating the packet if needed """
463 packet = self.create_packet()
464 if interface is None:
465 interface = self.phy_interface
466 self.test.logger.debug(ppp("Sending packet:", packet))
467 interface.add_stream(packet)
# Checks the authentication section of a received BFD packet:
#  - fixed fields: auth_len 28, auth type, key ID and a zero reserved field;
#  - sequence number: the first one seen is simply recorded. Later ones are
#    checked against the previous value: meticulous keyed SHA1 requires
#    exactly previous + 1 (or 0 once the previous value reached 0xffffffff),
#    other auth types are checked with a range / membership assertion;
#  - hash: recomputed from the original bytes with the trailing 20-byte hash
#    replaced by the zero-padded key, and compared as hex.
470 def verify_sha1_auth(self, packet):
471 """ Verify correctness of authentication in BFD layer. """
473 self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
474 self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
476 self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
477 self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
478 if self.vpp_seq_number is None:
479 self.vpp_seq_number = bfd.auth_seq_num
480 self.test.logger.debug("Received initial sequence number: %s" %
483 recvd_seq_num = bfd.auth_seq_num
484 self.test.logger.debug("Received followup sequence number: %s" %
486 if self.vpp_seq_number < 0xffffffff:
487 if self.sha1_key.auth_type == \
488 BFDAuthType.meticulous_keyed_sha1:
489 self.test.assert_equal(recvd_seq_num,
490 self.vpp_seq_number + 1,
491 "BFD sequence number")
493 self.test.assert_in_range(recvd_seq_num,
495 self.vpp_seq_number + 1,
496 "BFD sequence number")
498 if self.sha1_key.auth_type == \
499 BFDAuthType.meticulous_keyed_sha1:
500 self.test.assert_equal(recvd_seq_num, 0,
501 "BFD sequence number")
503 self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
504 "BFD sequence number not one of "
505 "(%s, 0)" % self.vpp_seq_number)
506 self.vpp_seq_number = recvd_seq_num
507 # last 20 bytes represent the hash - so replace them with the key,
508 # pad the result with zeros and hash the result
509 hash_material = bfd.original[:-20] + self.sha1_key.key + \
510 b"\0" * (20 - len(self.sha1_key.key))
511 expected_hash = hashlib.sha1(hash_material).hexdigest()
512 self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
513 expected_hash, "Auth key hash")
# Checks the BFD header of a received packet: version must be 1 and the
# packet's your_discriminator must equal this session's my_discriminator.
# Authentication is checked through verify_sha1_auth().
515 def verify_bfd(self, packet):
516 """ Verify correctness of BFD layer. """
518 self.test.assert_equal(bfd.version, 1, "BFD version")
519 self.test.assert_equal(bfd.your_discriminator,
520 self.my_discriminator,
521 "BFD - your discriminator")
523 self.verify_sha1_auth(packet)
# Brings the session on test.vpp_session to Up using test.test_session:
#  1. wait (up to 2 s) for a BFD packet from VPP;
#  2. if the test has a vpp_clock_offset attribute, recompute it as
#     time.time() - p.time and compare it with the previous value
#     (tolerance 0.5);
#  3. pick a random local discriminator, adopt VPP's discriminator as
#     your_discriminator and send a packet;
#  4. wait for the Up event, switch the test session to Up and send again.
# For meticulous keyed SHA1 the sequence number is incremented before each
# packet is sent.
526 def bfd_session_up(test):
527 """ Bring BFD session up """
528 test.logger.info("BFD: Waiting for slow hello")
529 p = wait_for_bfd_packet(test, 2, is_tunnel=test.vpp_session.is_tunnel)
531 if hasattr(test, 'vpp_clock_offset'):
532 old_offset = test.vpp_clock_offset
533 test.vpp_clock_offset = time.time() - p.time
534 test.logger.debug("BFD: Calculated vpp clock offset: %s",
535 test.vpp_clock_offset)
537 test.assertAlmostEqual(
538 old_offset, test.vpp_clock_offset, delta=0.5,
539 msg="vpp clock offset not stable (new: %s, old: %s)" %
540 (test.vpp_clock_offset, old_offset))
541 test.logger.info("BFD: Sending Init")
542 test.test_session.update(my_discriminator=randint(0, 40000000),
543 your_discriminator=p[BFD].my_discriminator,
545 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
546 BFDAuthType.meticulous_keyed_sha1:
547 test.test_session.inc_seq_num()
548 test.test_session.send_packet()
549 test.logger.info("BFD: Waiting for event")
550 e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
551 verify_event(test, e, expected_state=BFDState.up)
552 test.logger.info("BFD: Session is Up")
553 test.test_session.update(state=BFDState.up)
554 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
555 BFDAuthType.meticulous_keyed_sha1:
556 test.test_session.inc_seq_num()
557 test.test_session.send_packet()
558 test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
def bfd_session_down(test):
    """Take an established BFD session down from the test side.

    The VPP session must currently be Up. The test session announces state
    Down in one packet, after which the matching event is awaited and the
    VPP session is checked to be Down.

    :param test: test case providing vpp_session, test_session, vapi and
        logger
    """
    vpp_side = test.vpp_session
    peer = test.test_session

    # Precondition: there has to be an Up session to tear down.
    test.assert_equal(vpp_side.state, BFDState.up, BFDState)

    peer.update(state=BFDState.down)
    # Meticulous keyed SHA1 needs a fresh sequence number for every packet.
    auth_key = peer.sha1_key
    if auth_key and \
            auth_key.auth_type == BFDAuthType.meticulous_keyed_sha1:
        peer.inc_seq_num()
    peer.send_packet()

    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, event, expected_state=BFDState.down)
    test.logger.info("BFD: Session is Down")
    test.assert_equal(vpp_side.state, BFDState.down, BFDState)
# Compares the session's dump entry from VPP with the session object: state,
# both intervals, detect multiplier and the authentication fields. A session
# without a sha1_key must be reported as not authenticated; otherwise the key
# IDs are compared as well.
576 def verify_bfd_session_config(test, session, state=None):
577 dump = session.get_bfd_udp_session_dump_entry()
578 test.assertIsNotNone(dump)
579 # since dump is not none, we have verified that sw_if_index and addresses
580 # are valid (in get_bfd_udp_session_dump_entry)
582 test.assert_equal(dump.state, state, "session state")
583 test.assert_equal(dump.required_min_rx, session.required_min_rx,
584 "required min rx interval")
585 test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
586 "desired min tx interval")
587 test.assert_equal(dump.detect_mult, session.detect_mult,
589 if session.sha1_key is None:
590 test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
592 test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
593 test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
595 test.assert_equal(dump.conf_key_id,
596 session.sha1_key.conf_key_id,
# Checks the IP header of a packet sent by VPP: hop limit / TTL must be 255,
# the source must be the session interface's local address and the
# destination its remote address. The address family comes from
# test.vpp_session.af.
600 def verify_ip(test, packet):
601 """ Verify correctness of IP layer. """
602 if test.vpp_session.af == AF_INET6:
604 local_ip = test.vpp_session.interface.local_ip6
605 remote_ip = test.vpp_session.interface.remote_ip6
606 test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
609 local_ip = test.vpp_session.interface.local_ip4
610 remote_ip = test.vpp_session.interface.remote_ip4
611 test.assert_equal(ip.ttl, 255, "IPv4 TTL")
612 test.assert_equal(ip.src, local_ip, "IP source address")
613 test.assert_equal(ip.dst, remote_ip, "IP destination address")
# Checks the UDP header: the destination port must be BFD.udp_dport and the
# source port must lie between BFD.udp_sport_min and BFD.udp_sport_max.
616 def verify_udp(test, packet):
617 """ Verify correctness of UDP layer. """
619 test.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
620 test.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
# Checks a BFD session event against test.vpp_session: interface index, local
# and peer addresses (compared as strings) and the reported state.
624 def verify_event(test, event, expected_state):
625 """ Verify correctness of event values. """
627 test.logger.debug("BFD: Event: %s" % moves.reprlib.repr(e))
628 test.assert_equal(e.sw_if_index,
629 test.vpp_session.interface.sw_if_index,
630 "BFD interface index")
632 test.assert_equal(str(e.local_addr), test.vpp_session.local_addr,
633 "Local IPv6 address")
634 test.assert_equal(str(e.peer_addr), test.vpp_session.peer_addr,
636 test.assert_equal(e.state, expected_state, BFDState)
# Waits on pg0 for a BFD packet until a deadline of now + timeout. Packets
# whose pcap timestamp is older than pcap_time_min are logged and ignored;
# the number of ignored packets is asserted to stay within 0..100. A packet
# that is accepted is checked with test.test_session.verify_bfd().
# Raises CaptureTimeoutError when no packet arrives in time, and a plain
# Exception for an unexpected or invalid packet.
# NOTE(review): callers in this file use the result directly as a packet
# (p.time, p[BFD]), so the ":returns: tuple" wording in the docstring looks
# stale - confirm.
639 def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None, is_tunnel=False):
640 """ wait for BFD packet and verify its correctness
642 :param timeout: how long to wait
643 :param pcap_time_min: ignore packets with pcap timestamp lower than this
645 :returns: tuple (packet, time spent waiting for packet)
647 test.logger.info("BFD: Waiting for BFD packet")
648 deadline = time.time() + timeout
653 test.assert_in_range(counter, 0, 100, "number of packets ignored")
654 time_left = deadline - time.time()
656 raise CaptureTimeoutError("Packet did not arrive within timeout")
657 p = test.pg0.wait_for_packet(timeout=time_left)
658 test.logger.debug(ppp("BFD: Got packet:", p))
659 if pcap_time_min is not None and p.time < pcap_time_min:
660 test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
661 "pcap time min %s):" %
662 (p.time, pcap_time_min), p))
666 # strip an IP layer and move to the next
671 raise Exception(ppp("Unexpected or invalid BFD packet:", p))
673 raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
676 test.test_session.verify_bfd(p)
# Behavioural BFD tests over IPv4 on a single pg interface. The whole class is
# skipped unless running_extended_tests is truthy.
680 @unittest.skipUnless(running_extended_tests, "part of extended tests")
681 class BFD4TestCase(VppTestCase):
682 """Bidirectional Forwarding Detection (BFD)"""
# Having this attribute makes bfd_session_up() maintain it as
# time.time() - p.time for the packet it receives.
685 vpp_clock_offset = None
# Class-level fixture: BFD debug logging, one pg interface and one loopback
# with IPv4 configured and brought up.
691 super(BFD4TestCase, cls).setUpClass()
692 cls.vapi.cli("set log class bfd level debug")
694 cls.create_pg_interfaces([0])
695 cls.create_loopback_interfaces(1)
696 cls.loopback0 = cls.lo_interfaces[0]
697 cls.loopback0.config_ip4()
698 cls.loopback0.admin_up()
700 cls.pg0.configure_ipv4_neighbors()
702 cls.pg0.resolve_arp()
705 super(BFD4TestCase, cls).tearDownClass()
709 def tearDownClass(cls):
710 super(BFD4TestCase, cls).tearDownClass()
# Per-test fixture: subscribe to BFD events, start capturing on pg0, create
# the VPP session and bring it admin-up, and create the matching test-side
# session. Event delivery is switched off again if this fails.
713 super(BFD4TestCase, self).setUp()
714 self.factory = AuthKeyFactory()
715 self.vapi.want_bfd_events()
716 self.pg0.enable_capture()
718 self.vpp_session = VppBFDUDPSession(self, self.pg0,
720 self.vpp_session.add_vpp_config()
721 self.vpp_session.admin_up()
722 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
723 except BaseException:
724 self.vapi.want_bfd_events(enable_disable=0)
# Teardown talks to VPP only while it is still alive.
728 if not self.vpp_dead:
729 self.vapi.want_bfd_events(enable_disable=0)
730 self.vapi.collect_events() # clear the event queue
731 super(BFD4TestCase, self).tearDown()
# Basic positive case: the session is expected to reach the Up state.
733 def test_session_up(self):
734 """ bring BFD session up """
# Brings the session up with the test side speaking first. The first control
# frame is sent before your_discriminator is known, so VPP has to match it to
# the session by address pair. VPP's reply must echo our discriminator and
# report state Init; the test then follows the Init and Up events.
737 def test_session_up_by_ip(self):
738 """ bring BFD session up - first frame looked up by address pair """
739 self.logger.info("BFD: Sending Slow control frame")
740 self.test_session.update(my_discriminator=randint(0, 40000000))
741 self.test_session.send_packet()
742 self.pg0.enable_capture()
743 p = self.pg0.wait_for_packet(1)
744 self.assert_equal(p[BFD].your_discriminator,
745 self.test_session.my_discriminator,
746 "BFD - your discriminator")
747 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
748 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
750 self.logger.info("BFD: Waiting for event")
751 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
752 verify_event(self, e, expected_state=BFDState.init)
753 self.logger.info("BFD: Sending Up")
754 self.test_session.send_packet()
755 self.logger.info("BFD: Waiting for event")
756 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
757 verify_event(self, e, expected_state=BFDState.up)
758 self.logger.info("BFD: Session is Up")
759 self.test_session.update(state=BFDState.up)
760 self.test_session.send_packet()
761 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
# Uses bfd_session_down(), which itself asserts that the session is Up before
# taking it down.
763 def test_session_down(self):
764 """ bring BFD session down """
766 bfd_session_down(self)
# Answers every packet from VPP with one of our own for detect_mult * 2
# rounds and then checks that no BFD event was generated in the meantime.
768 @unittest.skipUnless(running_extended_tests, "part of extended tests")
769 def test_hold_up(self):
770 """ hold BFD session up """
772 for dummy in range(self.test_session.detect_mult * 2):
773 wait_for_bfd_packet(self)
774 self.test_session.send_packet()
775 self.assert_equal(len(self.vapi.collect_events()), 0,
776 "number of bfd events")
# Measures the spacing between consecutive packets from VPP using their pcap
# timestamps; each gap must fall within 0.70-1.05 s.
778 @unittest.skipUnless(running_extended_tests, "part of extended tests")
779 def test_slow_timer(self):
780 """ verify slow periodic control frames while session down """
782 self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
783 prev_packet = wait_for_bfd_packet(self, 2)
784 for dummy in range(packet_count):
785 next_packet = wait_for_bfd_packet(self, 2)
786 time_diff = next_packet.time - prev_packet.time
787 # spec says the range should be <0.75, 1>, allow extra 0.05 margin
788 # to work around timing issues
789 self.assert_in_range(
790 time_diff, 0.70, 1.05, "time between slow packets")
791 prev_packet = next_packet
# Advertises required_min_rx=0 from the test side and keeps sending packets,
# sleeping one VPP required_min_rx interval between them. A packet received
# from VPP during this phase is logged as unexpected. The interval is then
# restored to 300000 and the test exchanges three more packets; the BFD event
# count is compared with 0 after each phase.
793 @unittest.skipUnless(running_extended_tests, "part of extended tests")
794 def test_zero_remote_min_rx(self):
795 """ no packets when zero remote required min rx interval """
797 self.test_session.update(required_min_rx=0)
798 self.test_session.send_packet()
799 for dummy in range(self.test_session.detect_mult):
800 self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
801 "sleep before transmitting bfd packet")
802 self.test_session.send_packet()
804 p = wait_for_bfd_packet(self, timeout=0)
805 self.logger.error(ppp("Received unexpected packet:", p))
806 except CaptureTimeoutError:
809 len(self.vapi.collect_events()), 0, "number of bfd events")
810 self.test_session.update(required_min_rx=300000)
811 for dummy in range(3):
812 self.test_session.send_packet()
814 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
816 len(self.vapi.collect_events()), 0, "number of bfd events")
# Stays silent for one detection time (detect_mult * VPP's required_min_rx,
# converted with USEC_IN_SEC) and then expects a Down event.
818 @unittest.skipUnless(running_extended_tests, "part of extended tests")
819 def test_conn_down(self):
820 """ verify session goes down after inactivity """
822 detection_time = self.test_session.detect_mult *\
823 self.vpp_session.required_min_rx / USEC_IN_SEC
824 self.sleep(detection_time, "waiting for BFD session time-out")
825 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
826 verify_event(self, e, expected_state=BFDState.down)
# Advertises a large required_min_rx and then polls, for one such interval,
# for packets and events. Packets stamped before the update (compared after
# subtracting vpp_clock_offset) are tolerated; the first event collected must
# be Down, and the packet count is asserted to be 0.
828 @unittest.skipUnless(running_extended_tests, "part of extended tests")
829 def test_large_required_min_rx(self):
830 """ large remote required min rx interval """
832 p = wait_for_bfd_packet(self)
834 self.test_session.update(required_min_rx=interval)
835 self.test_session.send_packet()
836 time_mark = time.time()
838 # busy wait here, trying to collect a packet or event, vpp is not
839 # allowed to send packets and the session will timeout first - so the
840 # Up->Down event must arrive before any packets do
841 while time.time() < time_mark + interval / USEC_IN_SEC:
843 p = wait_for_bfd_packet(self, timeout=0)
844 # if vpp managed to send a packet before we did the session
845 # session update, then that's fine, ignore it
846 if p.time < time_mark - self.vpp_clock_offset:
848 self.logger.error(ppp("Received unexpected packet:", p))
850 except CaptureTimeoutError:
852 events = self.vapi.collect_events()
854 verify_event(self, events[0], BFDState.down)
856 self.assert_equal(count, 0, "number of packets received")
# Recreates the VPP session with desired_min_tx=10000 while the test side
# first advertises 1000000 for both intervals. The test side then advertises a
# new required_min_rx and checks the spacing of VPP's following packets
# against 0.95 * 0.75 * interval .. 1.05 * interval. The first gap is allowed
# to be longer by the time spent sending the update.
858 @unittest.skipUnless(running_extended_tests, "part of extended tests")
859 def test_immediate_remote_min_rx_reduction(self):
860 """ immediately honor remote required min rx reduction """
861 self.vpp_session.remove_vpp_config()
862 self.vpp_session = VppBFDUDPSession(
863 self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
864 self.pg0.enable_capture()
865 self.vpp_session.add_vpp_config()
866 self.test_session.update(desired_min_tx=1000000,
867 required_min_rx=1000000)
869 reference_packet = wait_for_bfd_packet(self)
870 time_mark = time.time()
872 self.test_session.update(required_min_rx=interval)
873 self.test_session.send_packet()
874 extra_time = time.time() - time_mark
875 p = wait_for_bfd_packet(self)
876 # first packet is allowed to be late by time we spent doing the update
877 # calculated in extra_time
878 self.assert_in_range(p.time - reference_packet.time,
879 .95 * 0.75 * interval / USEC_IN_SEC,
880 1.05 * interval / USEC_IN_SEC + extra_time,
881 "time between BFD packets")
883 for dummy in range(3):
884 p = wait_for_bfd_packet(self)
885 diff = p.time - reference_packet.time
886 self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
887 1.05 * interval / USEC_IN_SEC,
888 "time between BFD packets")
# Doubles VPP's required_min_rx and expects the next packet (stamped no
# earlier than the change) to carry the Poll flag. The test answers with a
# Final packet, stops sending, and expects the Down event after roughly
# detect_mult * max(our desired_min_tx, VPP's required_min_rx), within 10%.
891 @unittest.skipUnless(running_extended_tests, "part of extended tests")
892 def test_modify_req_min_rx_double(self):
893 """ modify session - double required min rx """
895 p = wait_for_bfd_packet(self)
896 self.test_session.update(desired_min_tx=10000,
897 required_min_rx=10000)
898 self.test_session.send_packet()
899 # double required min rx
900 self.vpp_session.modify_parameters(
901 required_min_rx=2 * self.vpp_session.required_min_rx)
902 p = wait_for_bfd_packet(
903 self, pcap_time_min=time.time() - self.vpp_clock_offset)
904 # poll bit needs to be set
905 self.assertIn("P", p.sprintf("%BFD.flags%"),
906 "Poll bit not set in BFD packet")
907 # finish poll sequence with final packet
908 final = self.test_session.create_packet()
909 final[BFD].flags = "F"
910 timeout = self.test_session.detect_mult * \
911 max(self.test_session.desired_min_tx,
912 self.vpp_session.required_min_rx) / USEC_IN_SEC
913 self.test_session.send_packet(final)
914 time_mark = time.time()
915 e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
916 verify_event(self, e, expected_state=BFDState.down)
917 time_to_event = time.time() - time_mark
918 self.assert_in_range(time_to_event, .9 * timeout,
919 1.1 * timeout, "session timeout")
# Starts with VPP's required_min_rx doubled, then halves it again. Until the
# poll sequence is finished the session is expected to survive
# 0.8 * detect_mult * the old interval without any event. After the test
# answers the Poll packet with a Final packet, the Down event is expected
# after roughly detect_mult * the new interval, within 10%.
921 @unittest.skipUnless(running_extended_tests, "part of extended tests")
922 def test_modify_req_min_rx_halve(self):
923 """ modify session - halve required min rx """
924 self.vpp_session.modify_parameters(
925 required_min_rx=2 * self.vpp_session.required_min_rx)
927 p = wait_for_bfd_packet(self)
928 self.test_session.update(desired_min_tx=10000,
929 required_min_rx=10000)
930 self.test_session.send_packet()
931 p = wait_for_bfd_packet(
932 self, pcap_time_min=time.time() - self.vpp_clock_offset)
933 # halve required min rx
934 old_required_min_rx = self.vpp_session.required_min_rx
935 self.vpp_session.modify_parameters(
936 required_min_rx=self.vpp_session.required_min_rx // 2)
937 # now we wait 0.8*3*old-req-min-rx and the session should still be up
938 self.sleep(0.8 * self.vpp_session.detect_mult *
939 old_required_min_rx / USEC_IN_SEC,
940 "wait before finishing poll sequence")
941 self.assert_equal(len(self.vapi.collect_events()), 0,
942 "number of bfd events")
943 p = wait_for_bfd_packet(self)
944 # poll bit needs to be set
945 self.assertIn("P", p.sprintf("%BFD.flags%"),
946 "Poll bit not set in BFD packet")
947 # finish poll sequence with final packet
948 final = self.test_session.create_packet()
949 final[BFD].flags = "F"
950 self.test_session.send_packet(final)
951 # now the session should time out under new conditions
952 detection_time = self.test_session.detect_mult *\
953 self.vpp_session.required_min_rx / USEC_IN_SEC
955 e = self.vapi.wait_for_event(
956 2 * detection_time, "bfd_udp_session_details")
958 self.assert_in_range(after - before,
959 0.9 * detection_time,
960 1.1 * detection_time,
961 "time before bfd session goes down")
962 verify_event(self, e, expected_state=BFDState.down)
# Changes VPP's detect multiplier to 1 and then to 10. After each change the
# next packet stamped no earlier than the change must not carry the Poll flag.
# NOTE(review): both assertNotIn calls fail when the Poll bit IS set, yet
# their message reads "Poll bit not set in BFD packet" - the wording looks
# copied from the positive checks elsewhere in this file; confirm.
964 @unittest.skipUnless(running_extended_tests, "part of extended tests")
965 def test_modify_detect_mult(self):
966 """ modify detect multiplier """
968 p = wait_for_bfd_packet(self)
969 self.vpp_session.modify_parameters(detect_mult=1)
970 p = wait_for_bfd_packet(
971 self, pcap_time_min=time.time() - self.vpp_clock_offset)
972 self.assert_equal(self.vpp_session.detect_mult,
975 # poll bit must not be set
976 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
977 "Poll bit not set in BFD packet")
978 self.vpp_session.modify_parameters(detect_mult=10)
979 p = wait_for_bfd_packet(
980 self, pcap_time_min=time.time() - self.vpp_clock_offset)
981 self.assert_equal(self.vpp_session.detect_mult,
984 # poll bit must not be set
985 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
986 "Poll bit not set in BFD packet")
# Test: a second parameter change made while a poll sequence is still in
# progress must be queued rather than started immediately.
# Visible flow:
#   1. required_min_rx is doubled; the next packet must have the Poll bit
#      and advertise the new required_min_rx.
#   2. required_min_rx is doubled again while the first poll is unanswered.
#   3. For at least poll_sequence_length_min (0.5) seconds the test keeps
#      sending ordinary packets and checks that VPP still polls with the
#      current required_min_rx and raises no events.
#   4. A Final packet ends the first sequence; the time it took is kept in
#      poll_sequence_length.
#   5. In the following packets a Poll bit marks the start of the second
#      sequence; it is an error if that happens earlier than
#      poll_sequence_start + poll_sequence_length.
#   6. After the closing Final packet the next packet must not have Poll.
# NOTE(review): `packet_count` is read in `range(2 * packet_count)` but its
# initialisation/increment are not among the lines shown here, and the
# branch structure around the second poll (embedded lines 1041-1042) is
# also missing from this listing.
988 @unittest.skipUnless(running_extended_tests, "part of extended tests")
989 def test_queued_poll(self):
990 """ test poll sequence queueing """
992 p = wait_for_bfd_packet(self)
993 self.vpp_session.modify_parameters(
994 required_min_rx=2 * self.vpp_session.required_min_rx)
995 p = wait_for_bfd_packet(self)
996 poll_sequence_start = time.time()
997 poll_sequence_length_min = 0.5
998 send_final_after = time.time() + poll_sequence_length_min
999 # poll bit needs to be set
1000 self.assertIn("P", p.sprintf("%BFD.flags%"),
1001 "Poll bit not set in BFD packet")
1002 self.assert_equal(p[BFD].required_min_rx_interval,
1003 self.vpp_session.required_min_rx,
1004 "BFD required min rx interval")
1005 self.vpp_session.modify_parameters(
1006 required_min_rx=2 * self.vpp_session.required_min_rx)
1007 # 2nd poll sequence should be queued now
1008 # don't send the reply back yet, wait for some time to emulate
1009 # longer round-trip time
1011 while time.time() < send_final_after:
1012 self.test_session.send_packet()
1013 p = wait_for_bfd_packet(self)
1014 self.assert_equal(len(self.vapi.collect_events()), 0,
1015 "number of bfd events")
1016 self.assert_equal(p[BFD].required_min_rx_interval,
1017 self.vpp_session.required_min_rx,
1018 "BFD required min rx interval")
1020 # poll bit must be set
1021 self.assertIn("P", p.sprintf("%BFD.flags%"),
1022 "Poll bit not set in BFD packet")
1023 final = self.test_session.create_packet()
1024 final[BFD].flags = "F"
1025 self.test_session.send_packet(final)
1026 # finish 1st with final
1027 poll_sequence_length = time.time() - poll_sequence_start
1028 # vpp must wait for some time before starting new poll sequence
1029 poll_no_2_started = False
1030 for dummy in range(2 * packet_count):
1031 p = wait_for_bfd_packet(self)
1032 self.assert_equal(len(self.vapi.collect_events()), 0,
1033 "number of bfd events")
1034 if "P" in p.sprintf("%BFD.flags%"):
1035 poll_no_2_started = True
1036 if time.time() < poll_sequence_start + poll_sequence_length:
1037 raise Exception("VPP started 2nd poll sequence too soon")
1038 final = self.test_session.create_packet()
1039 final[BFD].flags = "F"
1040 self.test_session.send_packet(final)
1043 self.test_session.send_packet()
1044 self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
1045 # finish 2nd with final
1046 final = self.test_session.create_packet()
1047 final[BFD].flags = "F"
1048 self.test_session.send_packet(final)
1049 p = wait_for_bfd_packet(self)
1050 # poll bit must not be set
1051 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
1052 "Poll bit set in BFD packet")
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_poll_response(self):
    """ test correct response to control frame with poll bit set

    Brings the session up, sends a control frame whose flags are "P" and
    asserts that the next BFD packet received has "F" among its flags.
    """
    bfd_session_up(self)

    # build and transmit the polling control frame
    session = self.test_session
    poll_frame = session.create_packet()
    poll_frame[BFD].flags = "P"
    session.send_packet(poll_frame)

    # pcap_time_min is "now" shifted by the VPP clock offset - presumably
    # so that packets captured before the poll was sent are not accepted
    earliest = time.time() - self.vpp_clock_offset
    answer = wait_for_bfd_packet(self, pcap_time_min=earliest)

    answer_flags = answer.sprintf("%BFD.flags%")
    self.assertIn("F", answer_flags)
# Test: once the peer has sent a frame with the Demand ("D") flag, VPP must
# stop sending periodic control frames.
# Visible flow: bring the session up, send a packet with flags "D", then for
# detect_mult * 2 rounds sleep for transmit_time (0.9 * the larger of
# vpp_session.required_min_rx and test_session.desired_min_tx), resend the
# demand packet and poll for a BFD packet with timeout=0; a received packet
# is logged as unexpected, CaptureTimeoutError is the expected outcome.
# Finally both the packet count and the event count must be 0.
# NOTE(review): the `try:` line, the `count` bookkeeping, the `for e in ...`
# loop header and the tail of the transmit_time expression are not among
# the lines shown in this listing.
1065 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1066 def test_no_periodic_if_remote_demand(self):
1067 """ no periodic frames outside poll sequence if remote demand set """
1068 bfd_session_up(self)
1069 demand = self.test_session.create_packet()
1070 demand[BFD].flags = "D"
1071 self.test_session.send_packet(demand)
1072 transmit_time = 0.9 \
1073 * max(self.vpp_session.required_min_rx,
1074 self.test_session.desired_min_tx) \
1077 for dummy in range(self.test_session.detect_mult * 2):
1078 self.sleep(transmit_time)
1079 self.test_session.send_packet(demand)
1081 p = wait_for_bfd_packet(self, timeout=0)
1082 self.logger.error(ppp("Received unexpected packet:", p))
1084 except CaptureTimeoutError:
1086 events = self.vapi.collect_events()
1088 self.logger.error("Received unexpected event: %s", e)
1089 self.assert_equal(count, 0, "number of packets received")
1090 self.assert_equal(len(events), 0, "number of events received")
# Test (IPv4): packets addressed to the BFD echo UDP port are looped back
# by VPP unchanged, without any BFD session configured.
# Visible flow: remove the VPP session, build an Ether/IP/UDP/Raw packet
# whose IP source and destination are both pg0.remote_ip4 and whose UDP
# dport is BFD.udp_dport_echo, queue echo_packet_count (10) copies on pg0,
# then for each received packet check MAC addresses, IP addresses, UDP
# ports and that the raw UDP payload equals the one sent.
# NOTE(review): `ether`, `ip` and `udp` are read below but their assignments,
# as well as the udp_sport_tx / udp_sport_rx increments, are not among the
# lines shown in this listing.
# NOTE(review): the assert that compares pg0.remote_ip4 with ip.src uses the
# label "Destination IP"; "Source IP" looks like the intended label.
1092 def test_echo_looped_back(self):
1093 """ echo packets looped back """
1094 # don't need a session in this case..
1095 self.vpp_session.remove_vpp_config()
1096 self.pg0.enable_capture()
1097 echo_packet_count = 10
1098 # random source port low enough to increment a few times..
1099 udp_sport_tx = randint(1, 50000)
1100 udp_sport_rx = udp_sport_tx
1101 echo_packet = (Ether(src=self.pg0.remote_mac,
1102 dst=self.pg0.local_mac) /
1103 IP(src=self.pg0.remote_ip4,
1104 dst=self.pg0.remote_ip4) /
1105 UDP(dport=BFD.udp_dport_echo) /
1106 Raw("this should be looped back"))
1107 for dummy in range(echo_packet_count):
1108 self.sleep(.01, "delay between echo packets")
1109 echo_packet[UDP].sport = udp_sport_tx
1111 self.logger.debug(ppp("Sending packet:", echo_packet))
1112 self.pg0.add_stream(echo_packet)
1114 for dummy in range(echo_packet_count):
1115 p = self.pg0.wait_for_packet(1)
1116 self.logger.debug(ppp("Got packet:", p))
1118 self.assert_equal(self.pg0.remote_mac,
1119 ether.dst, "Destination MAC")
1120 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1122 self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1123 self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1125 self.assert_equal(udp.dport, BFD.udp_dport_echo,
1126 "UDP destination port")
1127 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1129 # need to compare the hex payload here, otherwise BFD_vpp_echo
1131 self.assertEqual(scapy.compat.raw(p[UDP].payload),
1132 scapy.compat.raw(echo_packet[UDP].payload),
1133 "Received packet is not the echo packet sent")
1134 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1135 "ECHO packet identifier for test purposes)")
# Test (IPv4): BFD echo function.
# Phase 1 (no echo source configured): the test session advertises
# required_min_echo_rx=150000; during 10 rounds every BFD packet from VPP
# must still advertise vpp_session.required_min_rx.
# Phase 2 (after bfd_udp_set_echo_source on loopback0): packets captured on
# pg0 are classified by UDP destination port. Echo packets
# (BFD.udp_dport_echo) are checked - source IP must differ from
# loopback0.local_ip4, Ether dst must be pg0.remote_mac - and then sent
# back with Ether dst rewritten to pg0.local_mac. BFD control packets that
# carry the Poll bit are answered with a Final packet. Anything else raises.
# No BFD events may occur, and at least one echo packet must have been seen.
# NOTE(review): `echo_seen` is asserted at the end but never assigned in the
# lines shown; several statements (the assert_equal opening for the echo
# dst IP, the assertGreaterEqual argument list, the else: branch) are cut
# off in this listing.
1137 def test_echo(self):
1138 """ echo function """
1139 bfd_session_up(self)
1140 self.test_session.update(required_min_echo_rx=150000)
1141 self.test_session.send_packet()
1142 detection_time = self.test_session.detect_mult *\
1143 self.vpp_session.required_min_rx / USEC_IN_SEC
1144 # echo shouldn't work without echo source set
1145 for dummy in range(10):
1146 sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1147 self.sleep(sleep, "delay before sending bfd packet")
1148 self.test_session.send_packet()
1149 p = wait_for_bfd_packet(
1150 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1151 self.assert_equal(p[BFD].required_min_rx_interval,
1152 self.vpp_session.required_min_rx,
1153 "BFD required min rx interval")
1154 self.test_session.send_packet()
1155 self.vapi.bfd_udp_set_echo_source(
1156 sw_if_index=self.loopback0.sw_if_index)
1158 # should be turned on - loopback echo packets
1159 for dummy in range(3):
1160 loop_until = time.time() + 0.75 * detection_time
1161 while time.time() < loop_until:
1162 p = self.pg0.wait_for_packet(1)
1163 self.logger.debug(ppp("Got packet:", p))
1164 if p[UDP].dport == BFD.udp_dport_echo:
1166 p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
1167 self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
1168 "BFD ECHO src IP equal to loopback IP")
1169 self.logger.debug(ppp("Looping back packet:", p))
1170 self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1171 "ECHO packet destination MAC address")
1172 p[Ether].dst = self.pg0.local_mac
1173 self.pg0.add_stream(p)
1176 elif p.haslayer(BFD):
1178 self.assertGreaterEqual(
1179 p[BFD].required_min_rx_interval,
1181 if "P" in p.sprintf("%BFD.flags%"):
1182 final = self.test_session.create_packet()
1183 final[BFD].flags = "F"
1184 self.test_session.send_packet(final)
1186 raise Exception(ppp("Received unknown packet:", p))
1188 self.assert_equal(len(self.vapi.collect_events()), 0,
1189 "number of bfd events")
1190 self.test_session.send_packet()
1191 self.assertTrue(echo_seen, "No echo packets received")
# Test: the session must go down when echo packets are never returned.
# Visible flow: bring the session up, advertise required_min_echo_rx=150000,
# configure loopback0 as echo source, then for 3 windows of
# 0.75 * detection_time read packets from pg0. Echo packets are not looped
# back (per the in-code comment; the body of that branch is not shown in
# this listing). BFD packets with the Poll bit are answered with Final; a
# BFD packet in state down must carry diag
# BFDDiagCode.echo_function_failed, which sets verified_diag.
# At the end exactly one event, with state BFDState.down, is expected and
# verified_diag must be True.
# NOTE(review): several continuation lines and the else: branch are missing
# from this listing.
1193 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1194 def test_echo_fail(self):
1195 """ session goes down if echo function fails """
1196 bfd_session_up(self)
1197 self.test_session.update(required_min_echo_rx=150000)
1198 self.test_session.send_packet()
1199 detection_time = self.test_session.detect_mult *\
1200 self.vpp_session.required_min_rx / USEC_IN_SEC
1201 self.vapi.bfd_udp_set_echo_source(
1202 sw_if_index=self.loopback0.sw_if_index)
1203 # echo function should be used now, but we will drop the echo packets
1204 verified_diag = False
1205 for dummy in range(3):
1206 loop_until = time.time() + 0.75 * detection_time
1207 while time.time() < loop_until:
1208 p = self.pg0.wait_for_packet(1)
1209 self.logger.debug(ppp("Got packet:", p))
1210 if p[UDP].dport == BFD.udp_dport_echo:
1213 elif p.haslayer(BFD):
1214 if "P" in p.sprintf("%BFD.flags%"):
1215 self.assertGreaterEqual(
1216 p[BFD].required_min_rx_interval,
1218 final = self.test_session.create_packet()
1219 final[BFD].flags = "F"
1220 self.test_session.send_packet(final)
1221 if p[BFD].state == BFDState.down:
1222 self.assert_equal(p[BFD].diag,
1223 BFDDiagCode.echo_function_failed,
1225 verified_diag = True
1227 raise Exception(ppp("Received unknown packet:", p))
1228 self.test_session.send_packet()
1229 events = self.vapi.collect_events()
1230 self.assert_equal(len(events), 1, "number of bfd events")
1231 self.assert_equal(events[0].state, BFDState.down, BFDState)
1232 self.assertTrue(verified_diag, "Incorrect diagnostics code received")
# Test: VPP must stop sending echo packets once the peer advertises
# required_min_echo_rx=0.
# Visible flow: bring the session up with required_min_echo_rx=150000 and
# loopback0 as echo source; read packets from pg0 until an echo packet
# (UDP dport BFD.udp_dport_echo) is seen and looped back with Ether dst set
# to pg0.local_mac; then advertise required_min_echo_rx=0 and, for 5
# rounds, wait for a BFD packet and reply. No BFD events may be raised.
# NOTE(review): the loop header around "wait for first echo packet" and the
# bodies of the elif/else branches are not among the lines shown here.
1234 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1235 def test_echo_stop(self):
1236 """ echo function stops if peer sets required min echo rx zero """
1237 bfd_session_up(self)
1238 self.test_session.update(required_min_echo_rx=150000)
1239 self.test_session.send_packet()
1240 self.vapi.bfd_udp_set_echo_source(
1241 sw_if_index=self.loopback0.sw_if_index)
1242 # wait for first echo packet
1244 p = self.pg0.wait_for_packet(1)
1245 self.logger.debug(ppp("Got packet:", p))
1246 if p[UDP].dport == BFD.udp_dport_echo:
1247 self.logger.debug(ppp("Looping back packet:", p))
1248 p[Ether].dst = self.pg0.local_mac
1249 self.pg0.add_stream(p)
1252 elif p.haslayer(BFD):
1256 raise Exception(ppp("Received unknown packet:", p))
1257 self.test_session.update(required_min_echo_rx=0)
1258 self.test_session.send_packet()
1259 # echo packets shouldn't arrive anymore
1260 for dummy in range(5):
1261 wait_for_bfd_packet(
1262 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1263 self.test_session.send_packet()
1264 events = self.vapi.collect_events()
1265 self.assert_equal(len(events), 0, "number of bfd events")
# Test: VPP must stop sending echo packets once the echo source is removed.
# Visible flow: bring the session up with required_min_echo_rx=150000 and
# loopback0 as echo source; read packets from pg0 until an echo packet
# (UDP dport BFD.udp_dport_echo) is seen and looped back with Ether dst set
# to pg0.local_mac; then call bfd_udp_del_echo_source() and, for 5 rounds,
# wait for a BFD packet and reply. No BFD events may be raised.
# NOTE(review): the loop header around "wait for first echo packet" and the
# bodies of the elif/else branches are not among the lines shown here.
1267 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1268 def test_echo_source_removed(self):
1269 """ echo function stops if echo source is removed """
1270 bfd_session_up(self)
1271 self.test_session.update(required_min_echo_rx=150000)
1272 self.test_session.send_packet()
1273 self.vapi.bfd_udp_set_echo_source(
1274 sw_if_index=self.loopback0.sw_if_index)
1275 # wait for first echo packet
1277 p = self.pg0.wait_for_packet(1)
1278 self.logger.debug(ppp("Got packet:", p))
1279 if p[UDP].dport == BFD.udp_dport_echo:
1280 self.logger.debug(ppp("Looping back packet:", p))
1281 p[Ether].dst = self.pg0.local_mac
1282 self.pg0.add_stream(p)
1285 elif p.haslayer(BFD):
1289 raise Exception(ppp("Received unknown packet:", p))
1290 self.vapi.bfd_udp_del_echo_source()
1291 self.test_session.send_packet()
1292 # echo packets shouldn't arrive anymore
1293 for dummy in range(5):
1294 wait_for_bfd_packet(
1295 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1296 self.test_session.send_packet()
1297 events = self.vapi.collect_events()
1298 self.assert_equal(len(events), 0, "number of bfd events")
# Test: replaying one old echo packet must not keep the session alive.
# Visible flow: bring the session up with required_min_echo_rx=150000 and
# loopback0 as echo source. For up to 10 * detect_mult packets from pg0:
#   - on the first echo packet, timeout_at is set to now + detect_mult *
#     required_min_echo_rx (converted with USEC_IN_SEC);
#   - echo_packet (the first echo packet, per the log messages) is sent back
#     with Ether dst set to pg0.local_mac, also for follow-up echo packets;
#   - BFD packets with the Poll bit are answered with Final;
#   - a BFD packet in state down must carry diag
#     BFDDiagCode.echo_function_failed and be accompanied by exactly one
#     event with state BFDState.down.
# The test finally requires timeout_ok to be True.
# NOTE(review): the assignments of `echo_packet` and `timeout_ok`, and the
# arguments of assertIsNotNone / assertGreaterEqual, are not among the
# lines shown in this listing.
1300 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1301 def test_stale_echo(self):
1302 """ stale echo packets don't keep a session up """
1303 bfd_session_up(self)
1304 self.test_session.update(required_min_echo_rx=150000)
1305 self.vapi.bfd_udp_set_echo_source(
1306 sw_if_index=self.loopback0.sw_if_index)
1307 self.test_session.send_packet()
1308 # should be turned on - loopback echo packets
1312 for dummy in range(10 * self.vpp_session.detect_mult):
1313 p = self.pg0.wait_for_packet(1)
1314 if p[UDP].dport == BFD.udp_dport_echo:
1315 if echo_packet is None:
1316 self.logger.debug(ppp("Got first echo packet:", p))
1318 timeout_at = time.time() + self.vpp_session.detect_mult * \
1319 self.test_session.required_min_echo_rx / USEC_IN_SEC
1321 self.logger.debug(ppp("Got followup echo packet:", p))
1322 self.logger.debug(ppp("Looping back first echo packet:", p))
1323 echo_packet[Ether].dst = self.pg0.local_mac
1324 self.pg0.add_stream(echo_packet)
1326 elif p.haslayer(BFD):
1327 self.logger.debug(ppp("Got packet:", p))
1328 if "P" in p.sprintf("%BFD.flags%"):
1329 final = self.test_session.create_packet()
1330 final[BFD].flags = "F"
1331 self.test_session.send_packet(final)
1332 if p[BFD].state == BFDState.down:
1333 self.assertIsNotNone(
1335 "Session went down before first echo packet received")
1337 self.assertGreaterEqual(
1339 "Session timeout at %s, but is expected at %s" %
1341 self.assert_equal(p[BFD].diag,
1342 BFDDiagCode.echo_function_failed,
1344 events = self.vapi.collect_events()
1345 self.assert_equal(len(events), 1, "number of bfd events")
1346 self.assert_equal(events[0].state, BFDState.down, BFDState)
1350 raise Exception(ppp("Received unknown packet:", p))
1351 self.test_session.send_packet()
1352 self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
# Test: echo packets returned with a corrupted checksum must not keep the
# session alive.
# Visible flow: bring the session up with required_min_echo_rx=150000 and
# loopback0 as echo source. For up to 10 * detect_mult packets from pg0:
#   - on the first echo packet, timeout_at is set to now + detect_mult *
#     required_min_echo_rx (converted with USEC_IN_SEC);
#   - every echo packet gets its BFD_vpp_echo checksum replaced with
#     getrandbits(64) and is sent back with Ether dst = pg0.local_mac;
#   - BFD packets with the Poll bit are answered with Final;
#   - a BFD packet in state down must carry diag
#     BFDDiagCode.echo_function_failed and be accompanied by exactly one
#     event with state BFDState.down.
# The test finally requires timeout_ok to be True.
# NOTE(review): the initialisation of `timeout_at` / `timeout_ok` and the
# arguments of assertIsNotNone / assertGreaterEqual are not among the lines
# shown in this listing.
1354 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1355 def test_invalid_echo_checksum(self):
1356 """ echo packets with invalid checksum don't keep a session up """
1357 bfd_session_up(self)
1358 self.test_session.update(required_min_echo_rx=150000)
1359 self.vapi.bfd_udp_set_echo_source(
1360 sw_if_index=self.loopback0.sw_if_index)
1361 self.test_session.send_packet()
1362 # should be turned on - loopback echo packets
1365 for dummy in range(10 * self.vpp_session.detect_mult):
1366 p = self.pg0.wait_for_packet(1)
1367 if p[UDP].dport == BFD.udp_dport_echo:
1368 self.logger.debug(ppp("Got echo packet:", p))
1369 if timeout_at is None:
1370 timeout_at = time.time() + self.vpp_session.detect_mult * \
1371 self.test_session.required_min_echo_rx / USEC_IN_SEC
1372 p[BFD_vpp_echo].checksum = getrandbits(64)
1373 p[Ether].dst = self.pg0.local_mac
1374 self.logger.debug(ppp("Looping back modified echo packet:", p))
1375 self.pg0.add_stream(p)
1377 elif p.haslayer(BFD):
1378 self.logger.debug(ppp("Got packet:", p))
1379 if "P" in p.sprintf("%BFD.flags%"):
1380 final = self.test_session.create_packet()
1381 final[BFD].flags = "F"
1382 self.test_session.send_packet(final)
1383 if p[BFD].state == BFDState.down:
1384 self.assertIsNotNone(
1386 "Session went down before first echo packet received")
1388 self.assertGreaterEqual(
1390 "Session timeout at %s, but is expected at %s" %
1392 self.assert_equal(p[BFD].diag,
1393 BFDDiagCode.echo_function_failed,
1395 events = self.vapi.collect_events()
1396 self.assert_equal(len(events), 1, "number of bfd events")
1397 self.assert_equal(events[0].state, BFDState.down, BFDState)
1401 raise Exception(ppp("Received unknown packet:", p))
1402 self.test_session.send_packet()
1403 self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_admin_up_down(self):
    """ put session admin-up and admin-down

    Sets an established session admin-down, checks that VPP reports and
    advertises admin_down and ignores an attempt to bring it up, then sets
    it admin-up again and walks it through down -> init -> up, verifying
    the advertised state and the reported event at every step.
    """
    event_name = "bfd_udp_session_details"

    def expect_event(state):
        # wait up to 1 (timeout argument) for the next event and verify it
        verify_event(self, self.vapi.wait_for_event(1, event_name),
                     expected_state=state)

    def packet_from_now():
        # pcap_time_min is evaluated at call time on purpose
        return wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)

    def expect_two_admin_down_packets():
        for _ in range(2):
            pkt = wait_for_bfd_packet(self)
            self.assert_equal(pkt[BFD].state, BFDState.admin_down, BFDState)

    bfd_session_up(self)

    # --- admin-down ---
    self.vpp_session.admin_down()
    self.pg0.enable_capture()
    expect_event(BFDState.admin_down)
    expect_two_admin_down_packets()

    # try to bring session up - shouldn't be possible
    self.test_session.update(state=BFDState.init)
    self.test_session.send_packet()
    expect_two_admin_down_packets()

    # --- admin-up: VPP starts again from down ---
    self.vpp_session.admin_up()
    self.test_session.update(state=BFDState.down)
    expect_event(BFDState.down)
    pkt = packet_from_now()
    self.assert_equal(pkt[BFD].state, BFDState.down, BFDState)

    # --- down -> init ---
    self.test_session.send_packet()
    pkt = packet_from_now()
    self.assert_equal(pkt[BFD].state, BFDState.init, BFDState)
    expect_event(BFDState.init)

    # --- init -> up ---
    self.test_session.update(state=BFDState.up)
    self.test_session.send_packet()
    pkt = packet_from_now()
    self.assert_equal(pkt[BFD].state, BFDState.up, BFDState)
    expect_event(BFDState.up)
# Test: a configuration change while the peer is in demand mode must still
# trigger a poll sequence, after which VPP goes quiet again.
# Visible flow: bring the session up, send a packet with flags "D", double
# required_min_rx on the VPP session, expect the next packet to carry the
# Poll bit, answer with a packet whose flags are "D+F", then for
# detect_mult * 2 rounds sleep for transmit_time, resend the demand packet
# and poll for a BFD packet with timeout=0; a received packet is logged as
# unexpected, CaptureTimeoutError is the expected outcome. Finally both the
# packet count and the event count must be 0.
# NOTE(review): the `try:` line, the `count` bookkeeping, the `for e in ...`
# loop header and the tail of the transmit_time expression are not among
# the lines shown in this listing.
1443 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1444 def test_config_change_remote_demand(self):
1445 """ configuration change while peer in demand mode """
1446 bfd_session_up(self)
1447 demand = self.test_session.create_packet()
1448 demand[BFD].flags = "D"
1449 self.test_session.send_packet(demand)
1450 self.vpp_session.modify_parameters(
1451 required_min_rx=2 * self.vpp_session.required_min_rx)
1452 p = wait_for_bfd_packet(
1453 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1454 # poll bit must be set
1455 self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1456 # terminate poll sequence
1457 final = self.test_session.create_packet()
1458 final[BFD].flags = "D+F"
1459 self.test_session.send_packet(final)
1460 # vpp should be quiet now again
1461 transmit_time = 0.9 \
1462 * max(self.vpp_session.required_min_rx,
1463 self.test_session.desired_min_tx) \
1466 for dummy in range(self.test_session.detect_mult * 2):
1467 self.sleep(transmit_time)
1468 self.test_session.send_packet(demand)
1470 p = wait_for_bfd_packet(self, timeout=0)
1471 self.logger.error(ppp("Received unexpected packet:", p))
1473 except CaptureTimeoutError:
1475 events = self.vapi.collect_events()
1477 self.logger.error("Received unexpected event: %s", e)
1478 self.assert_equal(count, 0, "number of packets received")
1479 self.assert_equal(len(events), 0, "number of events received")
# Test (IPv4): deleting an interface that has a BFD session on it.
# Visible flow: create a VppLoInterface, remember its sw_if_index, add and
# admin-up a VppBFDUDPSession towards intf.remote_ip4, remove the interface,
# then expect a "bfd_udp_session_details" event carrying that sw_if_index
# and require that query_vpp_config() for the session is falsy.
# NOTE(review): the embedded numbering skips 1484-1485, so any interface
# setup done between creation and use is not shown in this listing.
1481 def test_intf_deleted(self):
1482 """ interface with bfd session deleted """
1483 intf = VppLoInterface(self)
1486 sw_if_index = intf.sw_if_index
1487 vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip4)
1488 vpp_session.add_vpp_config()
1489 vpp_session.admin_up()
1490 intf.remove_vpp_config()
1491 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1492 self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1493 self.assertFalse(vpp_session.query_vpp_config())
# BFD6TestCase: class header and fixtures for the IPv6 BFD tests. The whole
# class is skipped unless running_extended_tests is truthy.
# NOTE(review): this listing omits several lines of the fixtures (decorators,
# the `def` lines of setUp/tearDown, try/except/raise scaffolding), so the
# notes below describe only the statements that are visible.
1496 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1497 class BFD6TestCase(VppTestCase):
1498 """Bidirectional Forwarding Detection (BFD) (IPv6) """
1501 vpp_clock_offset = None
# setUpClass: enables BFD debug logging via the CLI, creates pg0 with IPv6
# configured, neighbors configured and NDP resolved, and creates one
# loopback interface (loopback0) with IPv6 configured and admin-up. The
# tearDownClass call that follows the loopback setup appears to belong to
# an error-handling path whose `except` line is not shown - TODO confirm.
1506 def setUpClass(cls):
1507 super(BFD6TestCase, cls).setUpClass()
1508 cls.vapi.cli("set log class bfd level debug")
1510 cls.create_pg_interfaces([0])
1511 cls.pg0.config_ip6()
1512 cls.pg0.configure_ipv6_neighbors()
1514 cls.pg0.resolve_ndp()
1515 cls.create_loopback_interfaces(1)
1516 cls.loopback0 = cls.lo_interfaces[0]
1517 cls.loopback0.config_ip6()
1518 cls.loopback0.admin_up()
1521 super(BFD6TestCase, cls).tearDownClass()
# tearDownClass: only delegates to the parent class.
1525 def tearDownClass(cls):
1526 super(BFD6TestCase, cls).tearDownClass()
# Per-test setup (the `def` line is not shown): creates a fresh
# AuthKeyFactory, subscribes to BFD events, enables capture on pg0, creates
# and admin-ups a VppBFDUDPSession towards pg0.remote_ip6 and a matching
# BFDTestSession for AF_INET6. On BaseException the event subscription is
# switched off again.
1529 super(BFD6TestCase, self).setUp()
1530 self.factory = AuthKeyFactory()
1531 self.vapi.want_bfd_events()
1532 self.pg0.enable_capture()
1534 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1535 self.pg0.remote_ip6,
1537 self.vpp_session.add_vpp_config()
1538 self.vpp_session.admin_up()
1539 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1540 self.logger.debug(self.vapi.cli("show adj nbr"))
1541 except BaseException:
1542 self.vapi.want_bfd_events(enable_disable=0)
# Per-test teardown (the `def` line is not shown): if VPP is still alive,
# unsubscribe from BFD events and drain the event queue, then delegate to
# the parent tearDown.
1546 if not self.vpp_dead:
1547 self.vapi.want_bfd_events(enable_disable=0)
1548 self.vapi.collect_events() # clear the event queue
1549 super(BFD6TestCase, self).tearDown()
def test_session_up(self):
    """ bring BFD session up

    The whole scenario lives in the shared bfd_session_up() helper; this
    test only invokes it against the session prepared by the fixture.
    """
    bfd_session_up(self)
# Test (IPv6): bring the session up when the first frame can only be matched
# by address pair.
# Visible flow: the test session picks a random my_discriminator and sends
# a packet; VPP's reply must echo it as your_discriminator and be in state
# init. The test session then adopts VPP's my_discriminator, an init event
# is verified, another packet is sent, an up event is verified, and after
# switching the test session to state up the VPP session must report
# BFDState.up.
# NOTE(review): the update() call that stores your_discriminator is cut off
# in this listing (its continuation line is not shown).
1555 def test_session_up_by_ip(self):
1556 """ bring BFD session up - first frame looked up by address pair """
1557 self.logger.info("BFD: Sending Slow control frame")
1558 self.test_session.update(my_discriminator=randint(0, 40000000))
1559 self.test_session.send_packet()
1560 self.pg0.enable_capture()
1561 p = self.pg0.wait_for_packet(1)
1562 self.assert_equal(p[BFD].your_discriminator,
1563 self.test_session.my_discriminator,
1564 "BFD - your discriminator")
1565 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1566 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
1568 self.logger.info("BFD: Waiting for event")
1569 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1570 verify_event(self, e, expected_state=BFDState.init)
1571 self.logger.info("BFD: Sending Up")
1572 self.test_session.send_packet()
1573 self.logger.info("BFD: Waiting for event")
1574 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1575 verify_event(self, e, expected_state=BFDState.up)
1576 self.logger.info("BFD: Session is Up")
1577 self.test_session.update(state=BFDState.up)
1578 self.test_session.send_packet()
1579 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_hold_up(self):
    """ hold BFD session up

    After the session is established, answers every BFD packet from VPP
    for twice the detect multiplier's worth of rounds and then checks that
    no BFD event was raised and that the VPP session still reports up.
    """
    bfd_session_up(self)

    rounds = self.test_session.detect_mult * 2
    for _ in range(rounds):
        # one packet in, one packet out
        wait_for_bfd_packet(self)
        self.test_session.send_packet()

    pending_events = self.vapi.collect_events()
    self.assert_equal(len(pending_events), 0, "number of bfd events")
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
# Test (IPv6): packets addressed to the BFD echo UDP port are looped back
# by VPP unchanged, without any BFD session configured.
# Visible flow: remove the VPP session, build an Ether/IPv6/UDP/Raw packet
# whose IPv6 source and destination are both pg0.remote_ip6 and whose UDP
# dport is BFD.udp_dport_echo, queue echo_packet_count (10) copies on pg0,
# then for each received packet check MAC addresses, IP addresses, UDP
# ports and that the raw UDP payload equals the one sent.
# NOTE(review): `ether`, `ip` and `udp` are read below but their assignments,
# as well as the udp_sport_tx / udp_sport_rx increments, are not among the
# lines shown in this listing.
# NOTE(review): the assert that compares pg0.remote_ip6 with ip.src uses the
# label "Destination IP"; "Source IP" looks like the intended label.
# NOTE(review): the final udp_sport_tx == udp_sport_rx assertion appears
# twice in a row; the second copy looks redundant.
1592 def test_echo_looped_back(self):
1593 """ echo packets looped back """
1594 # don't need a session in this case..
1595 self.vpp_session.remove_vpp_config()
1596 self.pg0.enable_capture()
1597 echo_packet_count = 10
1598 # random source port low enough to increment a few times..
1599 udp_sport_tx = randint(1, 50000)
1600 udp_sport_rx = udp_sport_tx
1601 echo_packet = (Ether(src=self.pg0.remote_mac,
1602 dst=self.pg0.local_mac) /
1603 IPv6(src=self.pg0.remote_ip6,
1604 dst=self.pg0.remote_ip6) /
1605 UDP(dport=BFD.udp_dport_echo) /
1606 Raw("this should be looped back"))
1607 for dummy in range(echo_packet_count):
1608 self.sleep(.01, "delay between echo packets")
1609 echo_packet[UDP].sport = udp_sport_tx
1611 self.logger.debug(ppp("Sending packet:", echo_packet))
1612 self.pg0.add_stream(echo_packet)
1614 for dummy in range(echo_packet_count):
1615 p = self.pg0.wait_for_packet(1)
1616 self.logger.debug(ppp("Got packet:", p))
1618 self.assert_equal(self.pg0.remote_mac,
1619 ether.dst, "Destination MAC")
1620 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1622 self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
1623 self.assert_equal(self.pg0.remote_ip6, ip.src, "Destination IP")
1625 self.assert_equal(udp.dport, BFD.udp_dport_echo,
1626 "UDP destination port")
1627 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1629 # need to compare the hex payload here, otherwise BFD_vpp_echo
1631 self.assertEqual(scapy.compat.raw(p[UDP].payload),
1632 scapy.compat.raw(echo_packet[UDP].payload),
1633 "Received packet is not the echo packet sent")
1634 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1635 "ECHO packet identifier for test purposes)")
1636 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1637 "ECHO packet identifier for test purposes)")
# Test (IPv6): BFD echo function.
# Phase 1 (no echo source configured): the test session advertises
# required_min_echo_rx=150000; during 10 rounds every BFD packet from VPP
# must still advertise vpp_session.required_min_rx.
# Phase 2 (after bfd_udp_set_echo_source on loopback0): packets captured on
# pg0 are classified by UDP destination port. Echo packets
# (BFD.udp_dport_echo) are checked - source IP must differ from
# loopback0.local_ip6, Ether dst must be pg0.remote_mac - and then sent
# back with Ether dst rewritten to pg0.local_mac. BFD control packets that
# carry the Poll bit are answered with a Final packet. Anything else raises.
# No BFD events may occur, and at least one echo packet must have been seen.
# NOTE(review): `echo_seen` is asserted at the end but never assigned in the
# lines shown; several statements (the assert_equal opening for the echo
# dst IP, the assertGreaterEqual argument list, the else: branch) are cut
# off in this listing.
1639 def test_echo(self):
1640 """ echo function """
1641 bfd_session_up(self)
1642 self.test_session.update(required_min_echo_rx=150000)
1643 self.test_session.send_packet()
1644 detection_time = self.test_session.detect_mult *\
1645 self.vpp_session.required_min_rx / USEC_IN_SEC
1646 # echo shouldn't work without echo source set
1647 for dummy in range(10):
1648 sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1649 self.sleep(sleep, "delay before sending bfd packet")
1650 self.test_session.send_packet()
1651 p = wait_for_bfd_packet(
1652 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1653 self.assert_equal(p[BFD].required_min_rx_interval,
1654 self.vpp_session.required_min_rx,
1655 "BFD required min rx interval")
1656 self.test_session.send_packet()
1657 self.vapi.bfd_udp_set_echo_source(
1658 sw_if_index=self.loopback0.sw_if_index)
1660 # should be turned on - loopback echo packets
1661 for dummy in range(3):
1662 loop_until = time.time() + 0.75 * detection_time
1663 while time.time() < loop_until:
1664 p = self.pg0.wait_for_packet(1)
1665 self.logger.debug(ppp("Got packet:", p))
1666 if p[UDP].dport == BFD.udp_dport_echo:
1668 p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
1669 self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
1670 "BFD ECHO src IP equal to loopback IP")
1671 self.logger.debug(ppp("Looping back packet:", p))
1672 self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1673 "ECHO packet destination MAC address")
1674 p[Ether].dst = self.pg0.local_mac
1675 self.pg0.add_stream(p)
1678 elif p.haslayer(BFD):
1680 self.assertGreaterEqual(
1681 p[BFD].required_min_rx_interval,
1683 if "P" in p.sprintf("%BFD.flags%"):
1684 final = self.test_session.create_packet()
1685 final[BFD].flags = "F"
1686 self.test_session.send_packet(final)
1688 raise Exception(ppp("Received unknown packet:", p))
1690 self.assert_equal(len(self.vapi.collect_events()), 0,
1691 "number of bfd events")
1692 self.test_session.send_packet()
1693 self.assertTrue(echo_seen, "No echo packets received")
# Test (IPv6): deleting an interface that has a BFD session on it.
# Visible flow: create a VppLoInterface, remember its sw_if_index, add and
# admin-up a VppBFDUDPSession towards intf.remote_ip6 with af=AF_INET6,
# remove the interface, then expect a "bfd_udp_session_details" event
# carrying that sw_if_index and require that query_vpp_config() for the
# session is falsy.
# NOTE(review): the embedded numbering skips 1698-1699, so any interface
# setup done between creation and use is not shown in this listing.
1695 def test_intf_deleted(self):
1696 """ interface with bfd session deleted """
1697 intf = VppLoInterface(self)
1700 sw_if_index = intf.sw_if_index
1701 vpp_session = VppBFDUDPSession(
1702 self, intf, intf.remote_ip6, af=AF_INET6)
1703 vpp_session.add_vpp_config()
1704 vpp_session.admin_up()
1705 intf.remove_vpp_config()
1706 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1707 self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1708 self.assertFalse(vpp_session.query_vpp_config())
# BFDFIBTestCase: class header, fixtures and packet filter for the BFD/FIB
# interaction test. The whole class is skipped unless running_extended_tests
# is truthy.
# NOTE(review): this listing omits several lines (decorators, the `def`
# lines of setUp/tearDown, parts of loop bodies, the return statements of
# the filter), so the notes below describe only the visible statements.
1711 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1712 class BFDFIBTestCase(VppTestCase):
1713 """ BFD-FIB interactions (IPv6) """
# setUpClass / tearDownClass: only delegate to the parent class.
1719 def setUpClass(cls):
1720 super(BFDFIBTestCase, cls).setUpClass()
1723 def tearDownClass(cls):
1724 super(BFDFIBTestCase, cls).tearDownClass()
# Per-test setup (the `def` line is not shown): creates one pg interface,
# subscribes to BFD events, enables capture on pg0 and configures IPv6
# neighbors on every pg interface.
1727 super(BFDFIBTestCase, self).setUp()
1728 self.create_pg_interfaces(range(1))
1730 self.vapi.want_bfd_events()
1731 self.pg0.enable_capture()
1733 for i in self.pg_interfaces:
1736 i.configure_ipv6_neighbors()
# Per-test teardown (the `def` line is not shown): unsubscribes from BFD
# events while VPP is alive, then delegates to the parent tearDown.
1739 if not self.vpp_dead:
1740 self.vapi.want_bfd_events(enable_disable=False)
1742 super(BFDFIBTestCase, self).tearDown()
# Capture filter: tests whether a packet has a BFD layer or is recognised by
# is_ipv6_misc(); it is passed as filter_out_fn to wait_for_packet by the
# test in this class. The return statements are not shown in this listing.
1745 def pkt_is_not_data_traffic(p):
1746 """ not data traffic implies BFD or the usual IPv6 ND/RA"""
1747 if p.haslayer(BFD) or is_ipv6_misc(p):
# Test: routes via a next hop that is tracked by a BFD session forward
# traffic only while the session is up.
# Visible flow: build two data packets (to 2001::1 and 2002::1), add routes
# 2001::/64 and 2002::/64 via pg0.remote_ip6, create and admin-up a BFD
# session to pg0.remote_ip6, then three phases:
#   1. bfd_session_up  -> injected traffic is captured again on pg0;
#   2. bfd_session_down -> waiting for data traffic must raise
#      CaptureTimeoutError;
#   3. bfd_session_up  -> traffic is captured again.
# NOTE(review): the in-code comment in front of bfd_session_down() says
# "session is up - traffic is dropped"; from the call that follows it, the
# session is being brought DOWN at that point.
# NOTE(review): several continuation lines (packet payloads, the second
# route's path arguments, wait_for_packet / assertEqual arguments) are not
# among the lines shown in this listing.
1751 def test_session_with_fib(self):
1752 """ BFD-FIB interactions """
1754 # packets to match against both of the routes
1755 p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1756 IPv6(src="3001::1", dst="2001::1") /
1757 UDP(sport=1234, dport=1234) /
1759 (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1760 IPv6(src="3001::1", dst="2002::1") /
1761 UDP(sport=1234, dport=1234) /
1764 # A recursive and a non-recursive route via a next-hop that
1765 # will have a BFD session
1766 ip_2001_s_64 = VppIpRoute(self, "2001::", 64,
1767 [VppRoutePath(self.pg0.remote_ip6,
1768 self.pg0.sw_if_index)])
1769 ip_2002_s_64 = VppIpRoute(self, "2002::", 64,
1770 [VppRoutePath(self.pg0.remote_ip6,
1772 ip_2001_s_64.add_vpp_config()
1773 ip_2002_s_64.add_vpp_config()
1775 # bring the session up now the routes are present
1776 self.vpp_session = VppBFDUDPSession(self,
1778 self.pg0.remote_ip6,
1780 self.vpp_session.add_vpp_config()
1781 self.vpp_session.admin_up()
1782 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1784 # session is up - traffic passes
1785 bfd_session_up(self)
1787 self.pg0.add_stream(p)
1790 captured = self.pg0.wait_for_packet(
1792 filter_out_fn=self.pkt_is_not_data_traffic)
1793 self.assertEqual(captured[IPv6].dst,
1796 # session is up - traffic is dropped
1797 bfd_session_down(self)
1799 self.pg0.add_stream(p)
1801 with self.assertRaises(CaptureTimeoutError):
1802 self.pg0.wait_for_packet(1, self.pkt_is_not_data_traffic)
1804 # session is up - traffic passes
1805 bfd_session_up(self)
1807 self.pg0.add_stream(p)
1810 captured = self.pg0.wait_for_packet(
1812 filter_out_fn=self.pkt_is_not_data_traffic)
1813 self.assertEqual(captured[IPv6].dst,
# BFDTunTestCase: class header, fixtures and packet filter for the
# BFD-over-GRE test. The whole class is skipped unless
# running_extended_tests is truthy.
# NOTE(review): this listing omits several lines (decorators, the `def`
# lines of setUp/tearDown, the body of the interface loop, the return
# statements of the filter), so the notes below describe only the visible
# statements.
1817 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1818 class BFDTunTestCase(VppTestCase):
1819 """ BFD over GRE tunnel """
# setUpClass / tearDownClass: only delegate to the parent class.
1825 def setUpClass(cls):
1826 super(BFDTunTestCase, cls).setUpClass()
1829 def tearDownClass(cls):
1830 super(BFDTunTestCase, cls).tearDownClass()
# Per-test setup (the `def` line is not shown): creates one pg interface,
# subscribes to BFD events and enables capture on pg0.
1833 super(BFDTunTestCase, self).setUp()
1834 self.create_pg_interfaces(range(1))
1836 self.vapi.want_bfd_events()
1837 self.pg0.enable_capture()
1839 for i in self.pg_interfaces:
# Per-test teardown (the `def` line is not shown): unsubscribes from BFD
# events while VPP is alive, then delegates to the parent tearDown.
1845 if not self.vpp_dead:
1846 self.vapi.want_bfd_events(enable_disable=0)
1848 super(BFDTunTestCase, self).tearDown()
# Capture filter: tests whether a packet has a BFD layer or is recognised by
# is_ipv6_misc(). The return statements are not shown in this listing.
1851 def pkt_is_not_data_traffic(p):
1852 """ not data traffic implies BFD or the usual IPv6 ND/RA"""
1853 if p.haslayer(BFD) or is_ipv6_misc(p):
# Test: run a BFD session over a GRE tunnel interface.
# Visible flow: create a VppGreInterface (one endpoint is pg0.remote_ip4),
# create and admin-up a VppBFDUDPSession, create a BFDTestSession bound to
# the GRE interface whose packets are wrapped in an outer IP header from
# pg0.remote_ip4 to pg0.local_ip4 and sent through pg0 (phy_interface),
# bring the session up, check that a data packet addressed to
# gre_if.remote_ip4 sent into pg0 is seen again on pg0, then bring the
# session down.
# NOTE(review): this method has no docstring in the visible lines, and
# several argument/continuation lines (GRE interface arguments, BFD session
# arguments, the rest of tunnel_header, the packet payload) are not among
# the lines shown in this listing.
1857 def test_bfd_o_gre(self):
1860 # A GRE interface over which to run a BFD session
1861 gre_if = VppGreInterface(self,
1863 self.pg0.remote_ip4)
1864 gre_if.add_vpp_config()
1868 # bring the session up now the routes are present
1869 self.vpp_session = VppBFDUDPSession(self,
1873 self.vpp_session.add_vpp_config()
1874 self.vpp_session.admin_up()
1876 self.test_session = BFDTestSession(
1877 self, gre_if, AF_INET,
1878 tunnel_header=(IP(src=self.pg0.remote_ip4,
1879 dst=self.pg0.local_ip4) /
1881 phy_interface=self.pg0)
1883 # packets to match against both of the routes
1884 p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1885 IP(src=self.pg0.remote_ip4, dst=gre_if.remote_ip4) /
1886 UDP(sport=1234, dport=1234) /
1889 # session is up - traffic passes
1890 bfd_session_up(self)
1892 self.send_and_expect(self.pg0, p, self.pg0)
1894 # bring session down
1895 bfd_session_down(self)
# BFDSHA1TestCase: class header and fixtures for the SHA1-authenticated BFD
# tests. The whole class is skipped unless running_extended_tests is truthy.
# NOTE(review): this listing omits several lines of the fixtures (decorators,
# the `def` lines of setUp/tearDown, try/except/raise scaffolding), so the
# notes below describe only the statements that are visible.
1898 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1899 class BFDSHA1TestCase(VppTestCase):
1900 """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1903 vpp_clock_offset = None
# setUpClass: enables BFD debug logging via the CLI and creates pg0 with
# IPv4 configured and ARP resolved. The tearDownClass call that follows
# appears to belong to an error-handling path whose `except` line is not
# shown - TODO confirm.
1908 def setUpClass(cls):
1909 super(BFDSHA1TestCase, cls).setUpClass()
1910 cls.vapi.cli("set log class bfd level debug")
1912 cls.create_pg_interfaces([0])
1913 cls.pg0.config_ip4()
1915 cls.pg0.resolve_arp()
1918 super(BFDSHA1TestCase, cls).tearDownClass()
# tearDownClass: only delegates to the parent class.
1922 def tearDownClass(cls):
1923 super(BFDSHA1TestCase, cls).tearDownClass()
# Per-test setup (the `def` line is not shown): creates a fresh
# AuthKeyFactory, subscribes to BFD events and enables capture on pg0.
# Sessions are created by the individual tests, each with its own key.
1926 super(BFDSHA1TestCase, self).setUp()
1927 self.factory = AuthKeyFactory()
1928 self.vapi.want_bfd_events()
1929 self.pg0.enable_capture()
# Per-test teardown (the `def` line is not shown): if VPP is still alive,
# unsubscribe from BFD events and drain the event queue, then delegate to
# the parent tearDown.
1932 if not self.vpp_dead:
1933 self.vapi.want_bfd_events(enable_disable=False)
1934 self.vapi.collect_events() # clear the event queue
1935 super(BFDSHA1TestCase, self).tearDown()
# Basic authenticated bring-up: a random key from the factory is added to
# VPP, a VppBFDUDPSession towards pg0's remote IPv4 address is added and set
# admin-up, and a BFDTestSession is created with the same key and the VPP
# session's bfd_key_id before bfd_session_up() is called.
# NOTE(review): the last argument line of the VppBFDUDPSession call is not
# visible in this view.
1937 def test_session_up(self):
1938 """ bring BFD session up """
1939 key = self.factory.create_random_key(self)
1940 key.add_vpp_config()
1941 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1942 self.pg0.remote_ip4,
1944 self.vpp_session.add_vpp_config()
1945 self.vpp_session.admin_up()
1946 self.test_session = BFDTestSession(
1947 self, self.pg0, AF_INET, sha1_key=key,
1948 bfd_key_id=self.vpp_session.bfd_key_id)
1949 bfd_session_up(self)
# Extended test: after bringing an authenticated session up, performs
# detect_mult * 2 rounds of "wait for a BFD packet from VPP, send one back"
# and finally asserts that the VPP session state is still Up.
# NOTE(review): the last argument line of the VppBFDUDPSession call is not
# visible in this view.
1951 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1952 def test_hold_up(self):
1953 """ hold BFD session up """
1954 key = self.factory.create_random_key(self)
1955 key.add_vpp_config()
1956 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1957 self.pg0.remote_ip4,
1959 self.vpp_session.add_vpp_config()
1960 self.vpp_session.admin_up()
1961 self.test_session = BFDTestSession(
1962 self, self.pg0, AF_INET, sha1_key=key,
1963 bfd_key_id=self.vpp_session.bfd_key_id)
1964 bfd_session_up(self)
1965 for dummy in range(self.test_session.detect_mult * 2):
1966 wait_for_bfd_packet(self)
1967 self.test_session.send_packet()
1968 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_hold_up_meticulous(self):
    """ hold BFD session up - meticulous auth """
    auth_key = self.factory.create_random_key(
        self, BFDAuthType.meticulous_keyed_sha1)
    auth_key.add_vpp_config()

    # VPP side of the session, authenticated with the meticulous key
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
    self.vpp_session.add_vpp_config()
    self.vpp_session.admin_up()

    # start a few steps below the 32-bit maximum so that the sequence
    # number wraps around while the loop below is running
    first_seq = 0xFFFFFFFF - 4
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=auth_key,
        bfd_key_id=self.vpp_session.bfd_key_id,
        our_seq_number=first_seq)
    bfd_session_up(self)

    # every reply carries a freshly incremented sequence number
    for _ in range(30):
        wait_for_bfd_packet(self)
        self.test_session.inc_seq_num()
        self.test_session.send_packet()

    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_send_bad_seq_number(self):
    """ session is not kept alive by msgs with bad sequence numbers"""
    auth_key = self.factory.create_random_key(
        self, BFDAuthType.meticulous_keyed_sha1)
    auth_key.add_vpp_config()
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=auth_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)

    detection_time = self.test_session.detect_mult *\
        self.vpp_session.required_min_rx / USEC_IN_SEC
    deadline = time.time() + 2 * detection_time

    # keep transmitting for two detection times; inc_seq_num() is never
    # called between these transmissions
    while True:
        if not time.time() < deadline:
            break
        self.test_session.send_packet()
        pause = 0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC
        self.sleep(pause, "time between bfd packets")

    # exactly one event is expected, reporting the session as down
    bfd_events = self.vapi.collect_events()
    self.assert_equal(len(bfd_events), 1, "number of bfd events")
    verify_event(self, bfd_events[0], expected_state=BFDState.down)
# Shared driver for the rogue-peer tests.  It stores and configures the
# given VPP session, brings it up against the legitimate test session,
# copies the legitimate session's BFD fields into the rogue session, applies
# the optional overrides from rogue_bfd_values, forces the rogue state to
# Down, sends one rogue packet, waits for a BFD packet and asserts that the
# VPP session state is still Up.
# NOTE(review): the body uses `rogue_test_session`; its parameter line in
# the signature and the closing line of the docstring are not visible in
# this view.
2018 def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
2019 legitimate_test_session,
2021 rogue_bfd_values=None):
2022 """ execute a rogue session interaction scenario
2024 1. create vpp session, add config
2025 2. bring the legitimate session up
2026 3. copy the bfd values from legitimate session to rogue session
2027 4. apply rogue_bfd_values to rogue session
2028 5. set rogue session state to down
2029 6. send message to take the session down from the rogue session
2030 7. assert that the legitimate session is unaffected
2033 self.vpp_session = vpp_bfd_udp_session
2034 self.vpp_session.add_vpp_config()
2035 self.test_session = legitimate_test_session
2036 # bring vpp session up
2037 bfd_session_up(self)
2038 # send packet from rogue session
2039 rogue_test_session.update(
2040 my_discriminator=self.test_session.my_discriminator,
2041 your_discriminator=self.test_session.your_discriminator,
2042 desired_min_tx=self.test_session.desired_min_tx,
2043 required_min_rx=self.test_session.required_min_rx,
2044 detect_mult=self.test_session.detect_mult,
2045 diag=self.test_session.diag,
2046 state=self.test_session.state,
2047 auth_type=self.test_session.auth_type)
2048 if rogue_bfd_values:
2049 rogue_test_session.update(**rogue_bfd_values)
2050 rogue_test_session.update(state=BFDState.down)
2051 rogue_test_session.send_packet()
2052 wait_for_bfd_packet(self)
2053 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
# Rogue scenario in which the rogue BFDTestSession is created without any
# sha1_key / bfd_key_id, while the VPP session and the legitimate test
# session share a configured key.
# NOTE(review): the final argument line of the
# execute_rogue_session_scenario() call is not visible in this view.
2055 @unittest.skipUnless(running_extended_tests, "part of extended tests")
2056 def test_mismatch_auth(self):
2057 """ session is not brought down by unauthenticated msg """
2058 key = self.factory.create_random_key(self)
2059 key.add_vpp_config()
2060 vpp_session = VppBFDUDPSession(
2061 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2062 legitimate_test_session = BFDTestSession(
2063 self, self.pg0, AF_INET, sha1_key=key,
2064 bfd_key_id=vpp_session.bfd_key_id)
2065 rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
2066 self.execute_rogue_session_scenario(vpp_session,
2067 legitimate_test_session,
# Rogue scenario in which the rogue session uses the same SHA1 key but a
# bfd_key_id `x` that the `while` loop keeps re-picking for as long as it
# equals the VPP session's bfd_key_id.
# NOTE(review): the lines that assign `x` (before and inside the loop) and
# the final argument line of the execute_rogue_session_scenario() call are
# not visible in this view.
2070 @unittest.skipUnless(running_extended_tests, "part of extended tests")
2071 def test_mismatch_bfd_key_id(self):
2072 """ session is not brought down by msg with non-existent key-id """
2073 key = self.factory.create_random_key(self)
2074 key.add_vpp_config()
2075 vpp_session = VppBFDUDPSession(
2076 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2077 # pick a different random bfd key id
2079 while x == vpp_session.bfd_key_id:
2081 legitimate_test_session = BFDTestSession(
2082 self, self.pg0, AF_INET, sha1_key=key,
2083 bfd_key_id=vpp_session.bfd_key_id)
2084 rogue_test_session = BFDTestSession(
2085 self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
2086 self.execute_rogue_session_scenario(vpp_session,
2087 legitimate_test_session,
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_mismatched_auth_type(self):
    """ session is not brought down by msg with wrong auth type """
    auth_key = self.factory.create_random_key(self)
    auth_key.add_vpp_config()
    vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)

    def make_peer():
        # both peers are created with identical key material
        return BFDTestSession(
            self, self.pg0, AF_INET,
            sha1_key=auth_key,
            bfd_key_id=vpp_session.bfd_key_id)

    legitimate_peer = make_peer()
    rogue_peer = make_peer()

    # the rogue differs from the legitimate peer only in the auth type
    # field that the scenario overrides before sending
    rogue_overrides = {'auth_type': BFDAuthType.keyed_md5}
    self.execute_rogue_session_scenario(
        vpp_session, legitimate_peer, rogue_peer, rogue_overrides)
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_restart(self):
    """ simulate remote peer restart and resynchronization """
    auth_key = self.factory.create_random_key(
        self, BFDAuthType.meticulous_keyed_sha1)
    auth_key.add_vpp_config()
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=auth_key,
        bfd_key_id=self.vpp_session.bfd_key_id,
        our_seq_number=0)
    bfd_session_up(self)

    # stay silent for twice the detection time
    detection_time = self.test_session.detect_mult *\
        self.vpp_session.required_min_rx / USEC_IN_SEC
    self.sleep(2 * detection_time, "simulating peer restart")

    # exactly one event, reporting the session down, is expected
    bfd_events = self.vapi.collect_events()
    self.assert_equal(len(bfd_events), 1, "number of bfd events")
    verify_event(self, bfd_events[0], expected_state=BFDState.down)

    # put the test peer back into its initial condition: state down and
    # sequence tracking reset
    self.test_session.update(state=BFDState.down)
    self.test_session.our_seq_number = 0
    self.test_session.vpp_seq_number = None

    # re-arm the capture so that stale packets are discarded, then
    # bring the session up a second time
    self.pg0.enable_capture()
    bfd_session_up(self)
2136 @unittest.skipUnless(running_extended_tests, "part of extended tests")
2137 class BFDAuthOnOffTestCase(VppTestCase):
2138 """Bidirectional Forwarding Detection (BFD) (changing auth) """
# Class-level fixture for BFDAuthOnOffTestCase: after the base-class setup
# it switches the "bfd" log class to debug level, creates the single pg0
# interface, assigns it an IPv4 address and resolves ARP.
# NOTE(review): the trailing super().tearDownClass() call is presumably the
# cleanup branch of an error handler whose try/except lines are not visible
# in this view - TODO confirm.
2145 def setUpClass(cls):
2146 super(BFDAuthOnOffTestCase, cls).setUpClass()
2147 cls.vapi.cli("set log class bfd level debug")
2149 cls.create_pg_interfaces([0])
2150 cls.pg0.config_ip4()
2152 cls.pg0.resolve_arp()
2155 super(BFDAuthOnOffTestCase, cls).tearDownClass()
# Class-level teardown: adds nothing of its own, just delegates to the base
# class implementation.
2159 def tearDownClass(cls):
2160 super(BFDAuthOnOffTestCase, cls).tearDownClass()
2163 super(BFDAuthOnOffTestCase, self).setUp()
2164 self.factory = AuthKeyFactory()
2165 self.vapi.want_bfd_events()
2166 self.pg0.enable_capture()
2169 if not self.vpp_dead:
2170 self.vapi.want_bfd_events(enable_disable=False)
2171 self.vapi.collect_events() # clear the event queue
2172 super(BFDAuthOnOffTestCase, self).tearDown()
def test_auth_on_immediate(self):
    """ turn auth on without disturbing session state (immediate) """
    auth_key = self.factory.create_random_key(self)
    auth_key.add_vpp_config()
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(self, self.pg0, AF_INET)
    bfd_session_up(self)

    def exchange_while_up():
        # one round: receive a packet from VPP, check that it reports
        # Up, answer with a packet of our own
        for _ in range(self.test_session.detect_mult * 2):
            rx = wait_for_bfd_packet(self)
            self.assert_equal(rx[BFD].state, BFDState.up, BFDState)
            self.test_session.send_packet()

    # phase 1: unauthenticated traffic
    exchange_while_up()

    # phase 2: enable auth on VPP and mirror it on the test peer
    self.vpp_session.activate_auth(auth_key)
    self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
    self.test_session.sha1_key = auth_key
    exchange_while_up()

    # still up, and no state-change event was generated on the way
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    pending = self.vapi.collect_events()
    self.assert_equal(len(pending), 0, "number of bfd events")
def test_auth_off_immediate(self):
    """ turn auth off without disturbing session state (immediate) """
    auth_key = self.factory.create_random_key(self)
    auth_key.add_vpp_config()
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=auth_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)

    def exchange_while_up():
        # one round: receive a packet from VPP, check that it reports
        # Up, bump our sequence number and answer
        for _ in range(self.test_session.detect_mult * 2):
            rx = wait_for_bfd_packet(self)
            self.assert_equal(rx[BFD].state, BFDState.up, BFDState)
            self.test_session.inc_seq_num()
            self.test_session.send_packet()

    # phase 1: authenticated traffic
    exchange_while_up()

    # phase 2: drop auth on VPP and on the test peer
    self.vpp_session.deactivate_auth()
    self.test_session.bfd_key_id = None
    self.test_session.sha1_key = None
    exchange_while_up()

    # still up, and no state-change event was generated on the way
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    pending = self.vapi.collect_events()
    self.assert_equal(len(pending), 0, "number of bfd events")
def test_auth_change_key_immediate(self):
    """ change auth key without disturbing session state (immediate) """
    old_key = self.factory.create_random_key(self)
    old_key.add_vpp_config()
    new_key = self.factory.create_random_key(self)
    new_key.add_vpp_config()

    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=old_key)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=old_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)

    # traffic authenticated with the first key
    rounds_left = self.test_session.detect_mult * 2
    while rounds_left > 0:
        rx = wait_for_bfd_packet(self)
        self.assert_equal(rx[BFD].state, BFDState.up, BFDState)
        self.test_session.send_packet()
        rounds_left -= 1

    # switch VPP to the second key and follow suit on the test peer
    self.vpp_session.activate_auth(new_key)
    self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
    self.test_session.sha1_key = new_key

    # traffic authenticated with the second key
    rounds_left = self.test_session.detect_mult * 2
    while rounds_left > 0:
        rx = wait_for_bfd_packet(self)
        self.assert_equal(rx[BFD].state, BFDState.up, BFDState)
        self.test_session.send_packet()
        rounds_left -= 1

    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    pending = self.vapi.collect_events()
    self.assert_equal(len(pending), 0, "number of bfd events")
def test_auth_on_delayed(self):
    """ turn auth on without disturbing session state (delayed) """
    auth_key = self.factory.create_random_key(self)
    auth_key.add_vpp_config()
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(self, self.pg0, AF_INET)
    bfd_session_up(self)

    def exchange_while_up():
        # receive from VPP, require state Up, answer
        for _ in range(self.test_session.detect_mult * 2):
            rx = wait_for_bfd_packet(self)
            self.assert_equal(rx[BFD].state, BFDState.up, BFDState)
            self.test_session.send_packet()

    # warm-up: plain exchange, received state is not inspected here
    for _ in range(self.test_session.detect_mult * 2):
        wait_for_bfd_packet(self)
        self.test_session.send_packet()

    # request the delayed switch on VPP; the test peer keeps sending
    # unauthenticated packets for now
    self.vpp_session.activate_auth(auth_key, delayed=True)
    exchange_while_up()

    # the test peer turns auth on and immediately sends one packet
    self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
    self.test_session.sha1_key = auth_key
    self.test_session.send_packet()
    exchange_while_up()

    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    pending = self.vapi.collect_events()
    self.assert_equal(len(pending), 0, "number of bfd events")
def test_auth_off_delayed(self):
    """ turn auth off without disturbing session state (delayed) """
    auth_key = self.factory.create_random_key(self)
    auth_key.add_vpp_config()
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=auth_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)

    def exchange_while_up():
        # receive from VPP, require state Up, answer
        for _ in range(self.test_session.detect_mult * 2):
            rx = wait_for_bfd_packet(self)
            self.assert_equal(rx[BFD].state, BFDState.up, BFDState)
            self.test_session.send_packet()

    # authenticated traffic
    exchange_while_up()

    # request the delayed switch-off on VPP; the test peer keeps
    # authenticating for now
    self.vpp_session.deactivate_auth(delayed=True)
    exchange_while_up()

    # the test peer drops auth and immediately sends one packet
    self.test_session.bfd_key_id = None
    self.test_session.sha1_key = None
    self.test_session.send_packet()
    exchange_while_up()

    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    pending = self.vapi.collect_events()
    self.assert_equal(len(pending), 0, "number of bfd events")
def test_auth_change_key_delayed(self):
    """ change auth key without disturbing session state (delayed) """
    old_key = self.factory.create_random_key(self)
    old_key.add_vpp_config()
    new_key = self.factory.create_random_key(self)
    new_key.add_vpp_config()

    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=old_key)
    self.vpp_session.add_vpp_config()
    self.vpp_session.admin_up()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=old_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)

    def exchange_while_up():
        # receive from VPP, require state Up, answer
        for _ in range(self.test_session.detect_mult * 2):
            rx = wait_for_bfd_packet(self)
            self.assert_equal(rx[BFD].state, BFDState.up, BFDState)
            self.test_session.send_packet()

    # traffic authenticated with the first key
    exchange_while_up()

    # request the delayed key change on VPP; the test peer keeps using
    # the first key for now
    self.vpp_session.activate_auth(new_key, delayed=True)
    exchange_while_up()

    # the test peer switches to the second key and immediately sends
    # one packet
    self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
    self.test_session.sha1_key = new_key
    self.test_session.send_packet()
    exchange_while_up()

    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    pending = self.vapi.collect_events()
    self.assert_equal(len(pending), 0, "number of bfd events")
2349 @unittest.skipUnless(running_extended_tests, "part of extended tests")
2350 class BFDCLITestCase(VppTestCase):
2351 """Bidirectional Forwarding Detection (BFD) (CLI) """
# Class-level fixture for BFDCLITestCase: after the base-class setup it
# switches the "bfd" log class to debug level, creates the single pg0
# interface, configures both IPv4 and IPv6 on it and resolves ARP and NDP.
# NOTE(review): the trailing super().tearDownClass() call is presumably the
# cleanup branch of an error handler whose try/except lines are not visible
# in this view - TODO confirm.
2355 def setUpClass(cls):
2356 super(BFDCLITestCase, cls).setUpClass()
2357 cls.vapi.cli("set log class bfd level debug")
2359 cls.create_pg_interfaces((0,))
2360 cls.pg0.config_ip4()
2361 cls.pg0.config_ip6()
2362 cls.pg0.resolve_arp()
2363 cls.pg0.resolve_ndp()
2366 super(BFDCLITestCase, cls).tearDownClass()
# Class-level teardown: adds nothing of its own, just delegates to the base
# class implementation.
2370 def tearDownClass(cls):
2371 super(BFDCLITestCase, cls).tearDownClass()
2374 super(BFDCLITestCase, self).setUp()
2375 self.factory = AuthKeyFactory()
2376 self.pg0.enable_capture()
2380 self.vapi.want_bfd_events(enable_disable=False)
2381 except UnexpectedApiReturnValueError:
2382 # some tests aren't subscribed, so this is not an issue
2384 self.vapi.collect_events() # clear the event queue
2385 super(BFDCLITestCase, self).tearDown()
# Helper: runs `cli` through self.vapi.cli() and compares the returned text
# via assert_equal, labelled "CLI command response".
# NOTE(review): the expected-value argument line is not visible in this
# view; per the docstring it is presumably the empty string.
2387 def cli_verify_no_response(self, cli):
2388 """ execute a CLI, asserting that the response is empty """
2389 self.assert_equal(self.vapi.cli(cli),
2391 "CLI command response")
# Helper: runs `cli` and compares the stripped reply via assert_equal.  When
# the command raises CliFailedCommandError, the text of that exception is
# used as the reply, so tests can assert on expected failure messages too.
# NOTE(review): the `try:` line and the expected-value argument line are not
# visible in this view.
2393 def cli_verify_response(self, cli, expected):
2394 """ execute a CLI, asserting that the response matches expectation """
2396 reply = self.vapi.cli(cli)
2397 except CliFailedCommandError as cli_error:
2398 reply = str(cli_error)
2399 self.assert_equal(reply.strip(),
2401 "CLI command response")
# Smoke test for the "show bfd ..." commands: builds two keys (default type
# and meticulous keyed SHA1) and two sessions (IPv4 and IPv6), then logs the
# output of "show bfd keys", "show bfd sessions" and "show bfd".  No
# assertion on the output is visible here.
# NOTE(review): the lines between the object constructions (and the tail of
# the s2 constructor call) are not visible in this view.
2403 def test_show(self):
2404 """ show commands """
2405 k1 = self.factory.create_random_key(self)
2407 k2 = self.factory.create_random_key(
2408 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2410 s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2412 s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2415 self.logger.info(self.vapi.ppcli("show bfd keys"))
2416 self.logger.info(self.vapi.ppcli("show bfd sessions"))
2417 self.logger.info(self.vapi.ppcli("show bfd"))
# Key lifecycle through the CLI (keyed SHA1, IPv4 session):
#   - the key is registered with the object registry and set with
#     "bfd key set"; its secret is passed as a hex string built from k.key
#   - a session using the key is brought up and down
#   - a second "bfd key set" with another secret must fail with
#     "BFD object in use"
#   - the session is brought up and down once more
#   - the session is removed, the key is deleted with "bfd key del" and
#     must no longer be found in VPP
# NOTE(review): the first line of each "%"-argument tuple is not visible in
# this view.
2419 def test_set_del_sha1_key(self):
2420 """ set/delete SHA1 auth key """
2421 k = self.factory.create_random_key(self)
2422 self.registry.register(k, self.logger)
2423 self.cli_verify_no_response(
2424 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2426 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2427 self.assertTrue(k.query_vpp_config())
2428 self.vpp_session = VppBFDUDPSession(
2429 self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
2430 self.vpp_session.add_vpp_config()
2431 self.test_session = \
2432 BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
2433 bfd_key_id=self.vpp_session.bfd_key_id)
2434 self.vapi.want_bfd_events()
2435 bfd_session_up(self)
2436 bfd_session_down(self)
2437 # try to replace the secret for the key - should fail because the key
2439 k2 = self.factory.create_random_key(self)
2440 self.cli_verify_response(
2441 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2443 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2444 "bfd key set: `bfd_auth_set_key' API call failed, "
2445 "rv=-103:BFD object in use")
2446 # manipulating the session using old secret should still work
2447 bfd_session_up(self)
2448 bfd_session_down(self)
2449 self.vpp_session.remove_vpp_config()
2450 self.cli_verify_no_response(
2451 "bfd key del conf-key-id %s" % k.conf_key_id)
2452 self.assertFalse(k.query_vpp_config())
# Key lifecycle through the CLI for a meticulous keyed SHA1 key, using an
# IPv6 session that is explicitly set admin-up: set the key, bring the
# session up and down, check that replacing the secret fails with
# "BFD object in use", bring the session up and down again, then remove the
# session, delete the key and check it is gone.
# NOTE(review): the replacement attempt issues "type keyed-sha1" although
# the key was created as meticulous-keyed-sha1; the expected error text only
# mentions the object being in use - confirm this is intentional.
# NOTE(review): the first line of each "%"-argument tuple and the tail of
# the VppBFDUDPSession call are not visible in this view.
2454 def test_set_del_meticulous_sha1_key(self):
2455 """ set/delete meticulous SHA1 auth key """
2456 k = self.factory.create_random_key(
2457 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2458 self.registry.register(k, self.logger)
2459 self.cli_verify_no_response(
2460 "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
2462 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2463 self.assertTrue(k.query_vpp_config())
2464 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2465 self.pg0.remote_ip6, af=AF_INET6,
2467 self.vpp_session.add_vpp_config()
2468 self.vpp_session.admin_up()
2469 self.test_session = \
2470 BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
2471 bfd_key_id=self.vpp_session.bfd_key_id)
2472 self.vapi.want_bfd_events()
2473 bfd_session_up(self)
2474 bfd_session_down(self)
2475 # try to replace the secret for the key - should fail because the key
2477 k2 = self.factory.create_random_key(self)
2478 self.cli_verify_response(
2479 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2481 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2482 "bfd key set: `bfd_auth_set_key' API call failed, "
2483 "rv=-103:BFD object in use")
2484 # manipulating the session using old secret should still work
2485 bfd_session_up(self)
2486 bfd_session_down(self)
2487 self.vpp_session.remove_vpp_config()
2488 self.cli_verify_no_response(
2489 "bfd key del conf-key-id %s" % k.conf_key_id)
2490 self.assertFalse(k.query_vpp_config())
# CLI round trip for an unauthenticated IPv4 session.  The VppBFDUDPSession
# objects here are never added via the API; they only supply the parameter
# values and the reference configuration that verify_bfd_session_config()
# checks against.  Sequence: add (must be silent), duplicate add (must
# report "Duplicate BFD object"), verify, mod with required_min_rx x2 /
# desired_min_tx x3 / detect_mult x4, verify, del, second del (must report
# "No such BFD object"), and finally the session must not be found in VPP.
# NOTE(review): the first argument line of the duplicate-add
# cli_verify_response() call is not visible in this view.
2492 def test_add_mod_del_bfd_udp(self):
2493 """ create/modify/delete IPv4 BFD UDP session """
2494 vpp_session = VppBFDUDPSession(
2495 self, self.pg0, self.pg0.remote_ip4)
2496 self.registry.register(vpp_session, self.logger)
2497 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2498 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2499 "detect-mult %s" % (self.pg0.name, self.pg0.local_ip4,
2500 self.pg0.remote_ip4,
2501 vpp_session.desired_min_tx,
2502 vpp_session.required_min_rx,
2503 vpp_session.detect_mult)
2504 self.cli_verify_no_response(cli_add_cmd)
2505 # 2nd add should fail
2506 self.cli_verify_response(
2508 "bfd udp session add: `bfd_add_add_session' API call"
2509 " failed, rv=-101:Duplicate BFD object")
2510 verify_bfd_session_config(self, vpp_session)
2511 mod_session = VppBFDUDPSession(
2512 self, self.pg0, self.pg0.remote_ip4,
2513 required_min_rx=2 * vpp_session.required_min_rx,
2514 desired_min_tx=3 * vpp_session.desired_min_tx,
2515 detect_mult=4 * vpp_session.detect_mult)
2516 self.cli_verify_no_response(
2517 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2518 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2519 (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2520 mod_session.desired_min_tx, mod_session.required_min_rx,
2521 mod_session.detect_mult))
2522 verify_bfd_session_config(self, mod_session)
2523 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2524 "peer-addr %s" % (self.pg0.name,
2525 self.pg0.local_ip4, self.pg0.remote_ip4)
2526 self.cli_verify_no_response(cli_del_cmd)
2527 # 2nd del is expected to fail
2528 self.cli_verify_response(
2529 cli_del_cmd, "bfd udp session del: `bfd_udp_del_session' API call"
2530 " failed, rv=-102:No such BFD object")
2531 self.assertFalse(vpp_session.query_vpp_config())
# CLI round trip for an unauthenticated IPv6 session: add, duplicate add
# (must report "Duplicate BFD object"), verify, mod with scaled timers and
# detect_mult, verify, del, second del (must report "No such BFD object"),
# and finally the session must not be found in VPP.  The VppBFDUDPSession
# objects only supply parameter values and the reference configuration.
# NOTE(review): the first argument line of both cli_verify_response() calls
# is not visible in this view.
2533 def test_add_mod_del_bfd_udp6(self):
2534 """ create/modify/delete IPv6 BFD UDP session """
2535 vpp_session = VppBFDUDPSession(
2536 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
2537 self.registry.register(vpp_session, self.logger)
2538 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2539 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2540 "detect-mult %s" % (self.pg0.name, self.pg0.local_ip6,
2541 self.pg0.remote_ip6,
2542 vpp_session.desired_min_tx,
2543 vpp_session.required_min_rx,
2544 vpp_session.detect_mult)
2545 self.cli_verify_no_response(cli_add_cmd)
2546 # 2nd add should fail
2547 self.cli_verify_response(
2549 "bfd udp session add: `bfd_add_add_session' API call"
2550 " failed, rv=-101:Duplicate BFD object")
2551 verify_bfd_session_config(self, vpp_session)
2552 mod_session = VppBFDUDPSession(
2553 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2554 required_min_rx=2 * vpp_session.required_min_rx,
2555 desired_min_tx=3 * vpp_session.desired_min_tx,
2556 detect_mult=4 * vpp_session.detect_mult)
2557 self.cli_verify_no_response(
2558 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2559 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2560 (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2561 mod_session.desired_min_tx,
2562 mod_session.required_min_rx, mod_session.detect_mult))
2563 verify_bfd_session_config(self, mod_session)
2564 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2565 "peer-addr %s" % (self.pg0.name,
2566 self.pg0.local_ip6, self.pg0.remote_ip6)
2567 self.cli_verify_no_response(cli_del_cmd)
2568 # 2nd del is expected to fail
2569 self.cli_verify_response(
2571 "bfd udp session del: `bfd_udp_del_session' API call"
2572 " failed, rv=-102:No such BFD object")
2573 self.assertFalse(vpp_session.query_vpp_config())
# CLI round trip for an authenticated IPv4 session.  The key is added
# through the API first; the CLI "add" command then references it via
# conf-key-id / bfd-key-id.  The "mod" command carries no key arguments, and
# the reference mod_session reuses the same key and bfd_key_id.  Sequence:
# add, duplicate add (must report "Duplicate BFD object"), verify, mod,
# verify, del, second del (must report "No such BFD object"), and finally
# the session must not be found in VPP.
# NOTE(review): the first argument line of both cli_verify_response() calls
# is not visible in this view.
2575 def test_add_mod_del_bfd_udp_auth(self):
2576 """ create/modify/delete IPv4 BFD UDP session (authenticated) """
2577 key = self.factory.create_random_key(self)
2578 key.add_vpp_config()
2579 vpp_session = VppBFDUDPSession(
2580 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2581 self.registry.register(vpp_session, self.logger)
2582 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2583 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2584 "detect-mult %s conf-key-id %s bfd-key-id %s"\
2585 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2586 vpp_session.desired_min_tx, vpp_session.required_min_rx,
2587 vpp_session.detect_mult, key.conf_key_id,
2588 vpp_session.bfd_key_id)
2589 self.cli_verify_no_response(cli_add_cmd)
2590 # 2nd add should fail
2591 self.cli_verify_response(
2593 "bfd udp session add: `bfd_add_add_session' API call"
2594 " failed, rv=-101:Duplicate BFD object")
2595 verify_bfd_session_config(self, vpp_session)
2596 mod_session = VppBFDUDPSession(
2597 self, self.pg0, self.pg0.remote_ip4, sha1_key=key,
2598 bfd_key_id=vpp_session.bfd_key_id,
2599 required_min_rx=2 * vpp_session.required_min_rx,
2600 desired_min_tx=3 * vpp_session.desired_min_tx,
2601 detect_mult=4 * vpp_session.detect_mult)
2602 self.cli_verify_no_response(
2603 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2604 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2605 (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2606 mod_session.desired_min_tx,
2607 mod_session.required_min_rx, mod_session.detect_mult))
2608 verify_bfd_session_config(self, mod_session)
2609 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2610 "peer-addr %s" % (self.pg0.name,
2611 self.pg0.local_ip4, self.pg0.remote_ip4)
2612 self.cli_verify_no_response(cli_del_cmd)
2613 # 2nd del is expected to fail
2614 self.cli_verify_response(
2616 "bfd udp session del: `bfd_udp_del_session' API call"
2617 " failed, rv=-102:No such BFD object")
2618 self.assertFalse(vpp_session.query_vpp_config())
# CLI round trip for an authenticated IPv6 session using a meticulous keyed
# SHA1 key that is added through the API first.  Sequence: add with
# conf-key-id / bfd-key-id, duplicate add (must report "Duplicate BFD
# object"), verify, mod (no key arguments), verify, del, second del (must
# report "No such BFD object"), and finally the session must not be found
# in VPP.
# NOTE(review): the first argument line of both cli_verify_response() calls
# is not visible in this view.
2620 def test_add_mod_del_bfd_udp6_auth(self):
2621 """ create/modify/delete IPv6 BFD UDP session (authenticated) """
2622 key = self.factory.create_random_key(
2623 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2624 key.add_vpp_config()
2625 vpp_session = VppBFDUDPSession(
2626 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key)
2627 self.registry.register(vpp_session, self.logger)
2628 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2629 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2630 "detect-mult %s conf-key-id %s bfd-key-id %s" \
2631 % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2632 vpp_session.desired_min_tx, vpp_session.required_min_rx,
2633 vpp_session.detect_mult, key.conf_key_id,
2634 vpp_session.bfd_key_id)
2635 self.cli_verify_no_response(cli_add_cmd)
2636 # 2nd add should fail
2637 self.cli_verify_response(
2639 "bfd udp session add: `bfd_add_add_session' API call"
2640 " failed, rv=-101:Duplicate BFD object")
2641 verify_bfd_session_config(self, vpp_session)
2642 mod_session = VppBFDUDPSession(
2643 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key,
2644 bfd_key_id=vpp_session.bfd_key_id,
2645 required_min_rx=2 * vpp_session.required_min_rx,
2646 desired_min_tx=3 * vpp_session.desired_min_tx,
2647 detect_mult=4 * vpp_session.detect_mult)
2648 self.cli_verify_no_response(
2649 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2650 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2651 (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2652 mod_session.desired_min_tx,
2653 mod_session.required_min_rx, mod_session.detect_mult))
2654 verify_bfd_session_config(self, mod_session)
2655 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2656 "peer-addr %s" % (self.pg0.name,
2657 self.pg0.local_ip6, self.pg0.remote_ip6)
2658 self.cli_verify_no_response(cli_del_cmd)
2659 # 2nd del is expected to fail
2660 self.cli_verify_response(
2662 "bfd udp session del: `bfd_udp_del_session' API call"
2663 " failed, rv=-102:No such BFD object")
2664 self.assertFalse(vpp_session.query_vpp_config())
# Toggles authentication on an existing IPv4 session through the CLI.  Only
# `session` is added to VPP; `auth_session` is never added and serves as the
# reference configuration for the authenticated state.  Each command
# (activate, then deactivate) is issued twice and the configuration is
# verified after every call, so repeating a command must be silent and
# leave the configuration unchanged.
# NOTE(review): the `cli_activate = ` / `cli_deactivate = ` assignment
# lines, one line of the deactivate format string and the tail of the
# auth_session constructor call are not visible in this view.
2666 def test_auth_on_off(self):
2667 """ turn authentication on and off """
2668 key = self.factory.create_random_key(
2669 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2670 key.add_vpp_config()
2671 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2672 auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2674 session.add_vpp_config()
2676 "bfd udp session auth activate interface %s local-addr %s "\
2677 "peer-addr %s conf-key-id %s bfd-key-id %s"\
2678 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2679 key.conf_key_id, auth_session.bfd_key_id)
2680 self.cli_verify_no_response(cli_activate)
2681 verify_bfd_session_config(self, auth_session)
2682 self.cli_verify_no_response(cli_activate)
2683 verify_bfd_session_config(self, auth_session)
2685 "bfd udp session auth deactivate interface %s local-addr %s "\
2687 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2688 self.cli_verify_no_response(cli_deactivate)
2689 verify_bfd_session_config(self, session)
2690 self.cli_verify_no_response(cli_deactivate)
2691 verify_bfd_session_config(self, session)
# Same flow as the immediate on/off CLI test, but both commands carry the
# "delayed yes" option.  Only `session` is added to VPP; `auth_session` is
# the reference configuration for the authenticated state.  Each command is
# issued twice with the configuration verified after every call.
# NOTE(review): the `cli_activate = ` / `cli_deactivate = ` assignment lines
# and the tail of the auth_session constructor call are not visible in this
# view.
2693 def test_auth_on_off_delayed(self):
2694 """ turn authentication on and off (delayed) """
2695 key = self.factory.create_random_key(
2696 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2697 key.add_vpp_config()
2698 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2699 auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2701 session.add_vpp_config()
2703 "bfd udp session auth activate interface %s local-addr %s "\
2704 "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"\
2705 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2706 key.conf_key_id, auth_session.bfd_key_id)
2707 self.cli_verify_no_response(cli_activate)
2708 verify_bfd_session_config(self, auth_session)
2709 self.cli_verify_no_response(cli_activate)
2710 verify_bfd_session_config(self, auth_session)
2712 "bfd udp session auth deactivate interface %s local-addr %s "\
2713 "peer-addr %s delayed yes"\
2714 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2715 self.cli_verify_no_response(cli_deactivate)
2716 verify_bfd_session_config(self, session)
2717 self.cli_verify_no_response(cli_deactivate)
2718 verify_bfd_session_config(self, session)
# Drives the "set-flags admin down/up" CLI commands for an IPv4 session and
# checks the resulting state each time: admin_down after the "down" command,
# and down (not up) after the "up" command.
# NOTE(review): the `cli_down = ` / `cli_up = ` assignment lines and one
# line of each format string are not visible in this view.
2720 def test_admin_up_down(self):
2721 """ put session admin-up and admin-down """
2722 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2723 session.add_vpp_config()
2725 "bfd udp session set-flags admin down interface %s local-addr %s "\
2727 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2729 "bfd udp session set-flags admin up interface %s local-addr %s "\
2731 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2732 self.cli_verify_no_response(cli_down)
2733 verify_bfd_session_config(self, session, state=BFDState.admin_down)
2734 self.cli_verify_no_response(cli_up)
2735 verify_bfd_session_config(self, session, state=BFDState.down)
def test_set_del_udp_echo_source(self):
    """ set/del udp echo source """
    # Walks the echo-source CLI through its lifecycle on a fresh loopback:
    # unset -> set without addresses -> IPv4 configured -> IPv6 configured
    # -> deleted, checking "show bfd echo-source" after every step.
    show_cmd = "show bfd echo-source"
    unset_reply = "UDP echo source is not set."

    self.create_loopback_interfaces(1)
    self.loopback0 = self.lo_interfaces[0]
    lo = self.loopback0
    lo.admin_up()
    self.cli_verify_response(show_cmd, unset_reply)

    # Select the loopback as echo source while it has no addresses yet.
    self.cli_verify_no_response(
        "bfd udp echo-source set interface %s" % lo.name)
    expected = ("UDP echo source is: %s\n"
                "IPv4 address usable as echo source: none\n"
                "IPv6 address usable as echo source: none") % lo.name
    self.cli_verify_response(show_cmd, expected)

    # With IPv4 configured, the expected echo address is the interface
    # address with its lowest bit flipped.
    lo.config_ip4()
    (ip4_word,) = unpack("!L", lo.local_ip4n)
    echo_ip4 = inet_ntop(AF_INET, pack("!L", ip4_word ^ 1))
    expected = ("UDP echo source is: %s\n"
                "IPv4 address usable as echo source: %s\n"
                "IPv6 address usable as echo source: none") % (
                    lo.name, echo_ip4)
    self.cli_verify_response(show_cmd, expected)

    # Same lowest-bit flip for IPv6, applied to the last 32-bit word. The
    # expected address is derived before config_ip6() is called.
    w0, w1, w2, w3 = unpack("!LLLL", lo.local_ip6n)
    echo_ip6 = inet_ntop(AF_INET6, pack("!LLLL", w0, w1, w2, w3 ^ 1))
    lo.config_ip6()
    expected = ("UDP echo source is: %s\n"
                "IPv4 address usable as echo source: %s\n"
                "IPv6 address usable as echo source: %s") % (
                    lo.name, echo_ip4, echo_ip6)
    self.cli_verify_response(show_cmd, expected)

    # Removing the echo source returns the CLI to the initial "not set" reply.
    self.cli_verify_no_response("bfd udp echo-source del")
    self.cli_verify_response(show_cmd, unset_reply)
# Script entry point: when this module is executed directly, run its tests
# through unittest using the project's VppTestRunner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)