4 from __future__ import division
12 from random import randint, shuffle, getrandbits
13 from socket import AF_INET, AF_INET6, inet_ntop
14 from struct import pack, unpack
17 from scapy.layers.inet import UDP, IP
18 from scapy.layers.inet6 import IPv6
19 from scapy.layers.l2 import Ether, GRE
20 from scapy.packet import Raw
22 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
23 BFDDiagCode, BFDState, BFD_vpp_echo
24 from framework import VppTestCase, VppTestRunner, running_extended_tests
25 from framework import tag_run_solo
27 from vpp_ip import DpoProto
28 from vpp_ip_route import VppIpRoute, VppRoutePath
29 from vpp_lo_interface import VppLoInterface
30 from vpp_papi_provider import UnexpectedApiReturnValueError, \
32 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
33 from vpp_gre_interface import VppGreInterface
34 from vpp_papi import VppEnum
39 class AuthKeyFactory(object):
40 """Factory class for creating auth keys with unique conf key ID"""
43 self._conf_key_ids = {}
def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
    """ Build an auth key with random content and an unused conf key ID.

    :param test: test case object passed through to VppBFDAuthKey
    :param auth_type: authentication type assigned to the new key
    :returns: VppBFDAuthKey carrying a conf key ID this factory has not
              issued before and between 1 and 20 random key bytes
    """
    # keep drawing 32-bit IDs until one is found that was not issued yet
    while True:
        candidate_id = randint(0, 0xFFFFFFFF)
        if candidate_id not in self._conf_key_ids:
            break
    self._conf_key_ids[candidate_id] = 1
    key_length = randint(1, 20)
    key_bytes = bytearray(randint(0, 255) for _ in range(key_length))
    return VppBFDAuthKey(test=test, auth_type=auth_type,
                         conf_key_id=candidate_id,
                         key=scapy.compat.raw(key_bytes))
57 class BFDAPITestCase(VppTestCase):
58 """Bidirectional Forwarding Detection (BFD) - API"""
65 super(BFDAPITestCase, cls).setUpClass()
66 cls.vapi.cli("set log class bfd level debug")
68 cls.create_pg_interfaces(range(2))
69 for i in cls.pg_interfaces:
75 super(BFDAPITestCase, cls).tearDownClass()
79 def tearDownClass(cls):
80 super(BFDAPITestCase, cls).tearDownClass()
83 super(BFDAPITestCase, self).setUp()
84 self.factory = AuthKeyFactory()
def test_add_bfd(self):
    """ create a BFD session """
    bfd_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    # run the add/remove cycle twice so that re-adding a removed session
    # is exercised as well
    for _ in range(2):
        bfd_session.add_vpp_config()
        self.logger.debug("Session state is %s", bfd_session.state)
        bfd_session.remove_vpp_config()
def test_double_add(self):
    """ create the same BFD session twice (negative case) """
    duplicate = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    duplicate.add_vpp_config()
    # the second add of the very same session runs under the
    # assert_negative_api_retval() context manager
    negative_retval = self.vapi.assert_negative_api_retval()
    with negative_retval:
        duplicate.add_vpp_config()
    duplicate.remove_vpp_config()
def test_add_bfd6(self):
    """ create IPv6 BFD session """
    peer_ip6 = self.pg0.remote_ip6
    bfd_session = VppBFDUDPSession(self, self.pg0, peer_ip6, af=AF_INET6)
    # run the add/remove cycle twice so that re-adding a removed session
    # is exercised as well
    for _ in range(2):
        bfd_session.add_vpp_config()
        self.logger.debug("Session state is %s", bfd_session.state)
        bfd_session.remove_vpp_config()
117 def test_mod_bfd(self):
118 """ modify BFD session parameters """
119 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
120 desired_min_tx=50000,
121 required_min_rx=10000,
123 session.add_vpp_config()
124 s = session.get_bfd_udp_session_dump_entry()
125 self.assert_equal(session.desired_min_tx,
127 "desired min transmit interval")
128 self.assert_equal(session.required_min_rx,
130 "required min receive interval")
131 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
132 session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
133 required_min_rx=session.required_min_rx * 2,
134 detect_mult=session.detect_mult * 2)
135 s = session.get_bfd_udp_session_dump_entry()
136 self.assert_equal(session.desired_min_tx,
138 "desired min transmit interval")
139 self.assert_equal(session.required_min_rx,
141 "required min receive interval")
142 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
144 def test_add_sha1_keys(self):
145 """ add SHA1 keys """
147 keys = [self.factory.create_random_key(
148 self) for i in range(0, key_count)]
150 self.assertFalse(key.query_vpp_config())
154 self.assertTrue(key.query_vpp_config())
156 indexes = list(range(key_count))
161 key.remove_vpp_config()
163 for j in range(key_count):
166 self.assertFalse(key.query_vpp_config())
168 self.assertTrue(key.query_vpp_config())
169 # should be removed now
171 self.assertFalse(key.query_vpp_config())
172 # add back and remove again
176 self.assertTrue(key.query_vpp_config())
178 key.remove_vpp_config()
180 self.assertFalse(key.query_vpp_config())
182 def test_add_bfd_sha1(self):
183 """ create a BFD session (SHA1) """
184 key = self.factory.create_random_key(self)
186 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
188 session.add_vpp_config()
189 self.logger.debug("Session state is %s", session.state)
190 session.remove_vpp_config()
191 session.add_vpp_config()
192 self.logger.debug("Session state is %s", session.state)
193 session.remove_vpp_config()
195 def test_double_add_sha1(self):
196 """ create the same BFD session twice (negative case) (SHA1) """
197 key = self.factory.create_random_key(self)
199 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
201 session.add_vpp_config()
202 with self.assertRaises(Exception):
203 session.add_vpp_config()
def test_add_auth_nonexistent_key(self):
    """ create BFD session using non-existent SHA1 (negative case) """
    interface = self.pg0
    peer_ip4 = interface.remote_ip4
    # the key object comes from the factory, but add_vpp_config() is never
    # called on it in this test
    unconfigured_key = self.factory.create_random_key(self)
    session = VppBFDUDPSession(self, interface, peer_ip4,
                               sha1_key=unconfigured_key)
    self.assertRaises(Exception, session.add_vpp_config)
213 def test_shared_sha1_key(self):
214 """ share single SHA1 key between multiple BFD sessions """
215 key = self.factory.create_random_key(self)
218 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
220 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
221 sha1_key=key, af=AF_INET6),
222 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
224 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
225 sha1_key=key, af=AF_INET6)]
230 e = key.get_bfd_auth_keys_dump_entry()
231 self.assert_equal(e.use_count, len(sessions) - removed,
232 "Use count for shared key")
233 s.remove_vpp_config()
235 e = key.get_bfd_auth_keys_dump_entry()
236 self.assert_equal(e.use_count, len(sessions) - removed,
237 "Use count for shared key")
239 def test_activate_auth(self):
240 """ activate SHA1 authentication """
241 key = self.factory.create_random_key(self)
243 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
244 session.add_vpp_config()
245 session.activate_auth(key)
247 def test_deactivate_auth(self):
248 """ deactivate SHA1 authentication """
249 key = self.factory.create_random_key(self)
251 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
252 session.add_vpp_config()
253 session.activate_auth(key)
254 session.deactivate_auth()
256 def test_change_key(self):
257 """ change SHA1 key """
258 key1 = self.factory.create_random_key(self)
259 key2 = self.factory.create_random_key(self)
260 while key2.conf_key_id == key1.conf_key_id:
261 key2 = self.factory.create_random_key(self)
262 key1.add_vpp_config()
263 key2.add_vpp_config()
264 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
266 session.add_vpp_config()
267 session.activate_auth(key2)
def test_set_del_udp_echo_source(self):
    """ set/del udp echo source """

    def toggled_lowest_bit(address_cls, address):
        """ packed form of `address` with its lowest bit flipped """
        return address_cls(int(address_cls(address)) ^ 1).packed

    def expect_no_echo_source():
        """ query the echo source and require that nothing is set """
        reply = self.vapi.bfd_udp_get_echo_source()
        self.assertFalse(reply.is_set)
        self.assertFalse(reply.have_usable_ip4)
        self.assertFalse(reply.have_usable_ip6)

    def expect_loopback_echo_source(ip4=None, ip6=None):
        """ query the echo source and require that it is loopback0

        An address passed as None is expected to be reported as unusable,
        otherwise it must be usable and match the packed value given.
        """
        reply = self.vapi.bfd_udp_get_echo_source()
        self.assertTrue(reply.is_set)
        self.assertEqual(reply.sw_if_index, self.loopback0.sw_if_index)
        if ip4 is None:
            self.assertFalse(reply.have_usable_ip4)
        else:
            self.assertTrue(reply.have_usable_ip4)
            self.assertEqual(reply.ip4_addr.packed, ip4)
        if ip6 is None:
            self.assertFalse(reply.have_usable_ip6)
        else:
            self.assertTrue(reply.have_usable_ip6)
            self.assertEqual(reply.ip6_addr.packed, ip6)

    self.create_loopback_interfaces(1)
    self.loopback0 = self.lo_interfaces[0]
    self.loopback0.admin_up()
    expect_no_echo_source()

    # interface without any address: source is set, nothing usable yet
    self.vapi.bfd_udp_set_echo_source(
        sw_if_index=self.loopback0.sw_if_index)
    expect_loopback_echo_source()

    # with an IPv4 address configured the test expects the interface
    # address with its lowest bit flipped
    self.loopback0.config_ip4()
    echo_ip4 = toggled_lowest_bit(ipaddress.IPv4Address,
                                  self.loopback0.local_ip4)
    expect_loopback_echo_source(ip4=echo_ip4)

    # same expectation for IPv6 once an IPv6 address is configured
    self.loopback0.config_ip6()
    echo_ip6 = toggled_lowest_bit(ipaddress.IPv6Address,
                                  self.loopback0.local_ip6)
    expect_loopback_echo_source(ip4=echo_ip4, ip6=echo_ip6)

    self.vapi.bfd_udp_del_echo_source()
    expect_no_echo_source()
316 class BFDTestSession(object):
317 """ BFD session as seen from test framework side """
319 def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
320 bfd_key_id=None, our_seq_number=None,
321 tunnel_header=None, phy_interface=None):
324 self.sha1_key = sha1_key
325 self.bfd_key_id = bfd_key_id
326 self.interface = interface
328 self.phy_interface = phy_interface
330 self.phy_interface = self.interface
331 self.udp_sport = randint(49152, 65535)
332 if our_seq_number is None:
333 self.our_seq_number = randint(0, 40000000)
335 self.our_seq_number = our_seq_number
336 self.vpp_seq_number = None
337 self.my_discriminator = 0
338 self.desired_min_tx = 300000
339 self.required_min_rx = 300000
340 self.required_min_echo_rx = None
341 self.detect_mult = detect_mult
342 self.diag = BFDDiagCode.no_diagnostic
343 self.your_discriminator = None
344 self.state = BFDState.down
345 self.auth_type = BFDAuthType.no_auth
346 self.tunnel_header = tunnel_header
348 def inc_seq_num(self):
349 """ increment sequence number, wrapping if needed """
350 if self.our_seq_number == 0xFFFFFFFF:
351 self.our_seq_number = 0
353 self.our_seq_number += 1
355 def update(self, my_discriminator=None, your_discriminator=None,
356 desired_min_tx=None, required_min_rx=None,
357 required_min_echo_rx=None, detect_mult=None,
358 diag=None, state=None, auth_type=None):
359 """ update BFD parameters associated with session """
360 if my_discriminator is not None:
361 self.my_discriminator = my_discriminator
362 if your_discriminator is not None:
363 self.your_discriminator = your_discriminator
364 if required_min_rx is not None:
365 self.required_min_rx = required_min_rx
366 if required_min_echo_rx is not None:
367 self.required_min_echo_rx = required_min_echo_rx
368 if desired_min_tx is not None:
369 self.desired_min_tx = desired_min_tx
370 if detect_mult is not None:
371 self.detect_mult = detect_mult
374 if state is not None:
376 if auth_type is not None:
377 self.auth_type = auth_type
379 def fill_packet_fields(self, packet):
380 """ set packet fields with known values in packet """
382 if self.my_discriminator:
383 self.test.logger.debug("BFD: setting packet.my_discriminator=%s",
384 self.my_discriminator)
385 bfd.my_discriminator = self.my_discriminator
386 if self.your_discriminator:
387 self.test.logger.debug("BFD: setting packet.your_discriminator=%s",
388 self.your_discriminator)
389 bfd.your_discriminator = self.your_discriminator
390 if self.required_min_rx:
391 self.test.logger.debug(
392 "BFD: setting packet.required_min_rx_interval=%s",
393 self.required_min_rx)
394 bfd.required_min_rx_interval = self.required_min_rx
395 if self.required_min_echo_rx:
396 self.test.logger.debug(
397 "BFD: setting packet.required_min_echo_rx=%s",
398 self.required_min_echo_rx)
399 bfd.required_min_echo_rx_interval = self.required_min_echo_rx
400 if self.desired_min_tx:
401 self.test.logger.debug(
402 "BFD: setting packet.desired_min_tx_interval=%s",
404 bfd.desired_min_tx_interval = self.desired_min_tx
406 self.test.logger.debug(
407 "BFD: setting packet.detect_mult=%s", self.detect_mult)
408 bfd.detect_mult = self.detect_mult
410 self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
413 self.test.logger.debug("BFD: setting packet.state=%s", self.state)
414 bfd.state = self.state
416 # this is used by a negative test-case
417 self.test.logger.debug("BFD: setting packet.auth_type=%s",
419 bfd.auth_type = self.auth_type
421 def create_packet(self):
422 """ create a BFD packet, reflecting the current state of session """
425 bfd.auth_type = self.sha1_key.auth_type
426 bfd.auth_len = BFD.sha1_auth_len
427 bfd.auth_key_id = self.bfd_key_id
428 bfd.auth_seq_num = self.our_seq_number
429 bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
432 packet = Ether(src=self.phy_interface.remote_mac,
433 dst=self.phy_interface.local_mac)
434 if self.tunnel_header:
435 packet = packet / self.tunnel_header
436 if self.af == AF_INET6:
438 IPv6(src=self.interface.remote_ip6,
439 dst=self.interface.local_ip6,
441 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
445 IP(src=self.interface.remote_ip4,
446 dst=self.interface.local_ip4,
448 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
450 self.test.logger.debug("BFD: Creating packet")
451 self.fill_packet_fields(packet)
453 hash_material = scapy.compat.raw(
454 packet[BFD])[:32] + self.sha1_key.key + \
455 b"\0" * (20 - len(self.sha1_key.key))
456 self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
457 hashlib.sha1(hash_material).hexdigest())
458 packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
461 def send_packet(self, packet=None, interface=None):
462 """ send packet on interface, creating the packet if needed """
464 packet = self.create_packet()
465 if interface is None:
466 interface = self.phy_interface
467 self.test.logger.debug(ppp("Sending packet:", packet))
468 interface.add_stream(packet)
471 def verify_sha1_auth(self, packet):
472 """ Verify correctness of authentication in BFD layer. """
474 self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
475 self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
477 self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
478 self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
479 if self.vpp_seq_number is None:
480 self.vpp_seq_number = bfd.auth_seq_num
481 self.test.logger.debug("Received initial sequence number: %s" %
484 recvd_seq_num = bfd.auth_seq_num
485 self.test.logger.debug("Received followup sequence number: %s" %
487 if self.vpp_seq_number < 0xffffffff:
488 if self.sha1_key.auth_type == \
489 BFDAuthType.meticulous_keyed_sha1:
490 self.test.assert_equal(recvd_seq_num,
491 self.vpp_seq_number + 1,
492 "BFD sequence number")
494 self.test.assert_in_range(recvd_seq_num,
496 self.vpp_seq_number + 1,
497 "BFD sequence number")
499 if self.sha1_key.auth_type == \
500 BFDAuthType.meticulous_keyed_sha1:
501 self.test.assert_equal(recvd_seq_num, 0,
502 "BFD sequence number")
504 self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
505 "BFD sequence number not one of "
506 "(%s, 0)" % self.vpp_seq_number)
507 self.vpp_seq_number = recvd_seq_num
508 # last 20 bytes represent the hash - so replace them with the key,
509 # pad the result with zeros and hash the result
510 hash_material = bfd.original[:-20] + self.sha1_key.key + \
511 b"\0" * (20 - len(self.sha1_key.key))
512 expected_hash = hashlib.sha1(hash_material).hexdigest()
513 self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
514 expected_hash.encode(), "Auth key hash")
516 def verify_bfd(self, packet):
517 """ Verify correctness of BFD layer. """
519 self.test.assert_equal(bfd.version, 1, "BFD version")
520 self.test.assert_equal(bfd.your_discriminator,
521 self.my_discriminator,
522 "BFD - your discriminator")
524 self.verify_sha1_auth(packet)
527 def bfd_session_up(test):
528 """ Bring BFD session up """
529 test.logger.info("BFD: Waiting for slow hello")
530 p = wait_for_bfd_packet(test, 2, is_tunnel=test.vpp_session.is_tunnel)
532 if hasattr(test, 'vpp_clock_offset'):
533 old_offset = test.vpp_clock_offset
534 test.vpp_clock_offset = time.time() - float(p.time)
535 test.logger.debug("BFD: Calculated vpp clock offset: %s",
536 test.vpp_clock_offset)
538 test.assertAlmostEqual(
539 old_offset, test.vpp_clock_offset, delta=0.5,
540 msg="vpp clock offset not stable (new: %s, old: %s)" %
541 (test.vpp_clock_offset, old_offset))
542 test.logger.info("BFD: Sending Init")
543 test.test_session.update(my_discriminator=randint(0, 40000000),
544 your_discriminator=p[BFD].my_discriminator,
546 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
547 BFDAuthType.meticulous_keyed_sha1:
548 test.test_session.inc_seq_num()
549 test.test_session.send_packet()
550 test.logger.info("BFD: Waiting for event")
551 e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
552 verify_event(test, e, expected_state=BFDState.up)
553 test.logger.info("BFD: Session is Up")
554 test.test_session.update(state=BFDState.up)
555 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
556 BFDAuthType.meticulous_keyed_sha1:
557 test.test_session.inc_seq_num()
558 test.test_session.send_packet()
559 test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
def bfd_session_down(test):
    """ Take an Up BFD session to Down from the test side.

    Asserts the VPP session is Up, sends a packet carrying state Down from
    the test session, waits for the resulting bfd_udp_session_details event
    and asserts that both the event and the VPP session report Down.

    :param test: test case providing vpp_session, test_session, vapi, logger
    """
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
    peer = test.test_session
    peer.update(state=BFDState.down)
    auth_key = peer.sha1_key
    # with meticulous keyed SHA1 the sequence number is bumped before the
    # packet is sent
    if auth_key and \
            auth_key.auth_type == BFDAuthType.meticulous_keyed_sha1:
        peer.inc_seq_num()
    peer.send_packet()
    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, event, expected_state=BFDState.down)
    test.logger.info("BFD: Session is Down")
    test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
577 def verify_bfd_session_config(test, session, state=None):
578 dump = session.get_bfd_udp_session_dump_entry()
579 test.assertIsNotNone(dump)
580 # since dump is not none, we have verified that sw_if_index and addresses
581 # are valid (in get_bfd_udp_session_dump_entry)
583 test.assert_equal(dump.state, state, "session state")
584 test.assert_equal(dump.required_min_rx, session.required_min_rx,
585 "required min rx interval")
586 test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
587 "desired min tx interval")
588 test.assert_equal(dump.detect_mult, session.detect_mult,
590 if session.sha1_key is None:
591 test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
593 test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
594 test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
596 test.assert_equal(dump.conf_key_id,
597 session.sha1_key.conf_key_id,
601 def verify_ip(test, packet):
602 """ Verify correctness of IP layer. """
603 if test.vpp_session.af == AF_INET6:
605 local_ip = test.vpp_session.interface.local_ip6
606 remote_ip = test.vpp_session.interface.remote_ip6
607 test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
610 local_ip = test.vpp_session.interface.local_ip4
611 remote_ip = test.vpp_session.interface.remote_ip4
612 test.assert_equal(ip.ttl, 255, "IPv4 TTL")
613 test.assert_equal(ip.src, local_ip, "IP source address")
614 test.assert_equal(ip.dst, remote_ip, "IP destination address")
617 def verify_udp(test, packet):
618 """ Verify correctness of UDP layer. """
620 test.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
621 test.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
625 def verify_event(test, event, expected_state):
626 """ Verify correctness of event values. """
628 test.logger.debug("BFD: Event: %s" % reprlib.repr(e))
629 test.assert_equal(e.sw_if_index,
630 test.vpp_session.interface.sw_if_index,
631 "BFD interface index")
633 test.assert_equal(str(e.local_addr), test.vpp_session.local_addr,
634 "Local IPv6 address")
635 test.assert_equal(str(e.peer_addr), test.vpp_session.peer_addr,
637 test.assert_equal(e.state, expected_state, BFDState)
640 def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None, is_tunnel=False):
641 """ wait for BFD packet and verify its correctness
643 :param timeout: how long to wait
644 :param pcap_time_min: ignore packets with pcap timestamp lower than this
646 :returns: tuple (packet, time spent waiting for packet)
648 test.logger.info("BFD: Waiting for BFD packet")
649 deadline = time.time() + timeout
654 test.assert_in_range(counter, 0, 100, "number of packets ignored")
655 time_left = deadline - time.time()
657 raise CaptureTimeoutError("Packet did not arrive within timeout")
658 p = test.pg0.wait_for_packet(timeout=time_left)
659 test.logger.debug(ppp("BFD: Got packet:", p))
660 if pcap_time_min is not None and p.time < pcap_time_min:
661 test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
662 "pcap time min %s):" %
663 (p.time, pcap_time_min), p))
667 # strip an IP layer and move to the next
672 raise Exception(ppp("Unexpected or invalid BFD packet:", p))
674 raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
677 test.test_session.verify_bfd(p)
682 class BFD4TestCase(VppTestCase):
683 """Bidirectional Forwarding Detection (BFD)"""
686 vpp_clock_offset = None
692 super(BFD4TestCase, cls).setUpClass()
693 cls.vapi.cli("set log class bfd level debug")
695 cls.create_pg_interfaces([0])
696 cls.create_loopback_interfaces(1)
697 cls.loopback0 = cls.lo_interfaces[0]
698 cls.loopback0.config_ip4()
699 cls.loopback0.admin_up()
701 cls.pg0.configure_ipv4_neighbors()
703 cls.pg0.resolve_arp()
706 super(BFD4TestCase, cls).tearDownClass()
710 def tearDownClass(cls):
711 super(BFD4TestCase, cls).tearDownClass()
714 super(BFD4TestCase, self).setUp()
715 self.factory = AuthKeyFactory()
716 self.vapi.want_bfd_events()
717 self.pg0.enable_capture()
719 self.vpp_session = VppBFDUDPSession(self, self.pg0,
721 self.vpp_session.add_vpp_config()
722 self.vpp_session.admin_up()
723 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
724 except BaseException:
725 self.vapi.want_bfd_events(enable_disable=0)
729 if not self.vpp_dead:
730 self.vapi.want_bfd_events(enable_disable=0)
731 self.vapi.collect_events() # clear the event queue
732 super(BFD4TestCase, self).tearDown()
734 def test_session_up(self):
735 """ bring BFD session up """
738 def test_session_up_by_ip(self):
739 """ bring BFD session up - first frame looked up by address pair """
740 self.logger.info("BFD: Sending Slow control frame")
741 self.test_session.update(my_discriminator=randint(0, 40000000))
742 self.test_session.send_packet()
743 self.pg0.enable_capture()
744 p = self.pg0.wait_for_packet(1)
745 self.assert_equal(p[BFD].your_discriminator,
746 self.test_session.my_discriminator,
747 "BFD - your discriminator")
748 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
749 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
751 self.logger.info("BFD: Waiting for event")
752 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
753 verify_event(self, e, expected_state=BFDState.init)
754 self.logger.info("BFD: Sending Up")
755 self.test_session.send_packet()
756 self.logger.info("BFD: Waiting for event")
757 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
758 verify_event(self, e, expected_state=BFDState.up)
759 self.logger.info("BFD: Session is Up")
760 self.test_session.update(state=BFDState.up)
761 self.test_session.send_packet()
762 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
764 def test_session_down(self):
765 """ bring BFD session down """
767 bfd_session_down(self)
769 def test_hold_up(self):
770 """ hold BFD session up """
772 for dummy in range(self.test_session.detect_mult * 2):
773 wait_for_bfd_packet(self)
774 self.test_session.send_packet()
775 self.assert_equal(len(self.vapi.collect_events()), 0,
776 "number of bfd events")
778 def test_slow_timer(self):
779 """ verify slow periodic control frames while session down """
781 self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
782 prev_packet = wait_for_bfd_packet(self, 2)
783 for dummy in range(packet_count):
784 next_packet = wait_for_bfd_packet(self, 2)
785 time_diff = next_packet.time - prev_packet.time
786 # spec says the range should be <0.75, 1>, allow extra 0.05 margin
787 # to work around timing issues
788 self.assert_in_range(
789 time_diff, 0.70, 1.05, "time between slow packets")
790 prev_packet = next_packet
792 def test_zero_remote_min_rx(self):
793 """ no packets when zero remote required min rx interval """
795 self.test_session.update(required_min_rx=0)
796 self.test_session.send_packet()
797 for dummy in range(self.test_session.detect_mult):
798 self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
799 "sleep before transmitting bfd packet")
800 self.test_session.send_packet()
802 p = wait_for_bfd_packet(self, timeout=0)
803 self.logger.error(ppp("Received unexpected packet:", p))
804 except CaptureTimeoutError:
807 len(self.vapi.collect_events()), 0, "number of bfd events")
808 self.test_session.update(required_min_rx=300000)
809 for dummy in range(3):
810 self.test_session.send_packet()
812 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
814 len(self.vapi.collect_events()), 0, "number of bfd events")
816 def test_conn_down(self):
817 """ verify session goes down after inactivity """
819 detection_time = self.test_session.detect_mult *\
820 self.vpp_session.required_min_rx / USEC_IN_SEC
821 self.sleep(detection_time, "waiting for BFD session time-out")
822 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
823 verify_event(self, e, expected_state=BFDState.down)
825 def test_peer_discr_reset_sess_down(self):
826 """ peer discriminator reset after session goes down """
828 detection_time = self.test_session.detect_mult *\
829 self.vpp_session.required_min_rx / USEC_IN_SEC
830 self.sleep(detection_time, "waiting for BFD session time-out")
831 self.test_session.my_discriminator = 0
832 wait_for_bfd_packet(self,
833 pcap_time_min=time.time() - self.vpp_clock_offset)
835 def test_large_required_min_rx(self):
836 """ large remote required min rx interval """
838 p = wait_for_bfd_packet(self)
840 self.test_session.update(required_min_rx=interval)
841 self.test_session.send_packet()
842 time_mark = time.time()
844 # busy wait here, trying to collect a packet or event, vpp is not
845 # allowed to send packets and the session will timeout first - so the
846 # Up->Down event must arrive before any packets do
847 while time.time() < time_mark + interval / USEC_IN_SEC:
849 p = wait_for_bfd_packet(self, timeout=0)
850 # if vpp managed to send a packet before we did the session
851 # update, then that's fine, ignore it
852 if p.time < time_mark - self.vpp_clock_offset:
854 self.logger.error(ppp("Received unexpected packet:", p))
856 except CaptureTimeoutError:
858 events = self.vapi.collect_events()
860 verify_event(self, events[0], BFDState.down)
862 self.assert_equal(count, 0, "number of packets received")
864 def test_immediate_remote_min_rx_reduction(self):
865 """ immediately honor remote required min rx reduction """
866 self.vpp_session.remove_vpp_config()
867 self.vpp_session = VppBFDUDPSession(
868 self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
869 self.pg0.enable_capture()
870 self.vpp_session.add_vpp_config()
871 self.test_session.update(desired_min_tx=1000000,
872 required_min_rx=1000000)
874 reference_packet = wait_for_bfd_packet(self)
875 time_mark = time.time()
877 self.test_session.update(required_min_rx=interval)
878 self.test_session.send_packet()
879 extra_time = time.time() - time_mark
880 p = wait_for_bfd_packet(self)
881 # first packet is allowed to be late by time we spent doing the update
882 # calculated in extra_time
883 self.assert_in_range(p.time - reference_packet.time,
884 .95 * 0.75 * interval / USEC_IN_SEC,
885 1.05 * interval / USEC_IN_SEC + extra_time,
886 "time between BFD packets")
888 for dummy in range(3):
889 p = wait_for_bfd_packet(self)
890 diff = p.time - reference_packet.time
891 self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
892 1.05 * interval / USEC_IN_SEC,
893 "time between BFD packets")
896 def test_modify_req_min_rx_double(self):
897 """ modify session - double required min rx """
899 p = wait_for_bfd_packet(self)
900 self.test_session.update(desired_min_tx=10000,
901 required_min_rx=10000)
902 self.test_session.send_packet()
903 # double required min rx
904 self.vpp_session.modify_parameters(
905 required_min_rx=2 * self.vpp_session.required_min_rx)
906 p = wait_for_bfd_packet(
907 self, pcap_time_min=time.time() - self.vpp_clock_offset)
908 # poll bit needs to be set
909 self.assertIn("P", p.sprintf("%BFD.flags%"),
910 "Poll bit not set in BFD packet")
911 # finish poll sequence with final packet
912 final = self.test_session.create_packet()
913 final[BFD].flags = "F"
914 timeout = self.test_session.detect_mult * \
915 max(self.test_session.desired_min_tx,
916 self.vpp_session.required_min_rx) / USEC_IN_SEC
917 self.test_session.send_packet(final)
918 time_mark = time.time()
919 e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
920 verify_event(self, e, expected_state=BFDState.down)
921 time_to_event = time.time() - time_mark
922 self.assert_in_range(time_to_event, .9 * timeout,
923 1.1 * timeout, "session timeout")
925 def test_modify_req_min_rx_halve(self):
926 """ modify session - halve required min rx """
927 self.vpp_session.modify_parameters(
928 required_min_rx=2 * self.vpp_session.required_min_rx)
930 p = wait_for_bfd_packet(self)
931 self.test_session.update(desired_min_tx=10000,
932 required_min_rx=10000)
933 self.test_session.send_packet()
934 p = wait_for_bfd_packet(
935 self, pcap_time_min=time.time() - self.vpp_clock_offset)
936 # halve required min rx
937 old_required_min_rx = self.vpp_session.required_min_rx
938 self.vpp_session.modify_parameters(
939 required_min_rx=self.vpp_session.required_min_rx // 2)
940 # now we wait 0.8*3*old-req-min-rx and the session should still be up
941 self.sleep(0.8 * self.vpp_session.detect_mult *
942 old_required_min_rx / USEC_IN_SEC,
943 "wait before finishing poll sequence")
944 self.assert_equal(len(self.vapi.collect_events()), 0,
945 "number of bfd events")
946 p = wait_for_bfd_packet(self)
947 # poll bit needs to be set
948 self.assertIn("P", p.sprintf("%BFD.flags%"),
949 "Poll bit not set in BFD packet")
950 # finish poll sequence with final packet
951 final = self.test_session.create_packet()
952 final[BFD].flags = "F"
953 self.test_session.send_packet(final)
954 # now the session should time out under new conditions
955 detection_time = self.test_session.detect_mult *\
956 self.vpp_session.required_min_rx / USEC_IN_SEC
958 e = self.vapi.wait_for_event(
959 2 * detection_time, "bfd_udp_session_details")
961 self.assert_in_range(after - before,
962 0.9 * detection_time,
963 1.1 * detection_time,
964 "time before bfd session goes down")
965 verify_event(self, e, expected_state=BFDState.down)
967 def test_modify_detect_mult(self):
968 """ modify detect multiplier """
970 p = wait_for_bfd_packet(self)
971 self.vpp_session.modify_parameters(detect_mult=1)
972 p = wait_for_bfd_packet(
973 self, pcap_time_min=time.time() - self.vpp_clock_offset)
974 self.assert_equal(self.vpp_session.detect_mult,
977 # poll bit must not be set
978 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
979 "Poll bit not set in BFD packet")
980 self.vpp_session.modify_parameters(detect_mult=10)
981 p = wait_for_bfd_packet(
982 self, pcap_time_min=time.time() - self.vpp_clock_offset)
983 self.assert_equal(self.vpp_session.detect_mult,
986 # poll bit must not be set
987 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
988 "Poll bit not set in BFD packet")
990 def test_queued_poll(self):
991 """ test poll sequence queueing """
993 p = wait_for_bfd_packet(self)
994 self.vpp_session.modify_parameters(
995 required_min_rx=2 * self.vpp_session.required_min_rx)
996 p = wait_for_bfd_packet(self)
997 poll_sequence_start = time.time()
998 poll_sequence_length_min = 0.5
999 send_final_after = time.time() + poll_sequence_length_min
1000 # poll bit needs to be set
1001 self.assertIn("P", p.sprintf("%BFD.flags%"),
1002 "Poll bit not set in BFD packet")
1003 self.assert_equal(p[BFD].required_min_rx_interval,
1004 self.vpp_session.required_min_rx,
1005 "BFD required min rx interval")
1006 self.vpp_session.modify_parameters(
1007 required_min_rx=2 * self.vpp_session.required_min_rx)
1008 # 2nd poll sequence should be queued now
1009 # don't send the reply back yet, wait for some time to emulate
1010 # longer round-trip time
1012 while time.time() < send_final_after:
1013 self.test_session.send_packet()
1014 p = wait_for_bfd_packet(self)
1015 self.assert_equal(len(self.vapi.collect_events()), 0,
1016 "number of bfd events")
1017 self.assert_equal(p[BFD].required_min_rx_interval,
1018 self.vpp_session.required_min_rx,
1019 "BFD required min rx interval")
1021 # poll bit must be set
1022 self.assertIn("P", p.sprintf("%BFD.flags%"),
1023 "Poll bit not set in BFD packet")
1024 final = self.test_session.create_packet()
1025 final[BFD].flags = "F"
1026 self.test_session.send_packet(final)
1027 # finish 1st with final
1028 poll_sequence_length = time.time() - poll_sequence_start
1029 # vpp must wait for some time before starting new poll sequence
1030 poll_no_2_started = False
1031 for dummy in range(2 * packet_count):
1032 p = wait_for_bfd_packet(self)
1033 self.assert_equal(len(self.vapi.collect_events()), 0,
1034 "number of bfd events")
1035 if "P" in p.sprintf("%BFD.flags%"):
1036 poll_no_2_started = True
1037 if time.time() < poll_sequence_start + poll_sequence_length:
1038 raise Exception("VPP started 2nd poll sequence too soon")
1039 final = self.test_session.create_packet()
1040 final[BFD].flags = "F"
1041 self.test_session.send_packet(final)
1044 self.test_session.send_packet()
1045 self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
1046 # finish 2nd with final
1047 final = self.test_session.create_packet()
1048 final[BFD].flags = "F"
1049 self.test_session.send_packet(final)
1050 p = wait_for_bfd_packet(self)
1051 # poll bit must not be set
1052 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
1053 "Poll bit set in BFD packet")
1055 # returning inconsistent results requiring retries in per-patch tests
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_poll_response(self):
    """ test correct response to control frame with poll bit set

    Brings the session up, sends one control frame whose BFD flags are
    set to "P" and asserts that the BFD packet subsequently returned by
    wait_for_bfd_packet() has "F" among its flags.
    """
    bfd_session_up(self)

    poll_frame = self.test_session.create_packet()
    poll_frame[BFD].flags = "P"
    self.test_session.send_packet(poll_frame)

    # The capture-time lower bound is computed only after the poll frame
    # has been sent (presumably so that older captures are not matched).
    earliest_capture = time.time() - self.vpp_clock_offset
    reply = wait_for_bfd_packet(self, pcap_time_min=earliest_capture)

    reply_flags = reply.sprintf("%BFD.flags%")
    self.assertIn("F", reply_flags)
1067 def test_no_periodic_if_remote_demand(self):
1068 """ no periodic frames outside poll sequence if remote demand set """
1069 bfd_session_up(self)
1070 demand = self.test_session.create_packet()
1071 demand[BFD].flags = "D"
1072 self.test_session.send_packet(demand)
1073 transmit_time = 0.9 \
1074 * max(self.vpp_session.required_min_rx,
1075 self.test_session.desired_min_tx) \
1078 for dummy in range(self.test_session.detect_mult * 2):
1079 self.sleep(transmit_time)
1080 self.test_session.send_packet(demand)
1082 p = wait_for_bfd_packet(self, timeout=0)
1083 self.logger.error(ppp("Received unexpected packet:", p))
1085 except CaptureTimeoutError:
1087 events = self.vapi.collect_events()
1089 self.logger.error("Received unexpected event: %s", e)
1090 self.assert_equal(count, 0, "number of packets received")
1091 self.assert_equal(len(events), 0, "number of events received")
1093 def test_echo_looped_back(self):
1094 """ echo packets looped back """
1095 # don't need a session in this case..
1096 self.vpp_session.remove_vpp_config()
1097 self.pg0.enable_capture()
1098 echo_packet_count = 10
1099 # random source port low enough to increment a few times..
1100 udp_sport_tx = randint(1, 50000)
1101 udp_sport_rx = udp_sport_tx
1102 echo_packet = (Ether(src=self.pg0.remote_mac,
1103 dst=self.pg0.local_mac) /
1104 IP(src=self.pg0.remote_ip4,
1105 dst=self.pg0.remote_ip4) /
1106 UDP(dport=BFD.udp_dport_echo) /
1107 Raw("this should be looped back"))
1108 for dummy in range(echo_packet_count):
1109 self.sleep(.01, "delay between echo packets")
1110 echo_packet[UDP].sport = udp_sport_tx
1112 self.logger.debug(ppp("Sending packet:", echo_packet))
1113 self.pg0.add_stream(echo_packet)
1115 for dummy in range(echo_packet_count):
1116 p = self.pg0.wait_for_packet(1)
1117 self.logger.debug(ppp("Got packet:", p))
1119 self.assert_equal(self.pg0.remote_mac,
1120 ether.dst, "Destination MAC")
1121 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1123 self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1124 self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1126 self.assert_equal(udp.dport, BFD.udp_dport_echo,
1127 "UDP destination port")
1128 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1130 # need to compare the hex payload here, otherwise BFD_vpp_echo
1132 self.assertEqual(scapy.compat.raw(p[UDP].payload),
1133 scapy.compat.raw(echo_packet[UDP].payload),
1134 "Received packet is not the echo packet sent")
1135 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1136 "ECHO packet identifier for test purposes)")
1138 def test_echo(self):
1139 """ echo function """
1140 bfd_session_up(self)
1141 self.test_session.update(required_min_echo_rx=150000)
1142 self.test_session.send_packet()
1143 detection_time = self.test_session.detect_mult *\
1144 self.vpp_session.required_min_rx / USEC_IN_SEC
1145 # echo shouldn't work without echo source set
1146 for dummy in range(10):
1147 sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1148 self.sleep(sleep, "delay before sending bfd packet")
1149 self.test_session.send_packet()
1150 p = wait_for_bfd_packet(
1151 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1152 self.assert_equal(p[BFD].required_min_rx_interval,
1153 self.vpp_session.required_min_rx,
1154 "BFD required min rx interval")
1155 self.test_session.send_packet()
1156 self.vapi.bfd_udp_set_echo_source(
1157 sw_if_index=self.loopback0.sw_if_index)
1159 # should be turned on - loopback echo packets
1160 for dummy in range(3):
1161 loop_until = time.time() + 0.75 * detection_time
1162 while time.time() < loop_until:
1163 p = self.pg0.wait_for_packet(1)
1164 self.logger.debug(ppp("Got packet:", p))
1165 if p[UDP].dport == BFD.udp_dport_echo:
1167 p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
1168 self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
1169 "BFD ECHO src IP equal to loopback IP")
1170 self.logger.debug(ppp("Looping back packet:", p))
1171 self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1172 "ECHO packet destination MAC address")
1173 p[Ether].dst = self.pg0.local_mac
1174 self.pg0.add_stream(p)
1177 elif p.haslayer(BFD):
1179 self.assertGreaterEqual(
1180 p[BFD].required_min_rx_interval,
1182 if "P" in p.sprintf("%BFD.flags%"):
1183 final = self.test_session.create_packet()
1184 final[BFD].flags = "F"
1185 self.test_session.send_packet(final)
1187 raise Exception(ppp("Received unknown packet:", p))
1189 self.assert_equal(len(self.vapi.collect_events()), 0,
1190 "number of bfd events")
1191 self.test_session.send_packet()
1192 self.assertTrue(echo_seen, "No echo packets received")
1194 def test_echo_fail(self):
1195 """ session goes down if echo function fails """
1196 bfd_session_up(self)
1197 self.test_session.update(required_min_echo_rx=150000)
1198 self.test_session.send_packet()
1199 detection_time = self.test_session.detect_mult *\
1200 self.vpp_session.required_min_rx / USEC_IN_SEC
1201 self.vapi.bfd_udp_set_echo_source(
1202 sw_if_index=self.loopback0.sw_if_index)
1203 # echo function should be used now, but we will drop the echo packets
1204 verified_diag = False
1205 for dummy in range(3):
1206 loop_until = time.time() + 0.75 * detection_time
1207 while time.time() < loop_until:
1208 p = self.pg0.wait_for_packet(1)
1209 self.logger.debug(ppp("Got packet:", p))
1210 if p[UDP].dport == BFD.udp_dport_echo:
1213 elif p.haslayer(BFD):
1214 if "P" in p.sprintf("%BFD.flags%"):
1215 self.assertGreaterEqual(
1216 p[BFD].required_min_rx_interval,
1218 final = self.test_session.create_packet()
1219 final[BFD].flags = "F"
1220 self.test_session.send_packet(final)
1221 if p[BFD].state == BFDState.down:
1222 self.assert_equal(p[BFD].diag,
1223 BFDDiagCode.echo_function_failed,
1225 verified_diag = True
1227 raise Exception(ppp("Received unknown packet:", p))
1228 self.test_session.send_packet()
1229 events = self.vapi.collect_events()
1230 self.assert_equal(len(events), 1, "number of bfd events")
1231 self.assert_equal(events[0].state, BFDState.down, BFDState)
1232 self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1234 def test_echo_stop(self):
1235 """ echo function stops if peer sets required min echo rx zero """
1236 bfd_session_up(self)
1237 self.test_session.update(required_min_echo_rx=150000)
1238 self.test_session.send_packet()
1239 self.vapi.bfd_udp_set_echo_source(
1240 sw_if_index=self.loopback0.sw_if_index)
1241 # wait for first echo packet
1243 p = self.pg0.wait_for_packet(1)
1244 self.logger.debug(ppp("Got packet:", p))
1245 if p[UDP].dport == BFD.udp_dport_echo:
1246 self.logger.debug(ppp("Looping back packet:", p))
1247 p[Ether].dst = self.pg0.local_mac
1248 self.pg0.add_stream(p)
1251 elif p.haslayer(BFD):
1255 raise Exception(ppp("Received unknown packet:", p))
1256 self.test_session.update(required_min_echo_rx=0)
1257 self.test_session.send_packet()
1258 # echo packets shouldn't arrive anymore
1259 for dummy in range(5):
1260 wait_for_bfd_packet(
1261 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1262 self.test_session.send_packet()
1263 events = self.vapi.collect_events()
1264 self.assert_equal(len(events), 0, "number of bfd events")
1266 def test_echo_source_removed(self):
1267 """ echo function stops if echo source is removed """
1268 bfd_session_up(self)
1269 self.test_session.update(required_min_echo_rx=150000)
1270 self.test_session.send_packet()
1271 self.vapi.bfd_udp_set_echo_source(
1272 sw_if_index=self.loopback0.sw_if_index)
1273 # wait for first echo packet
1275 p = self.pg0.wait_for_packet(1)
1276 self.logger.debug(ppp("Got packet:", p))
1277 if p[UDP].dport == BFD.udp_dport_echo:
1278 self.logger.debug(ppp("Looping back packet:", p))
1279 p[Ether].dst = self.pg0.local_mac
1280 self.pg0.add_stream(p)
1283 elif p.haslayer(BFD):
1287 raise Exception(ppp("Received unknown packet:", p))
1288 self.vapi.bfd_udp_del_echo_source()
1289 self.test_session.send_packet()
1290 # echo packets shouldn't arrive anymore
1291 for dummy in range(5):
1292 wait_for_bfd_packet(
1293 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1294 self.test_session.send_packet()
1295 events = self.vapi.collect_events()
1296 self.assert_equal(len(events), 0, "number of bfd events")
1298 def test_stale_echo(self):
1299 """ stale echo packets don't keep a session up """
1300 bfd_session_up(self)
1301 self.test_session.update(required_min_echo_rx=150000)
1302 self.vapi.bfd_udp_set_echo_source(
1303 sw_if_index=self.loopback0.sw_if_index)
1304 self.test_session.send_packet()
1305 # should be turned on - loopback echo packets
1309 for dummy in range(10 * self.vpp_session.detect_mult):
1310 p = self.pg0.wait_for_packet(1)
1311 if p[UDP].dport == BFD.udp_dport_echo:
1312 if echo_packet is None:
1313 self.logger.debug(ppp("Got first echo packet:", p))
1315 timeout_at = time.time() + self.vpp_session.detect_mult * \
1316 self.test_session.required_min_echo_rx / USEC_IN_SEC
1318 self.logger.debug(ppp("Got followup echo packet:", p))
1319 self.logger.debug(ppp("Looping back first echo packet:", p))
1320 echo_packet[Ether].dst = self.pg0.local_mac
1321 self.pg0.add_stream(echo_packet)
1323 elif p.haslayer(BFD):
1324 self.logger.debug(ppp("Got packet:", p))
1325 if "P" in p.sprintf("%BFD.flags%"):
1326 final = self.test_session.create_packet()
1327 final[BFD].flags = "F"
1328 self.test_session.send_packet(final)
1329 if p[BFD].state == BFDState.down:
1330 self.assertIsNotNone(
1332 "Session went down before first echo packet received")
1334 self.assertGreaterEqual(
1336 "Session timeout at %s, but is expected at %s" %
1338 self.assert_equal(p[BFD].diag,
1339 BFDDiagCode.echo_function_failed,
1341 events = self.vapi.collect_events()
1342 self.assert_equal(len(events), 1, "number of bfd events")
1343 self.assert_equal(events[0].state, BFDState.down, BFDState)
1347 raise Exception(ppp("Received unknown packet:", p))
1348 self.test_session.send_packet()
1349 self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1351 def test_invalid_echo_checksum(self):
1352 """ echo packets with invalid checksum don't keep a session up """
1353 bfd_session_up(self)
1354 self.test_session.update(required_min_echo_rx=150000)
1355 self.vapi.bfd_udp_set_echo_source(
1356 sw_if_index=self.loopback0.sw_if_index)
1357 self.test_session.send_packet()
1358 # should be turned on - loopback echo packets
1361 for dummy in range(10 * self.vpp_session.detect_mult):
1362 p = self.pg0.wait_for_packet(1)
1363 if p[UDP].dport == BFD.udp_dport_echo:
1364 self.logger.debug(ppp("Got echo packet:", p))
1365 if timeout_at is None:
1366 timeout_at = time.time() + self.vpp_session.detect_mult * \
1367 self.test_session.required_min_echo_rx / USEC_IN_SEC
1368 p[BFD_vpp_echo].checksum = getrandbits(64)
1369 p[Ether].dst = self.pg0.local_mac
1370 self.logger.debug(ppp("Looping back modified echo packet:", p))
1371 self.pg0.add_stream(p)
1373 elif p.haslayer(BFD):
1374 self.logger.debug(ppp("Got packet:", p))
1375 if "P" in p.sprintf("%BFD.flags%"):
1376 final = self.test_session.create_packet()
1377 final[BFD].flags = "F"
1378 self.test_session.send_packet(final)
1379 if p[BFD].state == BFDState.down:
1380 self.assertIsNotNone(
1382 "Session went down before first echo packet received")
1384 self.assertGreaterEqual(
1386 "Session timeout at %s, but is expected at %s" %
1388 self.assert_equal(p[BFD].diag,
1389 BFDDiagCode.echo_function_failed,
1391 events = self.vapi.collect_events()
1392 self.assert_equal(len(events), 1, "number of bfd events")
1393 self.assert_equal(events[0].state, BFDState.down, BFDState)
1397 raise Exception(ppp("Received unknown packet:", p))
1398 self.test_session.send_packet()
1399 self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
def test_admin_up_down(self):
    """ put session admin-up and admin-down

    Sequence exercised:
      1. bring the session up, then put the VPP side admin-down and
         expect an admin_down event plus admin_down packets;
      2. send an init frame and check VPP keeps reporting admin_down;
      3. put the VPP side admin-up again and walk the session through
         down -> init -> up, checking both the packet state and the
         reported event at every step.
    """
    event_name = "bfd_udp_session_details"

    def expect_event(state):
        # wait up to 1 second for the next event and validate its state
        event = self.vapi.wait_for_event(1, event_name)
        verify_event(self, event, expected_state=state)

    def expect_fresh_packet(state):
        # only accept packets whose capture time is not older than "now"
        pkt = wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.assert_equal(pkt[BFD].state, state, BFDState)

    def expect_admin_down_packets():
        # two consecutive packets must both advertise admin_down
        for _ in range(2):
            pkt = wait_for_bfd_packet(self)
            self.assert_equal(pkt[BFD].state, BFDState.admin_down, BFDState)

    bfd_session_up(self)
    self.vpp_session.admin_down()
    self.pg0.enable_capture()
    expect_event(BFDState.admin_down)
    expect_admin_down_packets()

    # try to bring session up - shouldn't be possible
    self.test_session.update(state=BFDState.init)
    self.test_session.send_packet()
    expect_admin_down_packets()

    # back to admin-up: the event is awaited before the packet here
    self.vpp_session.admin_up()
    self.test_session.update(state=BFDState.down)
    expect_event(BFDState.down)
    expect_fresh_packet(BFDState.down)

    # from here on the packet is checked first, the event second
    self.test_session.send_packet()
    expect_fresh_packet(BFDState.init)
    expect_event(BFDState.init)

    self.test_session.update(state=BFDState.up)
    self.test_session.send_packet()
    expect_fresh_packet(BFDState.up)
    expect_event(BFDState.up)
1438 def test_config_change_remote_demand(self):
1439 """ configuration change while peer in demand mode """
1440 bfd_session_up(self)
1441 demand = self.test_session.create_packet()
1442 demand[BFD].flags = "D"
1443 self.test_session.send_packet(demand)
1444 self.vpp_session.modify_parameters(
1445 required_min_rx=2 * self.vpp_session.required_min_rx)
1446 p = wait_for_bfd_packet(
1447 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1448 # poll bit must be set
1449 self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1450 # terminate poll sequence
1451 final = self.test_session.create_packet()
1452 final[BFD].flags = "D+F"
1453 self.test_session.send_packet(final)
1454 # vpp should be quiet now again
1455 transmit_time = 0.9 \
1456 * max(self.vpp_session.required_min_rx,
1457 self.test_session.desired_min_tx) \
1460 for dummy in range(self.test_session.detect_mult * 2):
1461 self.sleep(transmit_time)
1462 self.test_session.send_packet(demand)
1464 p = wait_for_bfd_packet(self, timeout=0)
1465 self.logger.error(ppp("Received unexpected packet:", p))
1467 except CaptureTimeoutError:
1469 events = self.vapi.collect_events()
1471 self.logger.error("Received unexpected event: %s", e)
1472 self.assert_equal(count, 0, "number of packets received")
1473 self.assert_equal(len(events), 0, "number of events received")
1475 def test_intf_deleted(self):
1476 """ interface with bfd session deleted """
1477 intf = VppLoInterface(self)
1480 sw_if_index = intf.sw_if_index
1481 vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip4)
1482 vpp_session.add_vpp_config()
1483 vpp_session.admin_up()
1484 intf.remove_vpp_config()
1485 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1486 self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1487 self.assertFalse(vpp_session.query_vpp_config())
1491 class BFD6TestCase(VppTestCase):
1492 """Bidirectional Forwarding Detection (BFD) (IPv6) """
1495 vpp_clock_offset = None
1500 def setUpClass(cls):
1501 super(BFD6TestCase, cls).setUpClass()
1502 cls.vapi.cli("set log class bfd level debug")
1504 cls.create_pg_interfaces([0])
1505 cls.pg0.config_ip6()
1506 cls.pg0.configure_ipv6_neighbors()
1508 cls.pg0.resolve_ndp()
1509 cls.create_loopback_interfaces(1)
1510 cls.loopback0 = cls.lo_interfaces[0]
1511 cls.loopback0.config_ip6()
1512 cls.loopback0.admin_up()
1515 super(BFD6TestCase, cls).tearDownClass()
1519 def tearDownClass(cls):
1520 super(BFD6TestCase, cls).tearDownClass()
1523 super(BFD6TestCase, self).setUp()
1524 self.factory = AuthKeyFactory()
1525 self.vapi.want_bfd_events()
1526 self.pg0.enable_capture()
1528 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1529 self.pg0.remote_ip6,
1531 self.vpp_session.add_vpp_config()
1532 self.vpp_session.admin_up()
1533 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1534 self.logger.debug(self.vapi.cli("show adj nbr"))
1535 except BaseException:
1536 self.vapi.want_bfd_events(enable_disable=0)
1540 if not self.vpp_dead:
1541 self.vapi.want_bfd_events(enable_disable=0)
1542 self.vapi.collect_events() # clear the event queue
1543 super(BFD6TestCase, self).tearDown()
def test_session_up(self):
    """ bring BFD session up

    Delegates entirely to the module-level bfd_session_up() helper,
    passing this test case as its argument.
    """
    bfd_session_up(self)
1549 def test_session_up_by_ip(self):
1550 """ bring BFD session up - first frame looked up by address pair """
1551 self.logger.info("BFD: Sending Slow control frame")
1552 self.test_session.update(my_discriminator=randint(0, 40000000))
1553 self.test_session.send_packet()
1554 self.pg0.enable_capture()
1555 p = self.pg0.wait_for_packet(1)
1556 self.assert_equal(p[BFD].your_discriminator,
1557 self.test_session.my_discriminator,
1558 "BFD - your discriminator")
1559 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1560 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
1562 self.logger.info("BFD: Waiting for event")
1563 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1564 verify_event(self, e, expected_state=BFDState.init)
1565 self.logger.info("BFD: Sending Up")
1566 self.test_session.send_packet()
1567 self.logger.info("BFD: Waiting for event")
1568 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1569 verify_event(self, e, expected_state=BFDState.up)
1570 self.logger.info("BFD: Session is Up")
1571 self.test_session.update(state=BFDState.up)
1572 self.test_session.send_packet()
1573 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
def test_hold_up(self):
    """ hold BFD session up

    After bringing the session up, answers each received BFD packet
    with one of our own for 2 * detect_mult rounds, then checks that no
    BFD events were queued and that the VPP session still reports up.
    """
    bfd_session_up(self)

    rounds = self.test_session.detect_mult * 2
    for _ in range(rounds):
        wait_for_bfd_packet(self)
        self.test_session.send_packet()

    pending_events = self.vapi.collect_events()
    self.assert_equal(len(pending_events), 0, "number of bfd events")
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1585 def test_echo_looped_back(self):
1586 """ echo packets looped back """
1587 # don't need a session in this case..
1588 self.vpp_session.remove_vpp_config()
1589 self.pg0.enable_capture()
1590 echo_packet_count = 10
1591 # random source port low enough to increment a few times..
1592 udp_sport_tx = randint(1, 50000)
1593 udp_sport_rx = udp_sport_tx
1594 echo_packet = (Ether(src=self.pg0.remote_mac,
1595 dst=self.pg0.local_mac) /
1596 IPv6(src=self.pg0.remote_ip6,
1597 dst=self.pg0.remote_ip6) /
1598 UDP(dport=BFD.udp_dport_echo) /
1599 Raw("this should be looped back"))
1600 for dummy in range(echo_packet_count):
1601 self.sleep(.01, "delay between echo packets")
1602 echo_packet[UDP].sport = udp_sport_tx
1604 self.logger.debug(ppp("Sending packet:", echo_packet))
1605 self.pg0.add_stream(echo_packet)
1607 for dummy in range(echo_packet_count):
1608 p = self.pg0.wait_for_packet(1)
1609 self.logger.debug(ppp("Got packet:", p))
1611 self.assert_equal(self.pg0.remote_mac,
1612 ether.dst, "Destination MAC")
1613 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1615 self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
1616 self.assert_equal(self.pg0.remote_ip6, ip.src, "Destination IP")
1618 self.assert_equal(udp.dport, BFD.udp_dport_echo,
1619 "UDP destination port")
1620 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1622 # need to compare the hex payload here, otherwise BFD_vpp_echo
1624 self.assertEqual(scapy.compat.raw(p[UDP].payload),
1625 scapy.compat.raw(echo_packet[UDP].payload),
1626 "Received packet is not the echo packet sent")
1627 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1628 "ECHO packet identifier for test purposes)")
1629 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1630 "ECHO packet identifier for test purposes)")
1632 def test_echo(self):
1633 """ echo function """
1634 bfd_session_up(self)
1635 self.test_session.update(required_min_echo_rx=150000)
1636 self.test_session.send_packet()
1637 detection_time = self.test_session.detect_mult *\
1638 self.vpp_session.required_min_rx / USEC_IN_SEC
1639 # echo shouldn't work without echo source set
1640 for dummy in range(10):
1641 sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1642 self.sleep(sleep, "delay before sending bfd packet")
1643 self.test_session.send_packet()
1644 p = wait_for_bfd_packet(
1645 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1646 self.assert_equal(p[BFD].required_min_rx_interval,
1647 self.vpp_session.required_min_rx,
1648 "BFD required min rx interval")
1649 self.test_session.send_packet()
1650 self.vapi.bfd_udp_set_echo_source(
1651 sw_if_index=self.loopback0.sw_if_index)
1653 # should be turned on - loopback echo packets
1654 for dummy in range(3):
1655 loop_until = time.time() + 0.75 * detection_time
1656 while time.time() < loop_until:
1657 p = self.pg0.wait_for_packet(1)
1658 self.logger.debug(ppp("Got packet:", p))
1659 if p[UDP].dport == BFD.udp_dport_echo:
1661 p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
1662 self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
1663 "BFD ECHO src IP equal to loopback IP")
1664 self.logger.debug(ppp("Looping back packet:", p))
1665 self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1666 "ECHO packet destination MAC address")
1667 p[Ether].dst = self.pg0.local_mac
1668 self.pg0.add_stream(p)
1671 elif p.haslayer(BFD):
1673 self.assertGreaterEqual(
1674 p[BFD].required_min_rx_interval,
1676 if "P" in p.sprintf("%BFD.flags%"):
1677 final = self.test_session.create_packet()
1678 final[BFD].flags = "F"
1679 self.test_session.send_packet(final)
1681 raise Exception(ppp("Received unknown packet:", p))
1683 self.assert_equal(len(self.vapi.collect_events()), 0,
1684 "number of bfd events")
1685 self.test_session.send_packet()
1686 self.assertTrue(echo_seen, "No echo packets received")
1688 def test_intf_deleted(self):
1689 """ interface with bfd session deleted """
1690 intf = VppLoInterface(self)
1693 sw_if_index = intf.sw_if_index
1694 vpp_session = VppBFDUDPSession(
1695 self, intf, intf.remote_ip6, af=AF_INET6)
1696 vpp_session.add_vpp_config()
1697 vpp_session.admin_up()
1698 intf.remove_vpp_config()
1699 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1700 self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1701 self.assertFalse(vpp_session.query_vpp_config())
1705 class BFDFIBTestCase(VppTestCase):
1706 """ BFD-FIB interactions (IPv6) """
1712 def setUpClass(cls):
1713 super(BFDFIBTestCase, cls).setUpClass()
1716 def tearDownClass(cls):
1717 super(BFDFIBTestCase, cls).tearDownClass()
1720 super(BFDFIBTestCase, self).setUp()
1721 self.create_pg_interfaces(range(1))
1723 self.vapi.want_bfd_events()
1724 self.pg0.enable_capture()
1726 for i in self.pg_interfaces:
1729 i.configure_ipv6_neighbors()
1732 if not self.vpp_dead:
1733 self.vapi.want_bfd_events(enable_disable=False)
1735 super(BFDFIBTestCase, self).tearDown()
1738 def pkt_is_not_data_traffic(p):
1739 """ not data traffic implies BFD or the usual IPv6 ND/RA"""
1740 if p.haslayer(BFD) or is_ipv6_misc(p):
1744 def test_session_with_fib(self):
1745 """ BFD-FIB interactions """
1747 # packets to match against both of the routes
1748 p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1749 IPv6(src="3001::1", dst="2001::1") /
1750 UDP(sport=1234, dport=1234) /
1751 Raw(b'\xa5' * 100)),
1752 (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1753 IPv6(src="3001::1", dst="2002::1") /
1754 UDP(sport=1234, dport=1234) /
1755 Raw(b'\xa5' * 100))]
1757 # A recursive and a non-recursive route via a next-hop that
1758 # will have a BFD session
1759 ip_2001_s_64 = VppIpRoute(self, "2001::", 64,
1760 [VppRoutePath(self.pg0.remote_ip6,
1761 self.pg0.sw_if_index)])
1762 ip_2002_s_64 = VppIpRoute(self, "2002::", 64,
1763 [VppRoutePath(self.pg0.remote_ip6,
1765 ip_2001_s_64.add_vpp_config()
1766 ip_2002_s_64.add_vpp_config()
1768 # bring the session up now the routes are present
1769 self.vpp_session = VppBFDUDPSession(self,
1771 self.pg0.remote_ip6,
1773 self.vpp_session.add_vpp_config()
1774 self.vpp_session.admin_up()
1775 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1777 # session is up - traffic passes
1778 bfd_session_up(self)
1780 self.pg0.add_stream(p)
1783 captured = self.pg0.wait_for_packet(
1785 filter_out_fn=self.pkt_is_not_data_traffic)
1786 self.assertEqual(captured[IPv6].dst,
# session is down - traffic is dropped
1790 bfd_session_down(self)
1792 self.pg0.add_stream(p)
1794 with self.assertRaises(CaptureTimeoutError):
1795 self.pg0.wait_for_packet(1, self.pkt_is_not_data_traffic)
1797 # session is up - traffic passes
1798 bfd_session_up(self)
1800 self.pg0.add_stream(p)
1803 captured = self.pg0.wait_for_packet(
1805 filter_out_fn=self.pkt_is_not_data_traffic)
1806 self.assertEqual(captured[IPv6].dst,
1810 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1811 class BFDTunTestCase(VppTestCase):
1812 """ BFD over GRE tunnel """
1818 def setUpClass(cls):
1819 super(BFDTunTestCase, cls).setUpClass()
1822 def tearDownClass(cls):
1823 super(BFDTunTestCase, cls).tearDownClass()
1826 super(BFDTunTestCase, self).setUp()
1827 self.create_pg_interfaces(range(1))
1829 self.vapi.want_bfd_events()
1830 self.pg0.enable_capture()
1832 for i in self.pg_interfaces:
1838 if not self.vpp_dead:
1839 self.vapi.want_bfd_events(enable_disable=0)
1841 super(BFDTunTestCase, self).tearDown()
1844 def pkt_is_not_data_traffic(p):
1845 """ not data traffic implies BFD or the usual IPv6 ND/RA"""
1846 if p.haslayer(BFD) or is_ipv6_misc(p):
1850 def test_bfd_o_gre(self):
1853 # A GRE interface over which to run a BFD session
1854 gre_if = VppGreInterface(self,
1856 self.pg0.remote_ip4)
1857 gre_if.add_vpp_config()
1861 # bring the session up now the routes are present
1862 self.vpp_session = VppBFDUDPSession(self,
1866 self.vpp_session.add_vpp_config()
1867 self.vpp_session.admin_up()
1869 self.test_session = BFDTestSession(
1870 self, gre_if, AF_INET,
1871 tunnel_header=(IP(src=self.pg0.remote_ip4,
1872 dst=self.pg0.local_ip4) /
1874 phy_interface=self.pg0)
1876 # packets to match against both of the routes
1877 p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1878 IP(src=self.pg0.remote_ip4, dst=gre_if.remote_ip4) /
1879 UDP(sport=1234, dport=1234) /
1880 Raw(b'\xa5' * 100))]
1882 # session is up - traffic passes
1883 bfd_session_up(self)
1885 self.send_and_expect(self.pg0, p, self.pg0)
1887 # bring session down
1888 bfd_session_down(self)
class BFDSHA1TestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """

    # Class and test docstrings are kept verbatim as one-liners; extra
    # documentation lives in comments so that anything the test runner
    # derives from the docstrings stays unchanged.

    # Placeholders, filled in by setUpClass() and by the individual tests.
    pg0 = None
    vpp_clock_offset = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        # Enable BFD debug logging and prepare a single IPv4 pg interface.
        super(BFDSHA1TestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip4()
            cls.pg0.admin_up()
            cls.pg0.resolve_arp()
        except Exception:
            # fixture creation failed half-way: run the base-class teardown
            # before propagating the error
            super(BFDSHA1TestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        super(BFDSHA1TestCase, cls).tearDownClass()

    def setUp(self):
        # Fresh key factory, BFD event subscription and capture per test.
        super(BFDSHA1TestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

    def tearDown(self):
        # Only unsubscribe while VPP is still alive.
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=False)
        self.vapi.collect_events()  # clear the event queue
        super(BFDSHA1TestCase, self).tearDown()

    def test_session_up(self):
        """ bring BFD session up """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)

    def test_hold_up(self):
        """ hold BFD session up """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        # answer every packet from VPP for two detect_mult rounds
        for _ in range(self.test_session.detect_mult * 2):
            wait_for_bfd_packet(self)
            self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_hold_up_meticulous(self):
        """ hold BFD session up - meticulous auth """
        key = self.factory.create_random_key(
            self, BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        # specify sequence number so that it wraps
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id,
            our_seq_number=0xFFFFFFFF - 4)
        bfd_session_up(self)
        # 30 exchanges starting 4 below 0xFFFFFFFF - enough to pass the wrap
        for _ in range(30):
            wait_for_bfd_packet(self)
            self.test_session.inc_seq_num()
            self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_send_bad_seq_number(self):
        """ session is not kept alive by msgs with bad sequence numbers"""
        key = self.factory.create_random_key(
            self, BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        detection_time = (self.test_session.detect_mult *
                          self.vpp_session.required_min_rx / USEC_IN_SEC)
        # keep sending for twice the detection time, never bumping the
        # sequence number
        send_until = time.time() + 2 * detection_time
        while time.time() < send_until:
            self.test_session.send_packet()
            self.sleep(0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
                       "time between bfd packets")
        e = self.vapi.collect_events()
        # session should be down now, because the sequence numbers weren't
        # updated
        self.assert_equal(len(e), 1, "number of bfd events")
        verify_event(self, e[0], expected_state=BFDState.down)

    def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
                                       legitimate_test_session,
                                       rogue_test_session,
                                       rogue_bfd_values=None):
        """ execute a rogue session interaction scenario

        1. create vpp session, add config
        2. bring the legitimate session up
        3. copy the bfd values from legitimate session to rogue session
        4. apply rogue_bfd_values to rogue session
        5. set rogue session state to down
        6. send message to take the session down from the rogue session
        7. assert that the legitimate session is unaffected
        """

        self.vpp_session = vpp_bfd_udp_session
        self.vpp_session.add_vpp_config()
        self.test_session = legitimate_test_session
        # bring vpp session up
        bfd_session_up(self)
        # send packet from rogue session
        rogue_test_session.update(
            my_discriminator=self.test_session.my_discriminator,
            your_discriminator=self.test_session.your_discriminator,
            desired_min_tx=self.test_session.desired_min_tx,
            required_min_rx=self.test_session.required_min_rx,
            detect_mult=self.test_session.detect_mult,
            diag=self.test_session.diag,
            state=self.test_session.state,
            auth_type=self.test_session.auth_type)
        # caller-supplied overrides are applied on top of the copied values
        if rogue_bfd_values:
            rogue_test_session.update(**rogue_bfd_values)
        rogue_test_session.update(state=BFDState.down)
        rogue_test_session.send_packet()
        wait_for_bfd_packet(self)
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_mismatch_auth(self):
        """ session is not brought down by unauthenticated msg """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
        legitimate_test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=vpp_session.bfd_key_id)
        # the rogue peer is created without any key
        rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
        self.execute_rogue_session_scenario(vpp_session,
                                            legitimate_test_session,
                                            rogue_test_session)

    def test_mismatch_bfd_key_id(self):
        """ session is not brought down by msg with non-existent key-id """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
        # pick a different random bfd key id
        x = randint(0, 255)
        while x == vpp_session.bfd_key_id:
            x = randint(0, 255)
        legitimate_test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=vpp_session.bfd_key_id)
        rogue_test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
        self.execute_rogue_session_scenario(vpp_session,
                                            legitimate_test_session,
                                            rogue_test_session)

    def test_mismatched_auth_type(self):
        """ session is not brought down by msg with wrong auth type """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
        legitimate_test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=vpp_session.bfd_key_id)
        rogue_test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=vpp_session.bfd_key_id)
        # same key and key id as the legitimate peer, only the advertised
        # auth type is overridden
        self.execute_rogue_session_scenario(
            vpp_session, legitimate_test_session, rogue_test_session,
            {'auth_type': BFDAuthType.keyed_md5})

    def test_restart(self):
        """ simulate remote peer restart and resynchronization """
        key = self.factory.create_random_key(
            self, BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
        bfd_session_up(self)
        # don't send any packets for 2*detection_time
        detection_time = (self.test_session.detect_mult *
                          self.vpp_session.required_min_rx / USEC_IN_SEC)
        self.sleep(2 * detection_time, "simulating peer restart")
        events = self.vapi.collect_events()
        self.assert_equal(len(events), 1, "number of bfd events")
        verify_event(self, events[0], expected_state=BFDState.down)
        self.test_session.update(state=BFDState.down)
        # reset sequence number
        self.test_session.our_seq_number = 0
        self.test_session.vpp_seq_number = None
        # now throw away any pending packets
        self.pg0.enable_capture()
        self.test_session.my_discriminator = 0
        bfd_session_up(self)
class BFDAuthOnOffTestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (changing auth) """

    # Class and test docstrings are kept verbatim as one-liners; extra
    # documentation lives in comments so that anything the test runner
    # derives from the docstrings stays unchanged.

    # Placeholders, filled in by setUpClass() and by the individual tests.
    pg0 = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        # Enable BFD debug logging and prepare a single IPv4 pg interface.
        super(BFDAuthOnOffTestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip4()
            cls.pg0.admin_up()
            cls.pg0.resolve_arp()
        except Exception:
            # fixture creation failed half-way: run the base-class teardown
            # before propagating the error
            super(BFDAuthOnOffTestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        super(BFDAuthOnOffTestCase, cls).tearDownClass()

    def setUp(self):
        # Fresh key factory, BFD event subscription and capture per test.
        super(BFDAuthOnOffTestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

    def tearDown(self):
        # Only unsubscribe while VPP is still alive.
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=False)
        self.vapi.collect_events()  # clear the event queue
        super(BFDAuthOnOffTestCase, self).tearDown()

    def _exchange_packets(self, verify_up=True, bump_seq=False):
        """Answer VPP's BFD packets for ``2 * detect_mult`` rounds.

        Each round waits for one packet from VPP and replies with one packet
        from ``self.test_session``.

        :param verify_up: assert that each received packet carries state up
        :param bump_seq: call ``inc_seq_num()`` on the test session before
            each reply
        """
        for _ in range(self.test_session.detect_mult * 2):
            packet = wait_for_bfd_packet(self)
            if verify_up:
                self.assert_equal(packet[BFD].state, BFDState.up, BFDState)
            if bump_seq:
                self.test_session.inc_seq_num()
            self.test_session.send_packet()

    def _verify_session_undisturbed(self):
        """Assert the VPP session is up and no BFD events were collected."""
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")

    def test_auth_on_immediate(self):
        """ turn auth on without disturbing session state (immediate) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        self._exchange_packets()
        # switch both ends to the key at once
        self.vpp_session.activate_auth(key)
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key
        self._exchange_packets()
        self._verify_session_undisturbed()

    def test_auth_off_immediate(self):
        """ turn auth off without disturbing session state (immediate) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets(bump_seq=True)
        # drop authentication on both ends at once
        self.vpp_session.deactivate_auth()
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self._exchange_packets(bump_seq=True)
        self._verify_session_undisturbed()

    def test_auth_change_key_immediate(self):
        """ change auth key without disturbing session state (immediate) """
        key1 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2 = self.factory.create_random_key(self)
        key2.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key1)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key1,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets()
        # switch both ends from key1 to key2 at once
        self.vpp_session.activate_auth(key2)
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key2
        self._exchange_packets()
        self._verify_session_undisturbed()

    def test_auth_on_delayed(self):
        """ turn auth on without disturbing session state (delayed) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        # the first round only keeps the session alive, no state check
        self._exchange_packets(verify_up=False)
        self.vpp_session.activate_auth(key, delayed=True)
        # the test peer keeps sending unauthenticated packets for now
        self._exchange_packets()
        # now the test peer starts using the key as well
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key
        self.test_session.send_packet()
        self._exchange_packets()
        self._verify_session_undisturbed()

    def test_auth_off_delayed(self):
        """ turn auth off without disturbing session state (delayed) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets()
        self.vpp_session.deactivate_auth(delayed=True)
        # the test peer keeps sending authenticated packets for now
        self._exchange_packets()
        # now the test peer stops authenticating as well
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self.test_session.send_packet()
        self._exchange_packets()
        self._verify_session_undisturbed()

    def test_auth_change_key_delayed(self):
        """ change auth key without disturbing session state (delayed) """
        key1 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2 = self.factory.create_random_key(self)
        key2.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key1)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key1,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets()
        self.vpp_session.activate_auth(key2, delayed=True)
        # the test peer keeps using key1 for now
        self._exchange_packets()
        # now the test peer switches to key2 as well
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key2
        self.test_session.send_packet()
        self._exchange_packets()
        self._verify_session_undisturbed()
class BFDCLITestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (CLI) """

    # Class and test docstrings are kept verbatim as one-liners; extra
    # documentation lives in comments so that anything the test runner
    # derives from the docstrings stays unchanged.

    # Placeholder, filled in by setUpClass().
    pg0 = None

    @classmethod
    def setUpClass(cls):
        # Enable BFD debug logging and prepare one dual-stack pg interface.
        super(BFDCLITestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces((0,))
            cls.pg0.config_ip4()
            cls.pg0.config_ip6()
            cls.pg0.resolve_arp()
            cls.pg0.resolve_ndp()
        except Exception:
            # fixture creation failed half-way: run the base-class teardown
            # before propagating the error
            super(BFDCLITestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        super(BFDCLITestCase, cls).tearDownClass()

    def setUp(self):
        # Unlike the other BFD test cases no event subscription is made
        # here; tests that need events subscribe themselves.
        super(BFDCLITestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.pg0.enable_capture()

    def tearDown(self):
        try:
            self.vapi.want_bfd_events(enable_disable=False)
        except UnexpectedApiReturnValueError:
            # some tests aren't subscribed, so this is not an issue
            pass
        self.vapi.collect_events()  # clear the event queue
        super(BFDCLITestCase, self).tearDown()

    @staticmethod
    def _secret_hex(key):
        """Return the secret of ``key`` as a lowercase hex string, two
        digits per byte, as passed to the ``bfd key set`` CLI."""
        return "".join("{:02x}".format(scapy.compat.orb(c)) for c in key.key)

    def cli_verify_no_response(self, cli):
        """ execute a CLI, asserting that the response is empty """
        self.assert_equal(self.vapi.cli(cli),
                          "",
                          "CLI command response")

    def cli_verify_response(self, cli, expected):
        """ execute a CLI, asserting that the response matches expectation """
        try:
            reply = self.vapi.cli(cli)
        except CliFailedCommandError as cli_error:
            # a failed command is compared by its error text instead
            reply = str(cli_error)
        self.assert_equal(reply.strip(),
                          expected,
                          "CLI command response")

    def test_show(self):
        """ show commands """
        k1 = self.factory.create_random_key(self)
        k1.add_vpp_config()
        k2 = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        k2.add_vpp_config()
        s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        s1.add_vpp_config()
        s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
                              af=AF_INET6, sha1_key=k2)
        s2.add_vpp_config()
        # the output is only logged, not verified
        self.logger.info(self.vapi.ppcli("show bfd keys"))
        self.logger.info(self.vapi.ppcli("show bfd sessions"))
        self.logger.info(self.vapi.ppcli("show bfd"))

    def test_set_del_sha1_key(self):
        """ set/delete SHA1 auth key """
        k = self.factory.create_random_key(self)
        self.registry.register(k, self.logger)
        self.cli_verify_no_response(
            "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
            (k.conf_key_id, self._secret_hex(k)))
        self.assertTrue(k.query_vpp_config())
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=k,
            bfd_key_id=self.vpp_session.bfd_key_id)
        self.vapi.want_bfd_events()
        bfd_session_up(self)
        bfd_session_down(self)
        # try to replace the secret for the key - should fail because the key
        # is in-use
        k2 = self.factory.create_random_key(self)
        self.cli_verify_response(
            "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
            (k.conf_key_id, self._secret_hex(k2)),
            "bfd key set: `bfd_auth_set_key' API call failed, "
            "rv=-103:BFD object in use")
        # manipulating the session using old secret should still work
        bfd_session_up(self)
        bfd_session_down(self)
        self.vpp_session.remove_vpp_config()
        self.cli_verify_no_response(
            "bfd key del conf-key-id %s" % k.conf_key_id)
        self.assertFalse(k.query_vpp_config())

    def test_set_del_meticulous_sha1_key(self):
        """ set/delete meticulous SHA1 auth key """
        k = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        self.registry.register(k, self.logger)
        self.cli_verify_no_response(
            "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
            (k.conf_key_id, self._secret_hex(k)))
        self.assertTrue(k.query_vpp_config())
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=k)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET6, sha1_key=k,
            bfd_key_id=self.vpp_session.bfd_key_id)
        self.vapi.want_bfd_events()
        bfd_session_up(self)
        bfd_session_down(self)
        # try to replace the secret for the key - should fail because the key
        # is in-use
        k2 = self.factory.create_random_key(self)
        self.cli_verify_response(
            "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
            (k.conf_key_id, self._secret_hex(k2)),
            "bfd key set: `bfd_auth_set_key' API call failed, "
            "rv=-103:BFD object in use")
        # manipulating the session using old secret should still work
        bfd_session_up(self)
        bfd_session_down(self)
        self.vpp_session.remove_vpp_config()
        self.cli_verify_no_response(
            "bfd key del conf-key-id %s" % k.conf_key_id)
        self.assertFalse(k.query_vpp_config())

    def test_add_mod_del_bfd_udp(self):
        """ create/modify/delete IPv4 BFD UDP session """
        # vpp_session is never added through the API; it only supplies the
        # values for the CLI and the expected configuration
        vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4)
        self.registry.register(vpp_session, self.logger)
        cli_add_cmd = (
            "bfd udp session add interface %s local-addr %s "
            "peer-addr %s desired-min-tx %s required-min-rx %s "
            "detect-mult %s") % (self.pg0.name, self.pg0.local_ip4,
                                 self.pg0.remote_ip4,
                                 vpp_session.desired_min_tx,
                                 vpp_session.required_min_rx,
                                 vpp_session.detect_mult)
        self.cli_verify_no_response(cli_add_cmd)
        # 2nd add should fail
        self.cli_verify_response(
            cli_add_cmd,
            "bfd udp session add: `bfd_add_add_session' API call"
            " failed, rv=-101:Duplicate BFD object")
        verify_bfd_session_config(self, vpp_session)
        mod_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4,
            required_min_rx=2 * vpp_session.required_min_rx,
            desired_min_tx=3 * vpp_session.desired_min_tx,
            detect_mult=4 * vpp_session.detect_mult)
        self.cli_verify_no_response(
            "bfd udp session mod interface %s local-addr %s peer-addr %s "
            "desired-min-tx %s required-min-rx %s detect-mult %s" %
            (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
             mod_session.desired_min_tx, mod_session.required_min_rx,
             mod_session.detect_mult))
        verify_bfd_session_config(self, mod_session)
        cli_del_cmd = (
            "bfd udp session del interface %s local-addr %s "
            "peer-addr %s") % (self.pg0.name,
                               self.pg0.local_ip4, self.pg0.remote_ip4)
        self.cli_verify_no_response(cli_del_cmd)
        # 2nd del is expected to fail
        self.cli_verify_response(
            cli_del_cmd,
            "bfd udp session del: `bfd_udp_del_session' API call"
            " failed, rv=-102:No such BFD object")
        self.assertFalse(vpp_session.query_vpp_config())

    def test_add_mod_del_bfd_udp6(self):
        """ create/modify/delete IPv6 BFD UDP session """
        # vpp_session is never added through the API; it only supplies the
        # values for the CLI and the expected configuration
        vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
        self.registry.register(vpp_session, self.logger)
        cli_add_cmd = (
            "bfd udp session add interface %s local-addr %s "
            "peer-addr %s desired-min-tx %s required-min-rx %s "
            "detect-mult %s") % (self.pg0.name, self.pg0.local_ip6,
                                 self.pg0.remote_ip6,
                                 vpp_session.desired_min_tx,
                                 vpp_session.required_min_rx,
                                 vpp_session.detect_mult)
        self.cli_verify_no_response(cli_add_cmd)
        # 2nd add should fail
        self.cli_verify_response(
            cli_add_cmd,
            "bfd udp session add: `bfd_add_add_session' API call"
            " failed, rv=-101:Duplicate BFD object")
        verify_bfd_session_config(self, vpp_session)
        mod_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
            required_min_rx=2 * vpp_session.required_min_rx,
            desired_min_tx=3 * vpp_session.desired_min_tx,
            detect_mult=4 * vpp_session.detect_mult)
        self.cli_verify_no_response(
            "bfd udp session mod interface %s local-addr %s peer-addr %s "
            "desired-min-tx %s required-min-rx %s detect-mult %s" %
            (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
             mod_session.desired_min_tx,
             mod_session.required_min_rx, mod_session.detect_mult))
        verify_bfd_session_config(self, mod_session)
        cli_del_cmd = (
            "bfd udp session del interface %s local-addr %s "
            "peer-addr %s") % (self.pg0.name,
                               self.pg0.local_ip6, self.pg0.remote_ip6)
        self.cli_verify_no_response(cli_del_cmd)
        # 2nd del is expected to fail
        self.cli_verify_response(
            cli_del_cmd,
            "bfd udp session del: `bfd_udp_del_session' API call"
            " failed, rv=-102:No such BFD object")
        self.assertFalse(vpp_session.query_vpp_config())

    def test_add_mod_del_bfd_udp_auth(self):
        """ create/modify/delete IPv4 BFD UDP session (authenticated) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        # vpp_session is never added through the API; it only supplies the
        # values for the CLI and the expected configuration
        vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
        self.registry.register(vpp_session, self.logger)
        cli_add_cmd = (
            "bfd udp session add interface %s local-addr %s "
            "peer-addr %s desired-min-tx %s required-min-rx %s "
            "detect-mult %s conf-key-id %s bfd-key-id %s") % (
                self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
                vpp_session.desired_min_tx, vpp_session.required_min_rx,
                vpp_session.detect_mult, key.conf_key_id,
                vpp_session.bfd_key_id)
        self.cli_verify_no_response(cli_add_cmd)
        # 2nd add should fail
        self.cli_verify_response(
            cli_add_cmd,
            "bfd udp session add: `bfd_add_add_session' API call"
            " failed, rv=-101:Duplicate BFD object")
        verify_bfd_session_config(self, vpp_session)
        mod_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key,
            bfd_key_id=vpp_session.bfd_key_id,
            required_min_rx=2 * vpp_session.required_min_rx,
            desired_min_tx=3 * vpp_session.desired_min_tx,
            detect_mult=4 * vpp_session.detect_mult)
        # the mod command carries timing parameters only, no key arguments
        self.cli_verify_no_response(
            "bfd udp session mod interface %s local-addr %s peer-addr %s "
            "desired-min-tx %s required-min-rx %s detect-mult %s" %
            (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
             mod_session.desired_min_tx,
             mod_session.required_min_rx, mod_session.detect_mult))
        verify_bfd_session_config(self, mod_session)
        cli_del_cmd = (
            "bfd udp session del interface %s local-addr %s "
            "peer-addr %s") % (self.pg0.name,
                               self.pg0.local_ip4, self.pg0.remote_ip4)
        self.cli_verify_no_response(cli_del_cmd)
        # 2nd del is expected to fail
        self.cli_verify_response(
            cli_del_cmd,
            "bfd udp session del: `bfd_udp_del_session' API call"
            " failed, rv=-102:No such BFD object")
        self.assertFalse(vpp_session.query_vpp_config())

    def test_add_mod_del_bfd_udp6_auth(self):
        """ create/modify/delete IPv6 BFD UDP session (authenticated) """
        key = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        # vpp_session is never added through the API; it only supplies the
        # values for the CLI and the expected configuration
        vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key)
        self.registry.register(vpp_session, self.logger)
        cli_add_cmd = (
            "bfd udp session add interface %s local-addr %s "
            "peer-addr %s desired-min-tx %s required-min-rx %s "
            "detect-mult %s conf-key-id %s bfd-key-id %s") % (
                self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
                vpp_session.desired_min_tx, vpp_session.required_min_rx,
                vpp_session.detect_mult, key.conf_key_id,
                vpp_session.bfd_key_id)
        self.cli_verify_no_response(cli_add_cmd)
        # 2nd add should fail
        self.cli_verify_response(
            cli_add_cmd,
            "bfd udp session add: `bfd_add_add_session' API call"
            " failed, rv=-101:Duplicate BFD object")
        verify_bfd_session_config(self, vpp_session)
        mod_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key,
            bfd_key_id=vpp_session.bfd_key_id,
            required_min_rx=2 * vpp_session.required_min_rx,
            desired_min_tx=3 * vpp_session.desired_min_tx,
            detect_mult=4 * vpp_session.detect_mult)
        # the mod command carries timing parameters only, no key arguments
        self.cli_verify_no_response(
            "bfd udp session mod interface %s local-addr %s peer-addr %s "
            "desired-min-tx %s required-min-rx %s detect-mult %s" %
            (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
             mod_session.desired_min_tx,
             mod_session.required_min_rx, mod_session.detect_mult))
        verify_bfd_session_config(self, mod_session)
        cli_del_cmd = (
            "bfd udp session del interface %s local-addr %s "
            "peer-addr %s") % (self.pg0.name,
                               self.pg0.local_ip6, self.pg0.remote_ip6)
        self.cli_verify_no_response(cli_del_cmd)
        # 2nd del is expected to fail
        self.cli_verify_response(
            cli_del_cmd,
            "bfd udp session del: `bfd_udp_del_session' API call"
            " failed, rv=-102:No such BFD object")
        self.assertFalse(vpp_session.query_vpp_config())

    def test_auth_on_off(self):
        """ turn authentication on and off """
        key = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        # auth_session is never added to VPP; it is the expected shape of
        # `session` once authentication has been activated through the CLI
        auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                        sha1_key=key)
        session.add_vpp_config()
        cli_activate = (
            "bfd udp session auth activate interface %s local-addr %s "
            "peer-addr %s conf-key-id %s bfd-key-id %s") % (
                self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
                key.conf_key_id, auth_session.bfd_key_id)
        # each command is issued twice: repeating it must succeed and leave
        # the configuration unchanged
        self.cli_verify_no_response(cli_activate)
        verify_bfd_session_config(self, auth_session)
        self.cli_verify_no_response(cli_activate)
        verify_bfd_session_config(self, auth_session)
        cli_deactivate = (
            "bfd udp session auth deactivate interface %s local-addr %s "
            "peer-addr %s ") % (
                self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.cli_verify_no_response(cli_deactivate)
        verify_bfd_session_config(self, session)
        self.cli_verify_no_response(cli_deactivate)
        verify_bfd_session_config(self, session)

    def test_auth_on_off_delayed(self):
        """ turn authentication on and off (delayed) """
        key = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        # auth_session is never added to VPP; it is the expected shape of
        # `session` once authentication has been activated through the CLI
        auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                        sha1_key=key)
        session.add_vpp_config()
        cli_activate = (
            "bfd udp session auth activate interface %s local-addr %s "
            "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes") % (
                self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
                key.conf_key_id, auth_session.bfd_key_id)
        # each command is issued twice: repeating it must succeed and leave
        # the configuration unchanged
        self.cli_verify_no_response(cli_activate)
        verify_bfd_session_config(self, auth_session)
        self.cli_verify_no_response(cli_activate)
        verify_bfd_session_config(self, auth_session)
        cli_deactivate = (
            "bfd udp session auth deactivate interface %s local-addr %s "
            "peer-addr %s delayed yes") % (
                self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.cli_verify_no_response(cli_deactivate)
        verify_bfd_session_config(self, session)
        self.cli_verify_no_response(cli_deactivate)
        verify_bfd_session_config(self, session)

    def test_admin_up_down(self):
        """ put session admin-up and admin-down """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        cli_down = (
            "bfd udp session set-flags admin down interface %s local-addr %s "
            "peer-addr %s ") % (
                self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
        cli_up = (
            "bfd udp session set-flags admin up interface %s local-addr %s "
            "peer-addr %s ") % (
                self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.cli_verify_no_response(cli_down)
        verify_bfd_session_config(self, session, state=BFDState.admin_down)
        # admin up is expected to leave the session in state down
        self.cli_verify_no_response(cli_up)
        verify_bfd_session_config(self, session, state=BFDState.down)

    def test_set_del_udp_echo_source(self):
        """ set/del udp echo source """
        self.create_loopback_interfaces(1)
        self.loopback0 = self.lo_interfaces[0]
        self.loopback0.admin_up()
        self.cli_verify_response("show bfd echo-source",
                                 "UDP echo source is not set.")
        cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
        self.cli_verify_no_response(cli_set)
        # no address configured on the loopback yet
        self.cli_verify_response("show bfd echo-source",
                                 "UDP echo source is: %s\n"
                                 "IPv4 address usable as echo source: none\n"
                                 "IPv6 address usable as echo source: none" %
                                 self.loopback0.name)
        self.loopback0.config_ip4()
        # expected echo address: the interface address with its lowest bit
        # flipped
        echo_ip4 = str(ipaddress.IPv4Address(int(ipaddress.IPv4Address(
            self.loopback0.local_ip4)) ^ 1))
        self.cli_verify_response("show bfd echo-source",
                                 "UDP echo source is: %s\n"
                                 "IPv4 address usable as echo source: %s\n"
                                 "IPv6 address usable as echo source: none" %
                                 (self.loopback0.name, echo_ip4))
        echo_ip6 = str(ipaddress.IPv6Address(int(ipaddress.IPv6Address(
            self.loopback0.local_ip6)) ^ 1))
        self.loopback0.config_ip6()
        self.cli_verify_response("show bfd echo-source",
                                 "UDP echo source is: %s\n"
                                 "IPv4 address usable as echo source: %s\n"
                                 "IPv6 address usable as echo source: %s" %
                                 (self.loopback0.name, echo_ip4, echo_ip6))
        cli_del = "bfd udp echo-source del"
        self.cli_verify_no_response(cli_del)
        self.cli_verify_response("show bfd echo-source",
                                 "UDP echo source is not set.")
# Script entry point: run this module's tests with the VPP test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)