4 from __future__ import division
10 from random import randint, shuffle, getrandbits
11 from socket import AF_INET, AF_INET6, inet_ntop
12 from struct import pack, unpack
16 from scapy.layers.inet import UDP, IP
17 from scapy.layers.inet6 import IPv6
18 from scapy.layers.l2 import Ether, GRE
19 from scapy.packet import Raw
21 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
22 BFDDiagCode, BFDState, BFD_vpp_echo
23 from framework import VppTestCase, VppTestRunner, running_extended_tests
25 from vpp_ip import DpoProto
26 from vpp_ip_route import VppIpRoute, VppRoutePath
27 from vpp_lo_interface import VppLoInterface
28 from vpp_papi_provider import UnexpectedApiReturnValueError
29 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
30 from vpp_gre_interface import VppGreInterface
class AuthKeyFactory(object):
    """Factory class for creating auth keys with unique conf key ID"""

    def __init__(self):
        # conf key IDs already handed out by this factory instance
        self._conf_key_ids = set()

    def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
        """ create a random key with unique conf key id

        :param test: test case object, passed through to VppBFDAuthKey
        :param auth_type: authentication type of the created key
        :returns: VppBFDAuthKey holding 1 to 20 random bytes and a conf key
                  ID which this factory has not returned before
        """
        conf_key_id = randint(0, 0xFFFFFFFF)
        # re-draw until the ID is one this factory has not used yet
        while conf_key_id in self._conf_key_ids:
            conf_key_id = randint(0, 0xFFFFFFFF)
        self._conf_key_ids.add(conf_key_id)
        key = scapy.compat.raw(
            bytearray([randint(0, 255) for _ in range(randint(1, 20))]))
        return VppBFDAuthKey(test=test, auth_type=auth_type,
                             conf_key_id=conf_key_id, key=key)
53 @unittest.skipUnless(running_extended_tests, "part of extended tests")
54 class BFDAPITestCase(VppTestCase):
55 """Bidirectional Forwarding Detection (BFD) - API"""
62 super(BFDAPITestCase, cls).setUpClass()
63 cls.vapi.cli("set log class bfd level debug")
65 cls.create_pg_interfaces(range(2))
66 for i in cls.pg_interfaces:
72 super(BFDAPITestCase, cls).tearDownClass()
76 def tearDownClass(cls):
77 super(BFDAPITestCase, cls).tearDownClass()
80 super(BFDAPITestCase, self).setUp()
81 self.factory = AuthKeyFactory()
def test_add_bfd(self):
    """ create a BFD session

    Runs the add/remove cycle twice on the same session object, logging
    the session state after each add.
    """
    bfd_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    for _ in range(2):
        bfd_session.add_vpp_config()
        self.logger.debug("Session state is %s", bfd_session.state)
        bfd_session.remove_vpp_config()
def test_double_add(self):
    """ create the same BFD session twice (negative case)

    The first add is issued normally; the repeated add runs inside the
    negative-API-return-value guard provided by the vapi object.
    """
    duplicate = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    duplicate.add_vpp_config()

    # adding the very same session again must be rejected by the API
    negative_retval = self.vapi.assert_negative_api_retval()
    with negative_retval:
        duplicate.add_vpp_config()

    duplicate.remove_vpp_config()
def test_add_bfd6(self):
    """ create IPv6 BFD session

    Runs the add/remove cycle twice on the same IPv6 session object,
    logging the session state after each add.
    """
    bfd_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
    for _ in range(2):
        bfd_session.add_vpp_config()
        self.logger.debug("Session state is %s", bfd_session.state)
        bfd_session.remove_vpp_config()
114 def test_mod_bfd(self):
115 """ modify BFD session parameters """
116 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
117 desired_min_tx=50000,
118 required_min_rx=10000,
120 session.add_vpp_config()
121 s = session.get_bfd_udp_session_dump_entry()
122 self.assert_equal(session.desired_min_tx,
124 "desired min transmit interval")
125 self.assert_equal(session.required_min_rx,
127 "required min receive interval")
128 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
129 session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
130 required_min_rx=session.required_min_rx * 2,
131 detect_mult=session.detect_mult * 2)
132 s = session.get_bfd_udp_session_dump_entry()
133 self.assert_equal(session.desired_min_tx,
135 "desired min transmit interval")
136 self.assert_equal(session.required_min_rx,
138 "required min receive interval")
139 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
141 def test_add_sha1_keys(self):
142 """ add SHA1 keys """
144 keys = [self.factory.create_random_key(
145 self) for i in range(0, key_count)]
147 self.assertFalse(key.query_vpp_config())
151 self.assertTrue(key.query_vpp_config())
153 indexes = range(key_count)
158 key.remove_vpp_config()
160 for j in range(key_count):
163 self.assertFalse(key.query_vpp_config())
165 self.assertTrue(key.query_vpp_config())
166 # should be removed now
168 self.assertFalse(key.query_vpp_config())
169 # add back and remove again
173 self.assertTrue(key.query_vpp_config())
175 key.remove_vpp_config()
177 self.assertFalse(key.query_vpp_config())
179 def test_add_bfd_sha1(self):
180 """ create a BFD session (SHA1) """
181 key = self.factory.create_random_key(self)
183 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
185 session.add_vpp_config()
186 self.logger.debug("Session state is %s", session.state)
187 session.remove_vpp_config()
188 session.add_vpp_config()
189 self.logger.debug("Session state is %s", session.state)
190 session.remove_vpp_config()
192 def test_double_add_sha1(self):
193 """ create the same BFD session twice (negative case) (SHA1) """
194 key = self.factory.create_random_key(self)
196 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
198 session.add_vpp_config()
199 with self.assertRaises(Exception):
200 session.add_vpp_config()
def test_add_auth_nonexistent_key(self):
    """ create BFD session using non-existent SHA1 (negative case)

    The key object comes from the factory but is never added to the VPP
    configuration in this test, so adding the session must raise.
    """
    unregistered_key = self.factory.create_random_key(self)
    session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=unregistered_key)
    self.assertRaises(Exception, session.add_vpp_config)
210 def test_shared_sha1_key(self):
211 """ share single SHA1 key between multiple BFD sessions """
212 key = self.factory.create_random_key(self)
215 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
217 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
218 sha1_key=key, af=AF_INET6),
219 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
221 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
222 sha1_key=key, af=AF_INET6)]
227 e = key.get_bfd_auth_keys_dump_entry()
228 self.assert_equal(e.use_count, len(sessions) - removed,
229 "Use count for shared key")
230 s.remove_vpp_config()
232 e = key.get_bfd_auth_keys_dump_entry()
233 self.assert_equal(e.use_count, len(sessions) - removed,
234 "Use count for shared key")
236 def test_activate_auth(self):
237 """ activate SHA1 authentication """
238 key = self.factory.create_random_key(self)
240 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
241 session.add_vpp_config()
242 session.activate_auth(key)
244 def test_deactivate_auth(self):
245 """ deactivate SHA1 authentication """
246 key = self.factory.create_random_key(self)
248 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
249 session.add_vpp_config()
250 session.activate_auth(key)
251 session.deactivate_auth()
253 def test_change_key(self):
254 """ change SHA1 key """
255 key1 = self.factory.create_random_key(self)
256 key2 = self.factory.create_random_key(self)
257 while key2.conf_key_id == key1.conf_key_id:
258 key2 = self.factory.create_random_key(self)
259 key1.add_vpp_config()
260 key2.add_vpp_config()
261 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
263 session.add_vpp_config()
264 session.activate_auth(key2)
266 def test_set_del_udp_echo_source(self):
267 """ set/del udp echo source """
268 self.create_loopback_interfaces(1)
269 self.loopback0 = self.lo_interfaces[0]
270 self.loopback0.admin_up()
271 echo_source = self.vapi.bfd_udp_get_echo_source()
272 self.assertFalse(echo_source.is_set)
273 self.assertFalse(echo_source.have_usable_ip4)
274 self.assertFalse(echo_source.have_usable_ip6)
276 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
277 echo_source = self.vapi.bfd_udp_get_echo_source()
278 self.assertTrue(echo_source.is_set)
279 self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
280 self.assertFalse(echo_source.have_usable_ip4)
281 self.assertFalse(echo_source.have_usable_ip6)
283 self.loopback0.config_ip4()
284 unpacked = unpack("!L", self.loopback0.local_ip4n)
285 echo_ip4 = pack("!L", unpacked[0] ^ 1)
286 echo_source = self.vapi.bfd_udp_get_echo_source()
287 self.assertTrue(echo_source.is_set)
288 self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
289 self.assertTrue(echo_source.have_usable_ip4)
290 self.assertEqual(echo_source.ip4_addr, echo_ip4)
291 self.assertFalse(echo_source.have_usable_ip6)
293 self.loopback0.config_ip6()
294 unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
295 echo_ip6 = pack("!LLLL", unpacked[0], unpacked[1], unpacked[2],
297 echo_source = self.vapi.bfd_udp_get_echo_source()
298 self.assertTrue(echo_source.is_set)
299 self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
300 self.assertTrue(echo_source.have_usable_ip4)
301 self.assertEqual(echo_source.ip4_addr, echo_ip4)
302 self.assertTrue(echo_source.have_usable_ip6)
303 self.assertEqual(echo_source.ip6_addr, echo_ip6)
305 self.vapi.bfd_udp_del_echo_source()
306 echo_source = self.vapi.bfd_udp_get_echo_source()
307 self.assertFalse(echo_source.is_set)
308 self.assertFalse(echo_source.have_usable_ip4)
309 self.assertFalse(echo_source.have_usable_ip6)
312 class BFDTestSession(object):
313 """ BFD session as seen from test framework side """
def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
             bfd_key_id=None, our_seq_number=None,
             tunnel_header=None, phy_interface=None):
    """ set up the test-side view of a BFD session

    :param test: test case object; other methods use it for logging and
                 assertions via self.test
    :param interface: interface whose addresses are used in sent packets
    :param af: address family, compared against AF_INET6 when building
               packets
    :param detect_mult: detect multiplier stored on the session
    :param sha1_key: auth key object, or None for no authentication
    :param bfd_key_id: key ID placed in the auth section of sent packets
    :param our_seq_number: initial sequence number; random when None
    :param tunnel_header: optional header inserted after the Ether layer
    :param phy_interface: interface used for MAC addresses and sending;
                          defaults to `interface` when None
    """
    self.test = test
    self.af = af
    self.sha1_key = sha1_key
    self.bfd_key_id = bfd_key_id
    self.interface = interface
    # without an explicit physical interface, send on `interface` itself
    if phy_interface is not None:
        self.phy_interface = phy_interface
    else:
        self.phy_interface = self.interface
    self.udp_sport = randint(49152, 65535)
    if our_seq_number is None:
        self.our_seq_number = randint(0, 40000000)
    else:
        self.our_seq_number = our_seq_number
    self.vpp_seq_number = None
    self.my_discriminator = 0
    self.desired_min_tx = 300000
    self.required_min_rx = 300000
    self.required_min_echo_rx = None
    self.detect_mult = detect_mult
    self.diag = BFDDiagCode.no_diagnostic
    self.your_discriminator = None
    self.state = BFDState.down
    self.auth_type = BFDAuthType.no_auth
    self.tunnel_header = tunnel_header
def inc_seq_num(self):
    """ increment sequence number, wrapping if needed """
    # the counter is 32 bits wide: 0xFFFFFFFF + 1 masks down to 0, every
    # other value simply advances by one
    self.our_seq_number = (self.our_seq_number + 1) & 0xFFFFFFFF
def update(self, my_discriminator=None, your_discriminator=None,
           desired_min_tx=None, required_min_rx=None,
           required_min_echo_rx=None, detect_mult=None,
           diag=None, state=None, auth_type=None):
    """ update BFD parameters associated with session

    Each argument left at None keeps the session's current value; any
    other value (including 0) overwrites the attribute of the same name.
    """
    requested = (
        ("my_discriminator", my_discriminator),
        ("your_discriminator", your_discriminator),
        ("required_min_rx", required_min_rx),
        ("required_min_echo_rx", required_min_echo_rx),
        ("desired_min_tx", desired_min_tx),
        ("detect_mult", detect_mult),
        ("diag", diag),
        ("state", state),
        ("auth_type", auth_type),
    )
    for attribute, value in requested:
        # explicit None check so that falsy values such as 0 still apply
        if value is not None:
            setattr(self, attribute, value)
375 def fill_packet_fields(self, packet):
376 """ set packet fields with known values in packet """
378 if self.my_discriminator:
379 self.test.logger.debug("BFD: setting packet.my_discriminator=%s",
380 self.my_discriminator)
381 bfd.my_discriminator = self.my_discriminator
382 if self.your_discriminator:
383 self.test.logger.debug("BFD: setting packet.your_discriminator=%s",
384 self.your_discriminator)
385 bfd.your_discriminator = self.your_discriminator
386 if self.required_min_rx:
387 self.test.logger.debug(
388 "BFD: setting packet.required_min_rx_interval=%s",
389 self.required_min_rx)
390 bfd.required_min_rx_interval = self.required_min_rx
391 if self.required_min_echo_rx:
392 self.test.logger.debug(
393 "BFD: setting packet.required_min_echo_rx=%s",
394 self.required_min_echo_rx)
395 bfd.required_min_echo_rx_interval = self.required_min_echo_rx
396 if self.desired_min_tx:
397 self.test.logger.debug(
398 "BFD: setting packet.desired_min_tx_interval=%s",
400 bfd.desired_min_tx_interval = self.desired_min_tx
402 self.test.logger.debug(
403 "BFD: setting packet.detect_mult=%s", self.detect_mult)
404 bfd.detect_mult = self.detect_mult
406 self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
409 self.test.logger.debug("BFD: setting packet.state=%s", self.state)
410 bfd.state = self.state
412 # this is used by a negative test-case
413 self.test.logger.debug("BFD: setting packet.auth_type=%s",
415 bfd.auth_type = self.auth_type
417 def create_packet(self):
418 """ create a BFD packet, reflecting the current state of session """
421 bfd.auth_type = self.sha1_key.auth_type
422 bfd.auth_len = BFD.sha1_auth_len
423 bfd.auth_key_id = self.bfd_key_id
424 bfd.auth_seq_num = self.our_seq_number
425 bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
428 packet = Ether(src=self.phy_interface.remote_mac,
429 dst=self.phy_interface.local_mac)
430 if self.tunnel_header:
431 packet = packet / self.tunnel_header
432 if self.af == AF_INET6:
434 IPv6(src=self.interface.remote_ip6,
435 dst=self.interface.local_ip6,
437 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
441 IP(src=self.interface.remote_ip4,
442 dst=self.interface.local_ip4,
444 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
446 self.test.logger.debug("BFD: Creating packet")
447 self.fill_packet_fields(packet)
449 hash_material = scapy.compat.raw(
450 packet[BFD])[:32] + self.sha1_key.key + \
451 "\0" * (20 - len(self.sha1_key.key))
452 self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
453 hashlib.sha1(hash_material).hexdigest())
454 packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
457 def send_packet(self, packet=None, interface=None):
458 """ send packet on interface, creating the packet if needed """
460 packet = self.create_packet()
461 if interface is None:
462 interface = self.phy_interface
463 self.test.logger.debug(ppp("Sending packet:", packet))
464 interface.add_stream(packet)
467 def verify_sha1_auth(self, packet):
468 """ Verify correctness of authentication in BFD layer. """
470 self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
471 self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
473 self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
474 self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
475 if self.vpp_seq_number is None:
476 self.vpp_seq_number = bfd.auth_seq_num
477 self.test.logger.debug("Received initial sequence number: %s" %
480 recvd_seq_num = bfd.auth_seq_num
481 self.test.logger.debug("Received followup sequence number: %s" %
483 if self.vpp_seq_number < 0xffffffff:
484 if self.sha1_key.auth_type == \
485 BFDAuthType.meticulous_keyed_sha1:
486 self.test.assert_equal(recvd_seq_num,
487 self.vpp_seq_number + 1,
488 "BFD sequence number")
490 self.test.assert_in_range(recvd_seq_num,
492 self.vpp_seq_number + 1,
493 "BFD sequence number")
495 if self.sha1_key.auth_type == \
496 BFDAuthType.meticulous_keyed_sha1:
497 self.test.assert_equal(recvd_seq_num, 0,
498 "BFD sequence number")
500 self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
501 "BFD sequence number not one of "
502 "(%s, 0)" % self.vpp_seq_number)
503 self.vpp_seq_number = recvd_seq_num
504 # last 20 bytes represent the hash - so replace them with the key,
505 # pad the result with zeros and hash the result
506 hash_material = bfd.original[:-20] + self.sha1_key.key + \
507 b"\0" * (20 - len(self.sha1_key.key))
508 expected_hash = hashlib.sha1(hash_material).hexdigest()
509 self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
510 expected_hash, "Auth key hash")
512 def verify_bfd(self, packet):
513 """ Verify correctness of BFD layer. """
515 self.test.assert_equal(bfd.version, 1, "BFD version")
516 self.test.assert_equal(bfd.your_discriminator,
517 self.my_discriminator,
518 "BFD - your discriminator")
520 self.verify_sha1_auth(packet)
523 def bfd_session_up(test):
524 """ Bring BFD session up """
525 test.logger.info("BFD: Waiting for slow hello")
526 p = wait_for_bfd_packet(test, 2, is_tunnel=test.vpp_session.is_tunnel)
528 if hasattr(test, 'vpp_clock_offset'):
529 old_offset = test.vpp_clock_offset
530 test.vpp_clock_offset = time.time() - p.time
531 test.logger.debug("BFD: Calculated vpp clock offset: %s",
532 test.vpp_clock_offset)
534 test.assertAlmostEqual(
535 old_offset, test.vpp_clock_offset, delta=0.5,
536 msg="vpp clock offset not stable (new: %s, old: %s)" %
537 (test.vpp_clock_offset, old_offset))
538 test.logger.info("BFD: Sending Init")
539 test.test_session.update(my_discriminator=randint(0, 40000000),
540 your_discriminator=p[BFD].my_discriminator,
542 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
543 BFDAuthType.meticulous_keyed_sha1:
544 test.test_session.inc_seq_num()
545 test.test_session.send_packet()
546 test.logger.info("BFD: Waiting for event")
547 e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
548 verify_event(test, e, expected_state=BFDState.up)
549 test.logger.info("BFD: Session is Up")
550 test.test_session.update(state=BFDState.up)
551 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
552 BFDAuthType.meticulous_keyed_sha1:
553 test.test_session.inc_seq_num()
554 test.test_session.send_packet()
555 test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
def bfd_session_down(test):
    """ Bring BFD session down

    Requires the VPP session to be up, sends a packet announcing Down
    from the test side, then waits for and checks the Down event.
    """
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
    peer = test.test_session
    peer.update(state=BFDState.down)
    key = peer.sha1_key
    # meticulous keyed SHA1: advance the sequence number before sending
    meticulous = key and key.auth_type == BFDAuthType.meticulous_keyed_sha1
    if meticulous:
        peer.inc_seq_num()
    peer.send_packet()
    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, event, expected_state=BFDState.down)
    test.logger.info("BFD: Session is Down")
    test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
573 def verify_bfd_session_config(test, session, state=None):
574 dump = session.get_bfd_udp_session_dump_entry()
575 test.assertIsNotNone(dump)
576 # since dump is not none, we have verified that sw_if_index and addresses
577 # are valid (in get_bfd_udp_session_dump_entry)
579 test.assert_equal(dump.state, state, "session state")
580 test.assert_equal(dump.required_min_rx, session.required_min_rx,
581 "required min rx interval")
582 test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
583 "desired min tx interval")
584 test.assert_equal(dump.detect_mult, session.detect_mult,
586 if session.sha1_key is None:
587 test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
589 test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
590 test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
592 test.assert_equal(dump.conf_key_id,
593 session.sha1_key.conf_key_id,
597 def verify_ip(test, packet):
598 """ Verify correctness of IP layer. """
599 if test.vpp_session.af == AF_INET6:
601 local_ip = test.vpp_session.interface.local_ip6
602 remote_ip = test.vpp_session.interface.remote_ip6
603 test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
606 local_ip = test.vpp_session.interface.local_ip4
607 remote_ip = test.vpp_session.interface.remote_ip4
608 test.assert_equal(ip.ttl, 255, "IPv4 TTL")
609 test.assert_equal(ip.src, local_ip, "IP source address")
610 test.assert_equal(ip.dst, remote_ip, "IP destination address")
613 def verify_udp(test, packet):
614 """ Verify correctness of UDP layer. """
616 test.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
617 test.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
621 def verify_event(test, event, expected_state):
622 """ Verify correctness of event values. """
624 test.logger.debug("BFD: Event: %s" % moves.reprlib.repr(e))
625 test.assert_equal(e.sw_if_index,
626 test.vpp_session.interface.sw_if_index,
627 "BFD interface index")
629 if test.vpp_session.af == AF_INET6:
631 test.assert_equal(e.is_ipv6, is_ipv6, "is_ipv6")
632 if test.vpp_session.af == AF_INET:
633 test.assert_equal(e.local_addr[:4], test.vpp_session.local_addr_n,
634 "Local IPv4 address")
635 test.assert_equal(e.peer_addr[:4], test.vpp_session.peer_addr_n,
638 test.assert_equal(e.local_addr, test.vpp_session.local_addr_n,
639 "Local IPv6 address")
640 test.assert_equal(e.peer_addr, test.vpp_session.peer_addr_n,
642 test.assert_equal(e.state, expected_state, BFDState)
645 def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None, is_tunnel=False):
646 """ wait for BFD packet and verify its correctness
648 :param timeout: how long to wait
649 :param pcap_time_min: ignore packets with pcap timestamp lower than this
:returns: the received BFD packet
653 test.logger.info("BFD: Waiting for BFD packet")
654 deadline = time.time() + timeout
659 test.assert_in_range(counter, 0, 100, "number of packets ignored")
660 time_left = deadline - time.time()
662 raise CaptureTimeoutError("Packet did not arrive within timeout")
663 p = test.pg0.wait_for_packet(timeout=time_left)
664 test.logger.debug(ppp("BFD: Got packet:", p))
665 if pcap_time_min is not None and p.time < pcap_time_min:
666 test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
667 "pcap time min %s):" %
668 (p.time, pcap_time_min), p))
672 # strip an IP layer and move to the next
677 raise Exception(ppp("Unexpected or invalid BFD packet:", p))
679 raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
682 test.test_session.verify_bfd(p)
686 @unittest.skipUnless(running_extended_tests, "part of extended tests")
687 class BFD4TestCase(VppTestCase):
688 """Bidirectional Forwarding Detection (BFD)"""
691 vpp_clock_offset = None
697 super(BFD4TestCase, cls).setUpClass()
698 cls.vapi.cli("set log class bfd level debug")
700 cls.create_pg_interfaces([0])
701 cls.create_loopback_interfaces(1)
702 cls.loopback0 = cls.lo_interfaces[0]
703 cls.loopback0.config_ip4()
704 cls.loopback0.admin_up()
706 cls.pg0.configure_ipv4_neighbors()
708 cls.pg0.resolve_arp()
711 super(BFD4TestCase, cls).tearDownClass()
715 def tearDownClass(cls):
716 super(BFD4TestCase, cls).tearDownClass()
719 super(BFD4TestCase, self).setUp()
720 self.factory = AuthKeyFactory()
721 self.vapi.want_bfd_events()
722 self.pg0.enable_capture()
724 self.vpp_session = VppBFDUDPSession(self, self.pg0,
726 self.vpp_session.add_vpp_config()
727 self.vpp_session.admin_up()
728 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
730 self.vapi.want_bfd_events(enable_disable=0)
734 if not self.vpp_dead:
735 self.vapi.want_bfd_events(enable_disable=0)
736 self.vapi.collect_events() # clear the event queue
737 super(BFD4TestCase, self).tearDown()
739 def test_session_up(self):
740 """ bring BFD session up """
743 def test_session_up_by_ip(self):
744 """ bring BFD session up - first frame looked up by address pair """
745 self.logger.info("BFD: Sending Slow control frame")
746 self.test_session.update(my_discriminator=randint(0, 40000000))
747 self.test_session.send_packet()
748 self.pg0.enable_capture()
749 p = self.pg0.wait_for_packet(1)
750 self.assert_equal(p[BFD].your_discriminator,
751 self.test_session.my_discriminator,
752 "BFD - your discriminator")
753 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
754 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
756 self.logger.info("BFD: Waiting for event")
757 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
758 verify_event(self, e, expected_state=BFDState.init)
759 self.logger.info("BFD: Sending Up")
760 self.test_session.send_packet()
761 self.logger.info("BFD: Waiting for event")
762 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
763 verify_event(self, e, expected_state=BFDState.up)
764 self.logger.info("BFD: Session is Up")
765 self.test_session.update(state=BFDState.up)
766 self.test_session.send_packet()
767 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
769 def test_session_down(self):
770 """ bring BFD session down """
772 bfd_session_down(self)
774 @unittest.skipUnless(running_extended_tests, "part of extended tests")
775 def test_hold_up(self):
776 """ hold BFD session up """
778 for dummy in range(self.test_session.detect_mult * 2):
779 wait_for_bfd_packet(self)
780 self.test_session.send_packet()
781 self.assert_equal(len(self.vapi.collect_events()), 0,
782 "number of bfd events")
784 @unittest.skipUnless(running_extended_tests, "part of extended tests")
785 def test_slow_timer(self):
786 """ verify slow periodic control frames while session down """
788 self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
789 prev_packet = wait_for_bfd_packet(self, 2)
790 for dummy in range(packet_count):
791 next_packet = wait_for_bfd_packet(self, 2)
792 time_diff = next_packet.time - prev_packet.time
793 # spec says the range should be <0.75, 1>, allow extra 0.05 margin
794 # to work around timing issues
795 self.assert_in_range(
796 time_diff, 0.70, 1.05, "time between slow packets")
797 prev_packet = next_packet
799 @unittest.skipUnless(running_extended_tests, "part of extended tests")
800 def test_zero_remote_min_rx(self):
801 """ no packets when zero remote required min rx interval """
803 self.test_session.update(required_min_rx=0)
804 self.test_session.send_packet()
805 for dummy in range(self.test_session.detect_mult):
806 self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
807 "sleep before transmitting bfd packet")
808 self.test_session.send_packet()
810 p = wait_for_bfd_packet(self, timeout=0)
811 self.logger.error(ppp("Received unexpected packet:", p))
812 except CaptureTimeoutError:
815 len(self.vapi.collect_events()), 0, "number of bfd events")
816 self.test_session.update(required_min_rx=300000)
817 for dummy in range(3):
818 self.test_session.send_packet()
820 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
822 len(self.vapi.collect_events()), 0, "number of bfd events")
824 @unittest.skipUnless(running_extended_tests, "part of extended tests")
825 def test_conn_down(self):
826 """ verify session goes down after inactivity """
828 detection_time = self.test_session.detect_mult *\
829 self.vpp_session.required_min_rx / USEC_IN_SEC
830 self.sleep(detection_time, "waiting for BFD session time-out")
831 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
832 verify_event(self, e, expected_state=BFDState.down)
834 @unittest.skipUnless(running_extended_tests, "part of extended tests")
835 def test_large_required_min_rx(self):
836 """ large remote required min rx interval """
838 p = wait_for_bfd_packet(self)
840 self.test_session.update(required_min_rx=interval)
841 self.test_session.send_packet()
842 time_mark = time.time()
844 # busy wait here, trying to collect a packet or event, vpp is not
845 # allowed to send packets and the session will timeout first - so the
846 # Up->Down event must arrive before any packets do
847 while time.time() < time_mark + interval / USEC_IN_SEC:
849 p = wait_for_bfd_packet(self, timeout=0)
# if vpp managed to send a packet before we did the session
# update, then that's fine, ignore it
852 if p.time < time_mark - self.vpp_clock_offset:
854 self.logger.error(ppp("Received unexpected packet:", p))
856 except CaptureTimeoutError:
858 events = self.vapi.collect_events()
860 verify_event(self, events[0], BFDState.down)
862 self.assert_equal(count, 0, "number of packets received")
864 @unittest.skipUnless(running_extended_tests, "part of extended tests")
865 def test_immediate_remote_min_rx_reduction(self):
866 """ immediately honor remote required min rx reduction """
867 self.vpp_session.remove_vpp_config()
868 self.vpp_session = VppBFDUDPSession(
869 self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
870 self.pg0.enable_capture()
871 self.vpp_session.add_vpp_config()
872 self.test_session.update(desired_min_tx=1000000,
873 required_min_rx=1000000)
875 reference_packet = wait_for_bfd_packet(self)
876 time_mark = time.time()
878 self.test_session.update(required_min_rx=interval)
879 self.test_session.send_packet()
880 extra_time = time.time() - time_mark
881 p = wait_for_bfd_packet(self)
882 # first packet is allowed to be late by time we spent doing the update
883 # calculated in extra_time
884 self.assert_in_range(p.time - reference_packet.time,
885 .95 * 0.75 * interval / USEC_IN_SEC,
886 1.05 * interval / USEC_IN_SEC + extra_time,
887 "time between BFD packets")
889 for dummy in range(3):
890 p = wait_for_bfd_packet(self)
891 diff = p.time - reference_packet.time
892 self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
893 1.05 * interval / USEC_IN_SEC,
894 "time between BFD packets")
897 @unittest.skipUnless(running_extended_tests, "part of extended tests")
898 def test_modify_req_min_rx_double(self):
899 """ modify session - double required min rx """
901 p = wait_for_bfd_packet(self)
902 self.test_session.update(desired_min_tx=10000,
903 required_min_rx=10000)
904 self.test_session.send_packet()
905 # double required min rx
906 self.vpp_session.modify_parameters(
907 required_min_rx=2 * self.vpp_session.required_min_rx)
908 p = wait_for_bfd_packet(
909 self, pcap_time_min=time.time() - self.vpp_clock_offset)
910 # poll bit needs to be set
911 self.assertIn("P", p.sprintf("%BFD.flags%"),
912 "Poll bit not set in BFD packet")
913 # finish poll sequence with final packet
914 final = self.test_session.create_packet()
915 final[BFD].flags = "F"
916 timeout = self.test_session.detect_mult * \
917 max(self.test_session.desired_min_tx,
918 self.vpp_session.required_min_rx) / USEC_IN_SEC
919 self.test_session.send_packet(final)
920 time_mark = time.time()
921 e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
922 verify_event(self, e, expected_state=BFDState.down)
923 time_to_event = time.time() - time_mark
924 self.assert_in_range(time_to_event, .9 * timeout,
925 1.1 * timeout, "session timeout")
927 @unittest.skipUnless(running_extended_tests, "part of extended tests")
928 def test_modify_req_min_rx_halve(self):
929 """ modify session - halve required min rx """
930 self.vpp_session.modify_parameters(
931 required_min_rx=2 * self.vpp_session.required_min_rx)
933 p = wait_for_bfd_packet(self)
934 self.test_session.update(desired_min_tx=10000,
935 required_min_rx=10000)
936 self.test_session.send_packet()
937 p = wait_for_bfd_packet(
938 self, pcap_time_min=time.time() - self.vpp_clock_offset)
939 # halve required min rx
940 old_required_min_rx = self.vpp_session.required_min_rx
941 self.vpp_session.modify_parameters(
942 required_min_rx=self.vpp_session.required_min_rx // 2)
943 # now we wait 0.8*3*old-req-min-rx and the session should still be up
944 self.sleep(0.8 * self.vpp_session.detect_mult *
945 old_required_min_rx / USEC_IN_SEC,
946 "wait before finishing poll sequence")
947 self.assert_equal(len(self.vapi.collect_events()), 0,
948 "number of bfd events")
949 p = wait_for_bfd_packet(self)
950 # poll bit needs to be set
951 self.assertIn("P", p.sprintf("%BFD.flags%"),
952 "Poll bit not set in BFD packet")
953 # finish poll sequence with final packet
954 final = self.test_session.create_packet()
955 final[BFD].flags = "F"
956 self.test_session.send_packet(final)
957 # now the session should time out under new conditions
958 detection_time = self.test_session.detect_mult *\
959 self.vpp_session.required_min_rx / USEC_IN_SEC
961 e = self.vapi.wait_for_event(
962 2 * detection_time, "bfd_udp_session_details")
964 self.assert_in_range(after - before,
965 0.9 * detection_time,
966 1.1 * detection_time,
967 "time before bfd session goes down")
968 verify_event(self, e, expected_state=BFDState.down)
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_modify_detect_mult(self):
    """ modify detect multiplier """
    bfd_session_up(self)
    wait_for_bfd_packet(self)
    # Apply each new multiplier and check that the next packet sent by
    # VPP advertises it without starting a poll sequence.
    for new_detect_mult in (1, 10):
        self.vpp_session.modify_parameters(detect_mult=new_detect_mult)
        p = wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.assert_equal(self.vpp_session.detect_mult,
                          p[BFD].detect_mult,
                          "detect mult")
        # poll bit must not be set
        self.assertNotIn("P", p.sprintf("%BFD.flags%"),
                         "Poll bit set in BFD packet")
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_queued_poll(self):
    """ test poll sequence queueing """
    bfd_session_up(self)
    wait_for_bfd_packet(self)
    self.vpp_session.modify_parameters(
        required_min_rx=2 * self.vpp_session.required_min_rx)
    p = wait_for_bfd_packet(self)
    poll_sequence_start = time.time()
    poll_sequence_length_min = 0.5
    send_final_after = time.time() + poll_sequence_length_min
    # poll bit needs to be set
    self.assertIn("P", p.sprintf("%BFD.flags%"),
                  "Poll bit not set in BFD packet")
    self.assert_equal(p[BFD].required_min_rx_interval,
                      self.vpp_session.required_min_rx,
                      "BFD required min rx interval")
    self.vpp_session.modify_parameters(
        required_min_rx=2 * self.vpp_session.required_min_rx)
    # 2nd poll sequence should be queued now
    # don't send the reply back yet, wait for some time to emulate
    # longer round-trip time
    packet_count = 0
    while time.time() < send_final_after:
        self.test_session.send_packet()
        p = wait_for_bfd_packet(self)
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")
        self.assert_equal(p[BFD].required_min_rx_interval,
                          self.vpp_session.required_min_rx,
                          "BFD required min rx interval")
        packet_count += 1
        # poll bit must be set
        self.assertIn("P", p.sprintf("%BFD.flags%"),
                      "Poll bit not set in BFD packet")
    # finish 1st with final
    final = self.test_session.create_packet()
    final[BFD].flags = "F"
    self.test_session.send_packet(final)
    poll_sequence_length = time.time() - poll_sequence_start
    # vpp must wait for some time before starting new poll sequence
    poll_no_2_started = False
    for _ in range(2 * packet_count):
        p = wait_for_bfd_packet(self)
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")
        if "P" in p.sprintf("%BFD.flags%"):
            poll_no_2_started = True
            if time.time() < poll_sequence_start + poll_sequence_length:
                raise Exception("VPP started 2nd poll sequence too soon")
            final = self.test_session.create_packet()
            final[BFD].flags = "F"
            self.test_session.send_packet(final)
            break
        else:
            # no poll yet - keep the session alive and keep waiting
            self.test_session.send_packet()
    self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
    # finish 2nd with final
    final = self.test_session.create_packet()
    final[BFD].flags = "F"
    self.test_session.send_packet(final)
    p = wait_for_bfd_packet(self)
    # poll bit must not be set
    self.assertNotIn("P", p.sprintf("%BFD.flags%"),
                     "Poll bit set in BFD packet")
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_poll_response(self):
    """ test correct response to control frame with poll bit set """
    bfd_session_up(self)
    poll = self.test_session.create_packet()
    poll[BFD].flags = "P"
    self.test_session.send_packet(poll)
    # only consider packets captured after the poll was sent
    final = wait_for_bfd_packet(
        self, pcap_time_min=time.time() - self.vpp_clock_offset)
    self.assertIn("F", final.sprintf("%BFD.flags%"),
                  "Final bit not set in BFD packet")
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_no_periodic_if_remote_demand(self):
    """ no periodic frames outside poll sequence if remote demand set """
    bfd_session_up(self)
    demand = self.test_session.create_packet()
    demand[BFD].flags = "D"
    self.test_session.send_packet(demand)
    # interval (in seconds) at which the test session keeps transmitting
    transmit_time = 0.9 \
        * max(self.vpp_session.required_min_rx,
              self.test_session.desired_min_tx) \
        / USEC_IN_SEC
    count = 0
    for _ in range(self.test_session.detect_mult * 2):
        self.sleep(transmit_time)
        self.test_session.send_packet(demand)
        try:
            p = wait_for_bfd_packet(self, timeout=0)
        except CaptureTimeoutError:
            # expected - VPP must stay quiet while peer is in demand mode
            pass
        else:
            self.logger.error(ppp("Received unexpected packet:", p))
            count += 1
    events = self.vapi.collect_events()
    for e in events:
        self.logger.error("Received unexpected event: %s", e)
    self.assert_equal(count, 0, "number of packets received")
    self.assert_equal(len(events), 0, "number of events received")
def test_echo_looped_back(self):
    """ echo packets looped back """
    # don't need a session in this case..
    self.vpp_session.remove_vpp_config()
    self.pg0.enable_capture()
    echo_packet_count = 10
    # random source port low enough to increment a few times..
    udp_sport_tx = randint(1, 50000)
    udp_sport_rx = udp_sport_tx
    echo_packet = (Ether(src=self.pg0.remote_mac,
                         dst=self.pg0.local_mac) /
                   IP(src=self.pg0.remote_ip4,
                      dst=self.pg0.remote_ip4) /
                   UDP(dport=BFD.udp_dport_echo) /
                   Raw("this should be looped back"))
    for _ in range(echo_packet_count):
        self.sleep(.01, "delay between echo packets")
        # the source port doubles as a per-packet identifier
        echo_packet[UDP].sport = udp_sport_tx
        udp_sport_tx += 1
        self.logger.debug(ppp("Sending packet:", echo_packet))
        self.pg0.add_stream(echo_packet)
        self.pg_start()
    for _ in range(echo_packet_count):
        p = self.pg0.wait_for_packet(1)
        self.logger.debug(ppp("Got packet:", p))
        ether = p[Ether]
        self.assert_equal(self.pg0.remote_mac,
                          ether.dst, "Destination MAC")
        self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
        ip = p[IP]
        self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
        self.assert_equal(self.pg0.remote_ip4, ip.src, "Source IP")
        udp = p[UDP]
        self.assert_equal(udp.dport, BFD.udp_dport_echo,
                          "UDP destination port")
        self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
        udp_sport_rx += 1
        # need to compare the hex payload here, otherwise BFD_vpp_echo
        # gets in way
        self.assertEqual(scapy.compat.raw(p[UDP].payload),
                         scapy.compat.raw(echo_packet[UDP].payload),
                         "Received packet is not the echo packet sent")
    self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
                      "ECHO packet identifier for test purposes)")
def test_echo(self):
    """ echo function """
    bfd_session_up(self)
    self.test_session.update(required_min_echo_rx=150000)
    self.test_session.send_packet()
    detection_time = self.test_session.detect_mult *\
        self.vpp_session.required_min_rx / USEC_IN_SEC
    # echo shouldn't work without echo source set
    for _ in range(10):
        sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
        self.sleep(sleep, "delay before sending bfd packet")
        self.test_session.send_packet()
    p = wait_for_bfd_packet(
        self, pcap_time_min=time.time() - self.vpp_clock_offset)
    self.assert_equal(p[BFD].required_min_rx_interval,
                      self.vpp_session.required_min_rx,
                      "BFD required min rx interval")
    self.test_session.send_packet()
    self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
    echo_seen = False
    # should be turned on - loopback echo packets
    for _ in range(3):
        loop_until = time.time() + 0.75 * detection_time
        while time.time() < loop_until:
            p = self.pg0.wait_for_packet(1)
            self.logger.debug(ppp("Got packet:", p))
            if p[UDP].dport == BFD.udp_dport_echo:
                self.assert_equal(
                    p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
                self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
                                    "BFD ECHO src IP equal to loopback IP")
                self.logger.debug(ppp("Looping back packet:", p))
                self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
                                  "ECHO packet destination MAC address")
                p[Ether].dst = self.pg0.local_mac
                self.pg0.add_stream(p)
                self.pg_start()
                echo_seen = True
            elif p.haslayer(BFD):
                if echo_seen:
                    # NOTE(review): threshold reconstructed as 1 second
                    # (in usec) - confirm against upstream
                    self.assertGreaterEqual(
                        p[BFD].required_min_rx_interval,
                        1000000)
                if "P" in p.sprintf("%BFD.flags%"):
                    final = self.test_session.create_packet()
                    final[BFD].flags = "F"
                    self.test_session.send_packet(final)
            else:
                raise Exception(ppp("Received unknown packet:", p))

            self.assert_equal(len(self.vapi.collect_events()), 0,
                              "number of bfd events")
        self.test_session.send_packet()
    self.assertTrue(echo_seen, "No echo packets received")
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_echo_fail(self):
    """ session goes down if echo function fails """
    bfd_session_up(self)
    self.test_session.update(required_min_echo_rx=150000)
    self.test_session.send_packet()
    detection_time = self.test_session.detect_mult *\
        self.vpp_session.required_min_rx / USEC_IN_SEC
    self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
    # echo function should be used now, but we will drop the echo packets
    verified_diag = False
    for _ in range(3):
        loop_until = time.time() + 0.75 * detection_time
        while time.time() < loop_until:
            p = self.pg0.wait_for_packet(1)
            self.logger.debug(ppp("Got packet:", p))
            if p[UDP].dport == BFD.udp_dport_echo:
                # dropped on purpose - the echo is never looped back
                pass
            elif p.haslayer(BFD):
                if "P" in p.sprintf("%BFD.flags%"):
                    # NOTE(review): threshold reconstructed as 1 second
                    # (in usec) - confirm against upstream
                    self.assertGreaterEqual(
                        p[BFD].required_min_rx_interval,
                        1000000)
                    final = self.test_session.create_packet()
                    final[BFD].flags = "F"
                    self.test_session.send_packet(final)
                if p[BFD].state == BFDState.down:
                    self.assert_equal(p[BFD].diag,
                                      BFDDiagCode.echo_function_failed,
                                      BFDDiagCode)
                    verified_diag = True
            else:
                raise Exception(ppp("Received unknown packet:", p))
        self.test_session.send_packet()
    events = self.vapi.collect_events()
    self.assert_equal(len(events), 1, "number of bfd events")
    self.assert_equal(events[0].state, BFDState.down, BFDState)
    self.assertTrue(verified_diag, "Incorrect diagnostics code received")
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_echo_stop(self):
    """ echo function stops if peer sets required min echo rx zero """
    bfd_session_up(self)
    self.test_session.update(required_min_echo_rx=150000)
    self.test_session.send_packet()
    self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
    # wait for first echo packet
    while True:
        p = self.pg0.wait_for_packet(1)
        self.logger.debug(ppp("Got packet:", p))
        if p[UDP].dport == BFD.udp_dport_echo:
            self.logger.debug(ppp("Looping back packet:", p))
            p[Ether].dst = self.pg0.local_mac
            self.pg0.add_stream(p)
            self.pg_start()
            break
        elif p.haslayer(BFD):
            # control packets are not interesting here
            pass
        else:
            raise Exception(ppp("Received unknown packet:", p))
    self.test_session.update(required_min_echo_rx=0)
    self.test_session.send_packet()
    # echo packets shouldn't arrive anymore
    for _ in range(5):
        wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.test_session.send_packet()
        events = self.vapi.collect_events()
        self.assert_equal(len(events), 0, "number of bfd events")
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_echo_source_removed(self):
    """ echo function stops if echo source is removed """
    bfd_session_up(self)
    self.test_session.update(required_min_echo_rx=150000)
    self.test_session.send_packet()
    self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
    # wait for first echo packet
    while True:
        p = self.pg0.wait_for_packet(1)
        self.logger.debug(ppp("Got packet:", p))
        if p[UDP].dport == BFD.udp_dport_echo:
            self.logger.debug(ppp("Looping back packet:", p))
            p[Ether].dst = self.pg0.local_mac
            self.pg0.add_stream(p)
            self.pg_start()
            break
        elif p.haslayer(BFD):
            # control packets are not interesting here
            pass
        else:
            raise Exception(ppp("Received unknown packet:", p))
    self.vapi.bfd_udp_del_echo_source()
    self.test_session.send_packet()
    # echo packets shouldn't arrive anymore
    for _ in range(5):
        wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.test_session.send_packet()
        events = self.vapi.collect_events()
        self.assert_equal(len(events), 0, "number of bfd events")
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_stale_echo(self):
    """ stale echo packets don't keep a session up """
    bfd_session_up(self)
    self.test_session.update(required_min_echo_rx=150000)
    self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
    self.test_session.send_packet()
    # should be turned on - loopback echo packets
    echo_packet = None
    timeout_at = None
    timeout_ok = False
    for _ in range(10 * self.vpp_session.detect_mult):
        p = self.pg0.wait_for_packet(1)
        if p[UDP].dport == BFD.udp_dport_echo:
            if echo_packet is None:
                self.logger.debug(ppp("Got first echo packet:", p))
                echo_packet = p
                # earliest moment at which the session may time out
                timeout_at = time.time() + self.vpp_session.detect_mult * \
                    self.test_session.required_min_echo_rx / USEC_IN_SEC
            else:
                self.logger.debug(ppp("Got followup echo packet:", p))
            # always send back the very first (by now stale) echo packet
            self.logger.debug(ppp("Looping back first echo packet:", p))
            echo_packet[Ether].dst = self.pg0.local_mac
            self.pg0.add_stream(echo_packet)
            self.pg_start()
        elif p.haslayer(BFD):
            self.logger.debug(ppp("Got packet:", p))
            if "P" in p.sprintf("%BFD.flags%"):
                final = self.test_session.create_packet()
                final[BFD].flags = "F"
                self.test_session.send_packet(final)
            if p[BFD].state == BFDState.down:
                self.assertIsNotNone(
                    timeout_at,
                    "Session went down before first echo packet received")
                now = time.time()
                self.assertGreaterEqual(
                    now, timeout_at,
                    "Session timeout at %s, but is expected at %s" %
                    (now, timeout_at))
                self.assert_equal(p[BFD].diag,
                                  BFDDiagCode.echo_function_failed,
                                  BFDDiagCode)
                events = self.vapi.collect_events()
                self.assert_equal(len(events), 1, "number of bfd events")
                self.assert_equal(events[0].state, BFDState.down, BFDState)
                timeout_ok = True
                break
        else:
            raise Exception(ppp("Received unknown packet:", p))
        self.test_session.send_packet()
    self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_invalid_echo_checksum(self):
    """ echo packets with invalid checksum don't keep a session up """
    bfd_session_up(self)
    self.test_session.update(required_min_echo_rx=150000)
    self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
    self.test_session.send_packet()
    # should be turned on - loopback echo packets
    timeout_at = None
    timeout_ok = False
    for _ in range(10 * self.vpp_session.detect_mult):
        p = self.pg0.wait_for_packet(1)
        if p[UDP].dport == BFD.udp_dport_echo:
            self.logger.debug(ppp("Got echo packet:", p))
            if timeout_at is None:
                # earliest moment at which the session may time out
                timeout_at = time.time() + self.vpp_session.detect_mult * \
                    self.test_session.required_min_echo_rx / USEC_IN_SEC
            # corrupt the checksum before sending the echo back
            p[BFD_vpp_echo].checksum = getrandbits(64)
            p[Ether].dst = self.pg0.local_mac
            self.logger.debug(ppp("Looping back modified echo packet:", p))
            self.pg0.add_stream(p)
            self.pg_start()
        elif p.haslayer(BFD):
            self.logger.debug(ppp("Got packet:", p))
            if "P" in p.sprintf("%BFD.flags%"):
                final = self.test_session.create_packet()
                final[BFD].flags = "F"
                self.test_session.send_packet(final)
            if p[BFD].state == BFDState.down:
                self.assertIsNotNone(
                    timeout_at,
                    "Session went down before first echo packet received")
                now = time.time()
                self.assertGreaterEqual(
                    now, timeout_at,
                    "Session timeout at %s, but is expected at %s" %
                    (now, timeout_at))
                self.assert_equal(p[BFD].diag,
                                  BFDDiagCode.echo_function_failed,
                                  BFDDiagCode)
                events = self.vapi.collect_events()
                self.assert_equal(len(events), 1, "number of bfd events")
                self.assert_equal(events[0].state, BFDState.down, BFDState)
                timeout_ok = True
                break
        else:
            raise Exception(ppp("Received unknown packet:", p))
        self.test_session.send_packet()
    self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_admin_up_down(self):
    """ put session admin-up and admin-down """
    bfd_session_up(self)
    self.vpp_session.admin_down()
    self.pg0.enable_capture()
    e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(self, e, expected_state=BFDState.admin_down)
    for _ in range(2):
        p = wait_for_bfd_packet(self)
        self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
    # try to bring session up - shouldn't be possible
    self.test_session.update(state=BFDState.init)
    self.test_session.send_packet()
    for _ in range(2):
        p = wait_for_bfd_packet(self)
        self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
    # after admin-up the session walks through down -> init -> up
    self.vpp_session.admin_up()
    self.test_session.update(state=BFDState.down)
    e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(self, e, expected_state=BFDState.down)
    p = wait_for_bfd_packet(
        self, pcap_time_min=time.time() - self.vpp_clock_offset)
    self.assert_equal(p[BFD].state, BFDState.down, BFDState)
    self.test_session.send_packet()
    p = wait_for_bfd_packet(
        self, pcap_time_min=time.time() - self.vpp_clock_offset)
    self.assert_equal(p[BFD].state, BFDState.init, BFDState)
    e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(self, e, expected_state=BFDState.init)
    self.test_session.update(state=BFDState.up)
    self.test_session.send_packet()
    p = wait_for_bfd_packet(
        self, pcap_time_min=time.time() - self.vpp_clock_offset)
    self.assert_equal(p[BFD].state, BFDState.up, BFDState)
    e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(self, e, expected_state=BFDState.up)
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_config_change_remote_demand(self):
    """ configuration change while peer in demand mode """
    bfd_session_up(self)
    demand = self.test_session.create_packet()
    demand[BFD].flags = "D"
    self.test_session.send_packet(demand)
    self.vpp_session.modify_parameters(
        required_min_rx=2 * self.vpp_session.required_min_rx)
    p = wait_for_bfd_packet(
        self, pcap_time_min=time.time() - self.vpp_clock_offset)
    # poll bit must be set
    self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
    # terminate poll sequence
    final = self.test_session.create_packet()
    final[BFD].flags = "D+F"
    self.test_session.send_packet(final)
    # vpp should be quiet now again
    transmit_time = 0.9 \
        * max(self.vpp_session.required_min_rx,
              self.test_session.desired_min_tx) \
        / USEC_IN_SEC
    count = 0
    for _ in range(self.test_session.detect_mult * 2):
        self.sleep(transmit_time)
        self.test_session.send_packet(demand)
        try:
            p = wait_for_bfd_packet(self, timeout=0)
        except CaptureTimeoutError:
            # expected - nothing may be sent outside of a poll sequence
            pass
        else:
            self.logger.error(ppp("Received unexpected packet:", p))
            count += 1
    events = self.vapi.collect_events()
    for e in events:
        self.logger.error("Received unexpected event: %s", e)
    self.assert_equal(count, 0, "number of packets received")
    self.assert_equal(len(events), 0, "number of events received")
def test_intf_deleted(self):
    """ interface with bfd session deleted """
    intf = VppLoInterface(self)
    # NOTE(review): the two setup calls below are reconstructed - confirm
    intf.config_ip4()
    intf.admin_up()
    # remember the index, the interface is removed before it is checked
    sw_if_index = intf.sw_if_index
    vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip4)
    vpp_session.add_vpp_config()
    vpp_session.admin_up()
    intf.remove_vpp_config()
    e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
    self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
    self.assertFalse(vpp_session.query_vpp_config())
@unittest.skipUnless(running_extended_tests, "part of extended tests")
class BFD6TestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (IPv6) """

    pg0 = None
    vpp_clock_offset = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        super(BFD6TestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip6()
            cls.pg0.configure_ipv6_neighbors()
            cls.pg0.admin_up()
            cls.pg0.resolve_ndp()
            # the loopback serves as the echo source in test_echo
            cls.create_loopback_interfaces(1)
            cls.loopback0 = cls.lo_interfaces[0]
            cls.loopback0.config_ip6()
            cls.loopback0.admin_up()
        except Exception:
            # release whatever the parent class set up, then re-raise
            super(BFD6TestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        super(BFD6TestCase, cls).tearDownClass()

    def setUp(self):
        super(BFD6TestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()
        try:
            self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                                self.pg0.remote_ip6,
                                                af=AF_INET6)
            self.vpp_session.add_vpp_config()
            self.vpp_session.admin_up()
            self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
            self.logger.debug(self.vapi.cli("show adj nbr"))
        except Exception:
            # undo the event registration if session setup failed
            self.vapi.want_bfd_events(enable_disable=0)
            raise

    def tearDown(self):
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
            self.vapi.collect_events()  # clear the event queue
        super(BFD6TestCase, self).tearDown()

    def test_session_up(self):
        """ bring BFD session up """
        bfd_session_up(self)

    def test_session_up_by_ip(self):
        """ bring BFD session up - first frame looked up by address pair """
        self.logger.info("BFD: Sending Slow control frame")
        self.test_session.update(my_discriminator=randint(0, 40000000))
        self.test_session.send_packet()
        self.pg0.enable_capture()
        p = self.pg0.wait_for_packet(1)
        self.assert_equal(p[BFD].your_discriminator,
                          self.test_session.my_discriminator,
                          "BFD - your discriminator")
        self.assert_equal(p[BFD].state, BFDState.init, BFDState)
        self.test_session.update(your_discriminator=p[BFD].my_discriminator,
                                 state=BFDState.up)
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        verify_event(self, e, expected_state=BFDState.init)
        self.logger.info("BFD: Sending Up")
        self.test_session.send_packet()
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        verify_event(self, e, expected_state=BFDState.up)
        self.logger.info("BFD: Session is Up")
        self.test_session.update(state=BFDState.up)
        self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    @unittest.skipUnless(running_extended_tests, "part of extended tests")
    def test_hold_up(self):
        """ hold BFD session up """
        bfd_session_up(self)
        for _ in range(self.test_session.detect_mult * 2):
            wait_for_bfd_packet(self)
            self.test_session.send_packet()
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_echo_looped_back(self):
        """ echo packets looped back """
        # don't need a session in this case..
        self.vpp_session.remove_vpp_config()
        self.pg0.enable_capture()
        echo_packet_count = 10
        # random source port low enough to increment a few times..
        udp_sport_tx = randint(1, 50000)
        udp_sport_rx = udp_sport_tx
        echo_packet = (Ether(src=self.pg0.remote_mac,
                             dst=self.pg0.local_mac) /
                       IPv6(src=self.pg0.remote_ip6,
                            dst=self.pg0.remote_ip6) /
                       UDP(dport=BFD.udp_dport_echo) /
                       Raw("this should be looped back"))
        for _ in range(echo_packet_count):
            self.sleep(.01, "delay between echo packets")
            # the source port doubles as a per-packet identifier
            echo_packet[UDP].sport = udp_sport_tx
            udp_sport_tx += 1
            self.logger.debug(ppp("Sending packet:", echo_packet))
            self.pg0.add_stream(echo_packet)
            self.pg_start()
        for _ in range(echo_packet_count):
            p = self.pg0.wait_for_packet(1)
            self.logger.debug(ppp("Got packet:", p))
            ether = p[Ether]
            self.assert_equal(self.pg0.remote_mac,
                              ether.dst, "Destination MAC")
            self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
            ip = p[IPv6]
            self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
            self.assert_equal(self.pg0.remote_ip6, ip.src, "Source IP")
            udp = p[UDP]
            self.assert_equal(udp.dport, BFD.udp_dport_echo,
                              "UDP destination port")
            self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
            udp_sport_rx += 1
            # need to compare the hex payload here, otherwise BFD_vpp_echo
            # gets in way
            self.assertEqual(scapy.compat.raw(p[UDP].payload),
                             scapy.compat.raw(echo_packet[UDP].payload),
                             "Received packet is not the echo packet sent")
        self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
                          "ECHO packet identifier for test purposes)")

    def test_echo(self):
        """ echo function """
        bfd_session_up(self)
        self.test_session.update(required_min_echo_rx=150000)
        self.test_session.send_packet()
        detection_time = self.test_session.detect_mult *\
            self.vpp_session.required_min_rx / USEC_IN_SEC
        # echo shouldn't work without echo source set
        for _ in range(10):
            sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
            self.sleep(sleep, "delay before sending bfd packet")
            self.test_session.send_packet()
        p = wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.assert_equal(p[BFD].required_min_rx_interval,
                          self.vpp_session.required_min_rx,
                          "BFD required min rx interval")
        self.test_session.send_packet()
        self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
        echo_seen = False
        # should be turned on - loopback echo packets
        for _ in range(3):
            loop_until = time.time() + 0.75 * detection_time
            while time.time() < loop_until:
                p = self.pg0.wait_for_packet(1)
                self.logger.debug(ppp("Got packet:", p))
                if p[UDP].dport == BFD.udp_dport_echo:
                    self.assert_equal(
                        p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
                    self.assertNotEqual(p[IPv6].src,
                                        self.loopback0.local_ip6,
                                        "BFD ECHO src IP equal to loopback IP")
                    self.logger.debug(ppp("Looping back packet:", p))
                    self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
                                      "ECHO packet destination MAC address")
                    p[Ether].dst = self.pg0.local_mac
                    self.pg0.add_stream(p)
                    self.pg_start()
                    echo_seen = True
                elif p.haslayer(BFD):
                    if echo_seen:
                        # NOTE(review): threshold reconstructed as 1 second
                        # (in usec) - confirm against upstream
                        self.assertGreaterEqual(
                            p[BFD].required_min_rx_interval,
                            1000000)
                    if "P" in p.sprintf("%BFD.flags%"):
                        final = self.test_session.create_packet()
                        final[BFD].flags = "F"
                        self.test_session.send_packet(final)
                else:
                    raise Exception(ppp("Received unknown packet:", p))

                self.assert_equal(len(self.vapi.collect_events()), 0,
                                  "number of bfd events")
            self.test_session.send_packet()
        self.assertTrue(echo_seen, "No echo packets received")

    def test_intf_deleted(self):
        """ interface with bfd session deleted """
        intf = VppLoInterface(self)
        # NOTE(review): the two setup calls below are reconstructed - confirm
        intf.config_ip6()
        intf.admin_up()
        # remember the index, the interface is removed before it is checked
        sw_if_index = intf.sw_if_index
        vpp_session = VppBFDUDPSession(
            self, intf, intf.remote_ip6, af=AF_INET6)
        vpp_session.add_vpp_config()
        vpp_session.admin_up()
        intf.remove_vpp_config()
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
        self.assertFalse(vpp_session.query_vpp_config())
@unittest.skipUnless(running_extended_tests, "part of extended tests")
class BFDFIBTestCase(VppTestCase):
    """ BFD-FIB interactions (IPv6) """

    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        super(BFDFIBTestCase, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(BFDFIBTestCase, cls).tearDownClass()

    def setUp(self):
        super(BFDFIBTestCase, self).setUp()
        self.create_pg_interfaces(range(1))

        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

        for i in self.pg_interfaces:
            # NOTE(review): admin_up/config_ip6 are reconstructed - confirm
            i.admin_up()
            i.config_ip6()
            i.configure_ipv6_neighbors()

    def tearDown(self):
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)

        super(BFDFIBTestCase, self).tearDown()

    @staticmethod
    def pkt_is_not_data_traffic(p):
        """ not data traffic implies BFD or the usual IPv6 ND/RA"""
        return bool(p.haslayer(BFD) or is_ipv6_misc(p))

    def test_session_with_fib(self):
        """ BFD-FIB interactions """

        # packets to match against both of the routes
        # NOTE(review): payload contents are reconstructed - confirm
        p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
              IPv6(src="3001::1", dst="2001::1") /
              UDP(sport=1234, dport=1234) /
              Raw(b'\xa5' * 100)),
             (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
              IPv6(src="3001::1", dst="2002::1") /
              UDP(sport=1234, dport=1234) /
              Raw(b'\xa5' * 100))]

        # A recursive and a non-recursive route via a next-hop that
        # will have a BFD session
        # NOTE(review): is_ip6 and the 0xffffffff (no interface, i.e.
        # recursive) argument are reconstructed - confirm
        ip_2001_s_64 = VppIpRoute(self, "2001::", 64,
                                  [VppRoutePath(self.pg0.remote_ip6,
                                                self.pg0.sw_if_index,
                                                proto=DpoProto.DPO_PROTO_IP6)],
                                  is_ip6=1)
        ip_2002_s_64 = VppIpRoute(self, "2002::", 64,
                                  [VppRoutePath(self.pg0.remote_ip6,
                                                0xffffffff,
                                                proto=DpoProto.DPO_PROTO_IP6)],
                                  is_ip6=1)
        ip_2001_s_64.add_vpp_config()
        ip_2002_s_64.add_vpp_config()

        # bring the session up now the routes are present
        self.vpp_session = VppBFDUDPSession(self,
                                            self.pg0,
                                            self.pg0.remote_ip6,
                                            af=AF_INET6)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET6)

        # session is up - traffic passes
        bfd_session_up(self)
        self._send_and_verify_forwarded(p)

        # session is down - traffic is dropped
        bfd_session_down(self)

        self.pg0.add_stream(p)
        self.pg_start()
        with self.assertRaises(CaptureTimeoutError):
            self.pg0.wait_for_packet(1, self.pkt_is_not_data_traffic)

        # session is up - traffic passes
        bfd_session_up(self)
        self._send_and_verify_forwarded(p)

    def _send_and_verify_forwarded(self, packets):
        """ send packets on pg0 and check each is forwarded back in order """
        self.pg0.add_stream(packets)
        self.pg_start()
        for packet in packets:
            captured = self.pg0.wait_for_packet(
                1,
                filter_out_fn=self.pkt_is_not_data_traffic)
            self.assertEqual(captured[IPv6].dst,
                             packet[IPv6].dst)
@unittest.skipUnless(running_extended_tests, "part of extended tests")
class BFDTunTestCase(VppTestCase):
    """ BFD over GRE tunnel """

    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        super(BFDTunTestCase, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(BFDTunTestCase, cls).tearDownClass()

    def setUp(self):
        super(BFDTunTestCase, self).setUp()
        self.create_pg_interfaces(range(1))

        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

        for i in self.pg_interfaces:
            # NOTE(review): loop body is reconstructed - confirm
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()

    def tearDown(self):
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)

        super(BFDTunTestCase, self).tearDown()

    @staticmethod
    def pkt_is_not_data_traffic(p):
        """ not data traffic implies BFD or the usual IPv6 ND/RA"""
        return bool(p.haslayer(BFD) or is_ipv6_misc(p))

    def test_bfd_o_gre(self):
        """ BFD-o-GRE  """

        # A GRE interface over which to run a BFD session
        # NOTE(review): tunnel source and the two setup calls after
        # add_vpp_config() are reconstructed - confirm
        gre_if = VppGreInterface(self,
                                 self.pg0.local_ip4,
                                 self.pg0.remote_ip4)
        gre_if.add_vpp_config()
        gre_if.admin_up()
        gre_if.config_ip4()

        # bring the session up now the routes are present
        # NOTE(review): session arguments are reconstructed - confirm
        self.vpp_session = VppBFDUDPSession(self,
                                            gre_if,
                                            gre_if.remote_ip4,
                                            is_tunnel=True)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()

        self.test_session = BFDTestSession(
            self, gre_if, AF_INET,
            tunnel_header=(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE()),
            phy_interface=self.pg0)

        # packets to match against both of the routes
        # NOTE(review): payload contents are reconstructed - confirm
        p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
              IP(src=self.pg0.remote_ip4, dst=gre_if.remote_ip4) /
              UDP(sport=1234, dport=1234) /
              Raw(b'\xa5' * 100))]

        # session is up - traffic passes
        bfd_session_up(self)

        self.send_and_expect(self.pg0, p, self.pg0)

        # bring session down
        bfd_session_down(self)
1901 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1902 class BFDSHA1TestCase(VppTestCase):
1903 """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1906 vpp_clock_offset = None
@classmethod
def setUpClass(cls):
    """ create and configure the single IPv4 pg interface """
    super(BFDSHA1TestCase, cls).setUpClass()
    cls.vapi.cli("set log class bfd level debug")
    try:
        cls.create_pg_interfaces([0])
        cls.pg0.config_ip4()
        # NOTE(review): admin_up() is reconstructed - confirm
        cls.pg0.admin_up()
        cls.pg0.resolve_arp()
    except Exception:
        # release whatever the parent class set up, then re-raise
        super(BFDSHA1TestCase, cls).tearDownClass()
        raise
@classmethod
def tearDownClass(cls):
    """ delegate class-level cleanup to the parent class """
    super(BFDSHA1TestCase, cls).tearDownClass()
def setUp(self):
    """ prepare key factory, BFD event delivery and packet capture """
    super(BFDSHA1TestCase, self).setUp()
    self.factory = AuthKeyFactory()
    self.vapi.want_bfd_events()
    self.pg0.enable_capture()
1935 if not self.vpp_dead:
1936 self.vapi.want_bfd_events(enable_disable=0)
1937 self.vapi.collect_events() # clear the event queue
1938 super(BFDSHA1TestCase, self).tearDown()
1940 def test_session_up(self):
1941 """ bring BFD session up """
1942 key = self.factory.create_random_key(self)
1943 key.add_vpp_config()
1944 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1945 self.pg0.remote_ip4,
1947 self.vpp_session.add_vpp_config()
1948 self.vpp_session.admin_up()
1949 self.test_session = BFDTestSession(
1950 self, self.pg0, AF_INET, sha1_key=key,
1951 bfd_key_id=self.vpp_session.bfd_key_id)
1952 bfd_session_up(self)
1954 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1955 def test_hold_up(self):
1956 """ hold BFD session up """
1957 key = self.factory.create_random_key(self)
1958 key.add_vpp_config()
1959 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1960 self.pg0.remote_ip4,
1962 self.vpp_session.add_vpp_config()
1963 self.vpp_session.admin_up()
1964 self.test_session = BFDTestSession(
1965 self, self.pg0, AF_INET, sha1_key=key,
1966 bfd_key_id=self.vpp_session.bfd_key_id)
1967 bfd_session_up(self)
1968 for dummy in range(self.test_session.detect_mult * 2):
1969 wait_for_bfd_packet(self)
1970 self.test_session.send_packet()
1971 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1973 @unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_hold_up_meticulous(self):
    """ hold BFD session up - meticulous auth """
    # VPP side: a meticulous keyed SHA1 key and an authenticated session
    # that is explicitly put admin-up.
    auth_key = self.factory.create_random_key(
        self, BFDAuthType.meticulous_keyed_sha1)
    auth_key.add_vpp_config()
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
    self.vpp_session.add_vpp_config()
    self.vpp_session.admin_up()

    # Test side: begin a few counts below the 32-bit maximum so that the
    # sequence number wraps while the session is being held up.
    initial_seq = 0xFFFFFFFF - 4
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=auth_key,
        bfd_key_id=self.vpp_session.bfd_key_id,
        our_seq_number=initial_seq)
    bfd_session_up(self)

    # Answer 30 packets from VPP, bumping our sequence number before
    # every reply.
    rounds_left = 30
    while rounds_left > 0:
        wait_for_bfd_packet(self)
        self.test_session.inc_seq_num()
        self.test_session.send_packet()
        rounds_left -= 1

    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1995 @unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_send_bad_seq_number(self):
    """ session is not kept alive by msgs with bad sequence numbers"""
    auth_key = self.factory.create_random_key(
        self, BFDAuthType.meticulous_keyed_sha1)
    auth_key.add_vpp_config()
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=auth_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)

    # Keep transmitting for twice the detection time, never touching the
    # sequence number of the test session in between.
    detection_time = (self.test_session.detect_mult *
                      self.vpp_session.required_min_rx / USEC_IN_SEC)
    deadline = time.time() + 2 * detection_time
    while time.time() < deadline:
        self.test_session.send_packet()
        pause = 0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC
        self.sleep(pause, "time between bfd packets")

    # The packets carried stale sequence numbers, so exactly one event
    # reporting the down state is expected.
    events = self.vapi.collect_events()
    self.assert_equal(len(events), 1, "number of bfd events")
    verify_event(self, events[0], expected_state=BFDState.down)
2021 def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
2022 legitimate_test_session,
2024 rogue_bfd_values=None):
2025 """ execute a rogue session interaction scenario
2027 1. create vpp session, add config
2028 2. bring the legitimate session up
2029 3. copy the bfd values from legitimate session to rogue session
2030 4. apply rogue_bfd_values to rogue session
2031 5. set rogue session state to down
2032 6. send message to take the session down from the rogue session
2033 7. assert that the legitimate session is unaffected
2036 self.vpp_session = vpp_bfd_udp_session
2037 self.vpp_session.add_vpp_config()
2038 self.test_session = legitimate_test_session
2039 # bring vpp session up
2040 bfd_session_up(self)
2041 # send packet from rogue session
2042 rogue_test_session.update(
2043 my_discriminator=self.test_session.my_discriminator,
2044 your_discriminator=self.test_session.your_discriminator,
2045 desired_min_tx=self.test_session.desired_min_tx,
2046 required_min_rx=self.test_session.required_min_rx,
2047 detect_mult=self.test_session.detect_mult,
2048 diag=self.test_session.diag,
2049 state=self.test_session.state,
2050 auth_type=self.test_session.auth_type)
2051 if rogue_bfd_values:
2052 rogue_test_session.update(**rogue_bfd_values)
2053 rogue_test_session.update(state=BFDState.down)
2054 rogue_test_session.send_packet()
2055 wait_for_bfd_packet(self)
2056 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2058 @unittest.skipUnless(running_extended_tests, "part of extended tests")
2059 def test_mismatch_auth(self):
2060 """ session is not brought down by unauthenticated msg """
2061 key = self.factory.create_random_key(self)
2062 key.add_vpp_config()
2063 vpp_session = VppBFDUDPSession(
2064 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2065 legitimate_test_session = BFDTestSession(
2066 self, self.pg0, AF_INET, sha1_key=key,
2067 bfd_key_id=vpp_session.bfd_key_id)
2068 rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
2069 self.execute_rogue_session_scenario(vpp_session,
2070 legitimate_test_session,
2073 @unittest.skipUnless(running_extended_tests, "part of extended tests")
2074 def test_mismatch_bfd_key_id(self):
2075 """ session is not brought down by msg with non-existent key-id """
2076 key = self.factory.create_random_key(self)
2077 key.add_vpp_config()
2078 vpp_session = VppBFDUDPSession(
2079 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2080 # pick a different random bfd key id
2082 while x == vpp_session.bfd_key_id:
2084 legitimate_test_session = BFDTestSession(
2085 self, self.pg0, AF_INET, sha1_key=key,
2086 bfd_key_id=vpp_session.bfd_key_id)
2087 rogue_test_session = BFDTestSession(
2088 self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
2089 self.execute_rogue_session_scenario(vpp_session,
2090 legitimate_test_session,
2093 @unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_mismatched_auth_type(self):
    """ session is not brought down by msg with wrong auth type """
    auth_key = self.factory.create_random_key(self)
    auth_key.add_vpp_config()
    vpp_side = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)

    # The legitimate and the rogue test sessions are built with the very
    # same key material ...
    legit = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=auth_key, bfd_key_id=vpp_side.bfd_key_id)
    rogue = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=auth_key, bfd_key_id=vpp_side.bfd_key_id)

    # ... the only value overridden on the rogue one is the auth type.
    overrides = {'auth_type': BFDAuthType.keyed_md5}
    self.execute_rogue_session_scenario(vpp_side, legit, rogue, overrides)
2110 @unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_restart(self):
    """ simulate remote peer restart and resynchronization """
    auth_key = self.factory.create_random_key(
        self, BFDAuthType.meticulous_keyed_sha1)
    auth_key.add_vpp_config()
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=auth_key,
        bfd_key_id=self.vpp_session.bfd_key_id,
        our_seq_number=0)
    bfd_session_up(self)

    # Stay silent for twice the detection time.
    detection_time = (self.test_session.detect_mult *
                      self.vpp_session.required_min_rx / USEC_IN_SEC)
    self.sleep(2 * detection_time, "simulating peer restart")

    # Exactly one event, reporting the down state, must be queued.
    collected = self.vapi.collect_events()
    self.assert_equal(len(collected), 1, "number of bfd events")
    verify_event(self, collected[0], expected_state=BFDState.down)

    # Bring the test session back to its starting point: state down and
    # sequence numbers reset.
    peer = self.test_session
    peer.update(state=BFDState.down)
    peer.our_seq_number = 0
    peer.vpp_seq_number = None

    # Re-arm the capture to discard whatever is still pending, then bring
    # the session up once more.
    self.pg0.enable_capture()
    bfd_session_up(self)
2139 @unittest.skipUnless(running_extended_tests, "part of extended tests")
2140 class BFDAuthOnOffTestCase(VppTestCase):
2141 """Bidirectional Forwarding Detection (BFD) (changing auth) """
2148 def setUpClass(cls):
2149 super(BFDAuthOnOffTestCase, cls).setUpClass()
2150 cls.vapi.cli("set log class bfd level debug")
2152 cls.create_pg_interfaces([0])
2153 cls.pg0.config_ip4()
2155 cls.pg0.resolve_arp()
2158 super(BFDAuthOnOffTestCase, cls).tearDownClass()
def tearDownClass(cls):
    """ hand class-level teardown over to the parent test case class """
    parent = super(BFDAuthOnOffTestCase, cls)
    parent.tearDownClass()
2166 super(BFDAuthOnOffTestCase, self).setUp()
2167 self.factory = AuthKeyFactory()
2168 self.vapi.want_bfd_events()
2169 self.pg0.enable_capture()
2172 if not self.vpp_dead:
2173 self.vapi.want_bfd_events(enable_disable=0)
2174 self.vapi.collect_events() # clear the event queue
2175 super(BFDAuthOnOffTestCase, self).tearDown()
def test_auth_on_immediate(self):
    """ turn auth on without disturbing session state (immediate) """
    def keep_alive_expecting_up():
        # answer detect_mult * 2 packets from VPP, each of which has to
        # report the up state
        for _ in range(self.test_session.detect_mult * 2):
            pkt = wait_for_bfd_packet(self)
            self.assert_equal(pkt[BFD].state, BFDState.up, BFDState)
            self.test_session.send_packet()

    auth_key = self.factory.create_random_key(self)
    auth_key.add_vpp_config()

    # both ends start out without authentication
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(self, self.pg0, AF_INET)
    bfd_session_up(self)
    keep_alive_expecting_up()

    # activate the key on the VPP session and mirror it on the test side
    self.vpp_session.activate_auth(auth_key)
    self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
    self.test_session.sha1_key = auth_key
    keep_alive_expecting_up()

    # still up, and no state-change event was queued along the way
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    pending_events = self.vapi.collect_events()
    self.assert_equal(len(pending_events), 0, "number of bfd events")
def test_auth_off_immediate(self):
    """ turn auth off without disturbing session state (immediate) """
    def keep_alive_expecting_up():
        # answer detect_mult * 2 packets from VPP, each of which has to
        # report the up state; our sequence number is bumped before
        # every reply
        for _ in range(self.test_session.detect_mult * 2):
            pkt = wait_for_bfd_packet(self)
            self.assert_equal(pkt[BFD].state, BFDState.up, BFDState)
            self.test_session.inc_seq_num()
            self.test_session.send_packet()

    auth_key = self.factory.create_random_key(self)
    auth_key.add_vpp_config()

    # both ends start out authenticated with the same key
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=auth_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)
    keep_alive_expecting_up()

    # drop authentication on the VPP session and on the test side
    self.vpp_session.deactivate_auth()
    self.test_session.bfd_key_id = None
    self.test_session.sha1_key = None
    keep_alive_expecting_up()

    # still up, and no state-change event was queued along the way
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    pending_events = self.vapi.collect_events()
    self.assert_equal(len(pending_events), 0, "number of bfd events")
def test_auth_change_key_immediate(self):
    """ change auth key without disturbing session state (immediate) """
    def keep_alive_expecting_up():
        # answer detect_mult * 2 packets from VPP, each of which has to
        # report the up state
        for _ in range(self.test_session.detect_mult * 2):
            pkt = wait_for_bfd_packet(self)
            self.assert_equal(pkt[BFD].state, BFDState.up, BFDState)
            self.test_session.send_packet()

    old_key = self.factory.create_random_key(self)
    old_key.add_vpp_config()
    new_key = self.factory.create_random_key(self)
    new_key.add_vpp_config()

    # both ends start out authenticated with the first key
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=old_key)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=old_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)
    keep_alive_expecting_up()

    # activate the second key on the VPP session, mirror it on the test
    # side
    self.vpp_session.activate_auth(new_key)
    self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
    self.test_session.sha1_key = new_key
    keep_alive_expecting_up()

    # still up, and no state-change event was queued along the way
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    pending_events = self.vapi.collect_events()
    self.assert_equal(len(pending_events), 0, "number of bfd events")
def test_auth_on_delayed(self):
    """ turn auth on without disturbing session state (delayed) """
    def keep_alive_expecting_up():
        # answer detect_mult * 2 packets from VPP, each of which has to
        # report the up state
        for _ in range(self.test_session.detect_mult * 2):
            pkt = wait_for_bfd_packet(self)
            self.assert_equal(pkt[BFD].state, BFDState.up, BFDState)
            self.test_session.send_packet()

    auth_key = self.factory.create_random_key(self)
    auth_key.add_vpp_config()

    # both ends start out without authentication
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(self, self.pg0, AF_INET)
    bfd_session_up(self)

    # first exchange: replies only, the state in VPP's packets is not
    # inspected here
    for _ in range(self.test_session.detect_mult * 2):
        wait_for_bfd_packet(self)
        self.test_session.send_packet()

    # request the delayed activation; the test side keeps replying with
    # its unchanged (unauthenticated) settings for another round
    self.vpp_session.activate_auth(auth_key, delayed=True)
    keep_alive_expecting_up()

    # now adopt the key on the test side and send one packet with it
    self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
    self.test_session.sha1_key = auth_key
    self.test_session.send_packet()
    keep_alive_expecting_up()

    # still up, and no state-change event was queued along the way
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    pending_events = self.vapi.collect_events()
    self.assert_equal(len(pending_events), 0, "number of bfd events")
def test_auth_off_delayed(self):
    """ turn auth off without disturbing session state (delayed) """
    def keep_alive_expecting_up():
        # answer detect_mult * 2 packets from VPP, each of which has to
        # report the up state
        for _ in range(self.test_session.detect_mult * 2):
            pkt = wait_for_bfd_packet(self)
            self.assert_equal(pkt[BFD].state, BFDState.up, BFDState)
            self.test_session.send_packet()

    auth_key = self.factory.create_random_key(self)
    auth_key.add_vpp_config()

    # both ends start out authenticated with the same key
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=auth_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)
    keep_alive_expecting_up()

    # request the delayed deactivation; the test side keeps replying
    # with its unchanged (authenticated) settings for another round
    self.vpp_session.deactivate_auth(delayed=True)
    keep_alive_expecting_up()

    # now drop the key on the test side and send one packet without it
    self.test_session.bfd_key_id = None
    self.test_session.sha1_key = None
    self.test_session.send_packet()
    keep_alive_expecting_up()

    # still up, and no state-change event was queued along the way
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    pending_events = self.vapi.collect_events()
    self.assert_equal(len(pending_events), 0, "number of bfd events")
def test_auth_change_key_delayed(self):
    """ change auth key without disturbing session state (delayed) """
    def keep_alive_expecting_up():
        # answer detect_mult * 2 packets from VPP, each of which has to
        # report the up state
        for _ in range(self.test_session.detect_mult * 2):
            pkt = wait_for_bfd_packet(self)
            self.assert_equal(pkt[BFD].state, BFDState.up, BFDState)
            self.test_session.send_packet()

    old_key = self.factory.create_random_key(self)
    old_key.add_vpp_config()
    new_key = self.factory.create_random_key(self)
    new_key.add_vpp_config()

    # both ends start out authenticated with the first key; the VPP
    # session is explicitly put admin-up
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=old_key)
    self.vpp_session.add_vpp_config()
    self.vpp_session.admin_up()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=old_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)
    keep_alive_expecting_up()

    # request the delayed switch to the second key; the test side keeps
    # replying with the first key for another round
    self.vpp_session.activate_auth(new_key, delayed=True)
    keep_alive_expecting_up()

    # now adopt the second key on the test side and send one packet
    # with it
    self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
    self.test_session.sha1_key = new_key
    self.test_session.send_packet()
    keep_alive_expecting_up()

    # still up, and no state-change event was queued along the way
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    pending_events = self.vapi.collect_events()
    self.assert_equal(len(pending_events), 0, "number of bfd events")
2352 @unittest.skipUnless(running_extended_tests, "part of extended tests")
2353 class BFDCLITestCase(VppTestCase):
2354 """Bidirectional Forwarding Detection (BFD) (CLI) """
2358 def setUpClass(cls):
2359 super(BFDCLITestCase, cls).setUpClass()
2360 cls.vapi.cli("set log class bfd level debug")
2362 cls.create_pg_interfaces((0,))
2363 cls.pg0.config_ip4()
2364 cls.pg0.config_ip6()
2365 cls.pg0.resolve_arp()
2366 cls.pg0.resolve_ndp()
2369 super(BFDCLITestCase, cls).tearDownClass()
def tearDownClass(cls):
    """ hand class-level teardown over to the parent test case class """
    parent = super(BFDCLITestCase, cls)
    parent.tearDownClass()
2377 super(BFDCLITestCase, self).setUp()
2378 self.factory = AuthKeyFactory()
2379 self.pg0.enable_capture()
2383 self.vapi.want_bfd_events(enable_disable=0)
2384 except UnexpectedApiReturnValueError:
2385 # some tests aren't subscribed, so this is not an issue
2387 self.vapi.collect_events() # clear the event queue
2388 super(BFDCLITestCase, self).tearDown()
2390 def cli_verify_no_response(self, cli):
2391 """ execute a CLI, asserting that the response is empty """
2392 self.assert_equal(self.vapi.cli(cli),
2394 "CLI command response")
2396 def cli_verify_response(self, cli, expected):
2397 """ execute a CLI, asserting that the response matches expectation """
2398 self.assert_equal(self.vapi.cli(cli).strip(),
2400 "CLI command response")
2402 def test_show(self):
2403 """ show commands """
2404 k1 = self.factory.create_random_key(self)
2406 k2 = self.factory.create_random_key(
2407 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2409 s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2411 s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2414 self.logger.info(self.vapi.ppcli("show bfd keys"))
2415 self.logger.info(self.vapi.ppcli("show bfd sessions"))
2416 self.logger.info(self.vapi.ppcli("show bfd"))
2418 def test_set_del_sha1_key(self):
2419 """ set/delete SHA1 auth key """
2420 k = self.factory.create_random_key(self)
2421 self.registry.register(k, self.logger)
2422 self.cli_verify_no_response(
2423 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2425 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2426 self.assertTrue(k.query_vpp_config())
2427 self.vpp_session = VppBFDUDPSession(
2428 self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
2429 self.vpp_session.add_vpp_config()
2430 self.test_session = \
2431 BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
2432 bfd_key_id=self.vpp_session.bfd_key_id)
2433 self.vapi.want_bfd_events()
2434 bfd_session_up(self)
2435 bfd_session_down(self)
2436 # try to replace the secret for the key - should fail because the key
2438 k2 = self.factory.create_random_key(self)
2439 self.cli_verify_response(
2440 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2442 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2443 "bfd key set: `bfd_auth_set_key' API call failed, "
2444 "rv=-103:BFD object in use")
2445 # manipulating the session using old secret should still work
2446 bfd_session_up(self)
2447 bfd_session_down(self)
2448 self.vpp_session.remove_vpp_config()
2449 self.cli_verify_no_response(
2450 "bfd key del conf-key-id %s" % k.conf_key_id)
2451 self.assertFalse(k.query_vpp_config())
2453 def test_set_del_meticulous_sha1_key(self):
2454 """ set/delete meticulous SHA1 auth key """
2455 k = self.factory.create_random_key(
2456 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2457 self.registry.register(k, self.logger)
2458 self.cli_verify_no_response(
2459 "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
2461 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2462 self.assertTrue(k.query_vpp_config())
2463 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2464 self.pg0.remote_ip6, af=AF_INET6,
2466 self.vpp_session.add_vpp_config()
2467 self.vpp_session.admin_up()
2468 self.test_session = \
2469 BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
2470 bfd_key_id=self.vpp_session.bfd_key_id)
2471 self.vapi.want_bfd_events()
2472 bfd_session_up(self)
2473 bfd_session_down(self)
2474 # try to replace the secret for the key - should fail because the key
2476 k2 = self.factory.create_random_key(self)
2477 self.cli_verify_response(
2478 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2480 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2481 "bfd key set: `bfd_auth_set_key' API call failed, "
2482 "rv=-103:BFD object in use")
2483 # manipulating the session using old secret should still work
2484 bfd_session_up(self)
2485 bfd_session_down(self)
2486 self.vpp_session.remove_vpp_config()
2487 self.cli_verify_no_response(
2488 "bfd key del conf-key-id %s" % k.conf_key_id)
2489 self.assertFalse(k.query_vpp_config())
2491 def test_add_mod_del_bfd_udp(self):
2492 """ create/modify/delete IPv4 BFD UDP session """
2493 vpp_session = VppBFDUDPSession(
2494 self, self.pg0, self.pg0.remote_ip4)
2495 self.registry.register(vpp_session, self.logger)
2496 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2497 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2498 "detect-mult %s" % (self.pg0.name, self.pg0.local_ip4,
2499 self.pg0.remote_ip4,
2500 vpp_session.desired_min_tx,
2501 vpp_session.required_min_rx,
2502 vpp_session.detect_mult)
2503 self.cli_verify_no_response(cli_add_cmd)
2504 # 2nd add should fail
2505 self.cli_verify_response(
2507 "bfd udp session add: `bfd_add_add_session' API call"
2508 " failed, rv=-101:Duplicate BFD object")
2509 verify_bfd_session_config(self, vpp_session)
2510 mod_session = VppBFDUDPSession(
2511 self, self.pg0, self.pg0.remote_ip4,
2512 required_min_rx=2 * vpp_session.required_min_rx,
2513 desired_min_tx=3 * vpp_session.desired_min_tx,
2514 detect_mult=4 * vpp_session.detect_mult)
2515 self.cli_verify_no_response(
2516 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2517 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2518 (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2519 mod_session.desired_min_tx, mod_session.required_min_rx,
2520 mod_session.detect_mult))
2521 verify_bfd_session_config(self, mod_session)
2522 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2523 "peer-addr %s" % (self.pg0.name,
2524 self.pg0.local_ip4, self.pg0.remote_ip4)
2525 self.cli_verify_no_response(cli_del_cmd)
2526 # 2nd del is expected to fail
2527 self.cli_verify_response(
2528 cli_del_cmd, "bfd udp session del: `bfd_udp_del_session' API call"
2529 " failed, rv=-102:No such BFD object")
2530 self.assertFalse(vpp_session.query_vpp_config())
2532 def test_add_mod_del_bfd_udp6(self):
2533 """ create/modify/delete IPv6 BFD UDP session """
2534 vpp_session = VppBFDUDPSession(
2535 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
2536 self.registry.register(vpp_session, self.logger)
2537 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2538 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2539 "detect-mult %s" % (self.pg0.name, self.pg0.local_ip6,
2540 self.pg0.remote_ip6,
2541 vpp_session.desired_min_tx,
2542 vpp_session.required_min_rx,
2543 vpp_session.detect_mult)
2544 self.cli_verify_no_response(cli_add_cmd)
2545 # 2nd add should fail
2546 self.cli_verify_response(
2548 "bfd udp session add: `bfd_add_add_session' API call"
2549 " failed, rv=-101:Duplicate BFD object")
2550 verify_bfd_session_config(self, vpp_session)
2551 mod_session = VppBFDUDPSession(
2552 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2553 required_min_rx=2 * vpp_session.required_min_rx,
2554 desired_min_tx=3 * vpp_session.desired_min_tx,
2555 detect_mult=4 * vpp_session.detect_mult)
2556 self.cli_verify_no_response(
2557 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2558 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2559 (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2560 mod_session.desired_min_tx,
2561 mod_session.required_min_rx, mod_session.detect_mult))
2562 verify_bfd_session_config(self, mod_session)
2563 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2564 "peer-addr %s" % (self.pg0.name,
2565 self.pg0.local_ip6, self.pg0.remote_ip6)
2566 self.cli_verify_no_response(cli_del_cmd)
2567 # 2nd del is expected to fail
2568 self.cli_verify_response(
2570 "bfd udp session del: `bfd_udp_del_session' API call"
2571 " failed, rv=-102:No such BFD object")
2572 self.assertFalse(vpp_session.query_vpp_config())
2574 def test_add_mod_del_bfd_udp_auth(self):
2575 """ create/modify/delete IPv4 BFD UDP session (authenticated) """
2576 key = self.factory.create_random_key(self)
2577 key.add_vpp_config()
2578 vpp_session = VppBFDUDPSession(
2579 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2580 self.registry.register(vpp_session, self.logger)
2581 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2582 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2583 "detect-mult %s conf-key-id %s bfd-key-id %s"\
2584 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2585 vpp_session.desired_min_tx, vpp_session.required_min_rx,
2586 vpp_session.detect_mult, key.conf_key_id,
2587 vpp_session.bfd_key_id)
2588 self.cli_verify_no_response(cli_add_cmd)
2589 # 2nd add should fail
2590 self.cli_verify_response(
2592 "bfd udp session add: `bfd_add_add_session' API call"
2593 " failed, rv=-101:Duplicate BFD object")
2594 verify_bfd_session_config(self, vpp_session)
2595 mod_session = VppBFDUDPSession(
2596 self, self.pg0, self.pg0.remote_ip4, sha1_key=key,
2597 bfd_key_id=vpp_session.bfd_key_id,
2598 required_min_rx=2 * vpp_session.required_min_rx,
2599 desired_min_tx=3 * vpp_session.desired_min_tx,
2600 detect_mult=4 * vpp_session.detect_mult)
2601 self.cli_verify_no_response(
2602 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2603 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2604 (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2605 mod_session.desired_min_tx,
2606 mod_session.required_min_rx, mod_session.detect_mult))
2607 verify_bfd_session_config(self, mod_session)
2608 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2609 "peer-addr %s" % (self.pg0.name,
2610 self.pg0.local_ip4, self.pg0.remote_ip4)
2611 self.cli_verify_no_response(cli_del_cmd)
2612 # 2nd del is expected to fail
2613 self.cli_verify_response(
2615 "bfd udp session del: `bfd_udp_del_session' API call"
2616 " failed, rv=-102:No such BFD object")
2617 self.assertFalse(vpp_session.query_vpp_config())
2619 def test_add_mod_del_bfd_udp6_auth(self):
2620 """ create/modify/delete IPv6 BFD UDP session (authenticated) """
2621 key = self.factory.create_random_key(
2622 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2623 key.add_vpp_config()
2624 vpp_session = VppBFDUDPSession(
2625 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key)
2626 self.registry.register(vpp_session, self.logger)
2627 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2628 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2629 "detect-mult %s conf-key-id %s bfd-key-id %s" \
2630 % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2631 vpp_session.desired_min_tx, vpp_session.required_min_rx,
2632 vpp_session.detect_mult, key.conf_key_id,
2633 vpp_session.bfd_key_id)
2634 self.cli_verify_no_response(cli_add_cmd)
2635 # 2nd add should fail
2636 self.cli_verify_response(
2638 "bfd udp session add: `bfd_add_add_session' API call"
2639 " failed, rv=-101:Duplicate BFD object")
2640 verify_bfd_session_config(self, vpp_session)
2641 mod_session = VppBFDUDPSession(
2642 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key,
2643 bfd_key_id=vpp_session.bfd_key_id,
2644 required_min_rx=2 * vpp_session.required_min_rx,
2645 desired_min_tx=3 * vpp_session.desired_min_tx,
2646 detect_mult=4 * vpp_session.detect_mult)
2647 self.cli_verify_no_response(
2648 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2649 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2650 (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2651 mod_session.desired_min_tx,
2652 mod_session.required_min_rx, mod_session.detect_mult))
2653 verify_bfd_session_config(self, mod_session)
2654 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2655 "peer-addr %s" % (self.pg0.name,
2656 self.pg0.local_ip6, self.pg0.remote_ip6)
2657 self.cli_verify_no_response(cli_del_cmd)
2658 # 2nd del is expected to fail
2659 self.cli_verify_response(
2661 "bfd udp session del: `bfd_udp_del_session' API call"
2662 " failed, rv=-102:No such BFD object")
2663 self.assertFalse(vpp_session.query_vpp_config())
2665 def test_auth_on_off(self):
2666 """ turn authentication on and off """
2667 key = self.factory.create_random_key(
2668 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2669 key.add_vpp_config()
2670 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2671 auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2673 session.add_vpp_config()
2675 "bfd udp session auth activate interface %s local-addr %s "\
2676 "peer-addr %s conf-key-id %s bfd-key-id %s"\
2677 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2678 key.conf_key_id, auth_session.bfd_key_id)
2679 self.cli_verify_no_response(cli_activate)
2680 verify_bfd_session_config(self, auth_session)
2681 self.cli_verify_no_response(cli_activate)
2682 verify_bfd_session_config(self, auth_session)
2684 "bfd udp session auth deactivate interface %s local-addr %s "\
2686 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2687 self.cli_verify_no_response(cli_deactivate)
2688 verify_bfd_session_config(self, session)
2689 self.cli_verify_no_response(cli_deactivate)
2690 verify_bfd_session_config(self, session)
2692 def test_auth_on_off_delayed(self):
2693 """ turn authentication on and off (delayed) """
2694 key = self.factory.create_random_key(
2695 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2696 key.add_vpp_config()
2697 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2698 auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2700 session.add_vpp_config()
2702 "bfd udp session auth activate interface %s local-addr %s "\
2703 "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"\
2704 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2705 key.conf_key_id, auth_session.bfd_key_id)
2706 self.cli_verify_no_response(cli_activate)
2707 verify_bfd_session_config(self, auth_session)
2708 self.cli_verify_no_response(cli_activate)
2709 verify_bfd_session_config(self, auth_session)
2711 "bfd udp session auth deactivate interface %s local-addr %s "\
2712 "peer-addr %s delayed yes"\
2713 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2714 self.cli_verify_no_response(cli_deactivate)
2715 verify_bfd_session_config(self, session)
2716 self.cli_verify_no_response(cli_deactivate)
2717 verify_bfd_session_config(self, session)
def test_admin_up_down(self):
    """ put session admin-up and admin-down """
    session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    session.add_vpp_config()
    # Both commands address the same session, so they share one argument
    # tuple: interface name, local address, peer address.
    session_args = (self.pg0.name, self.pg0.local_ip4,
                    self.pg0.remote_ip4)
    cli_down = (
        "bfd udp session set-flags admin down interface %s local-addr %s "
        "peer-addr %s" % session_args)
    cli_up = (
        "bfd udp session set-flags admin up interface %s local-addr %s "
        "peer-addr %s" % session_args)
    # Admin-down must be reflected as BFDState.admin_down; after admin-up
    # the session is expected to report BFDState.down.
    self.cli_verify_no_response(cli_down)
    verify_bfd_session_config(self, session, state=BFDState.admin_down)
    self.cli_verify_no_response(cli_up)
    verify_bfd_session_config(self, session, state=BFDState.down)
def test_set_del_udp_echo_source(self):
    """ set/del udp echo source """
    show_cmd = "show bfd echo-source"
    not_set = "UDP echo source is not set."
    status = ("UDP echo source is: %s\n"
              "IPv4 address usable as echo source: %s\n"
              "IPv6 address usable as echo source: %s")

    def expect_show(expected):
        # Run the show command and compare its output with `expected`.
        self.cli_verify_response(show_cmd, expected)

    self.create_loopback_interfaces(1)
    self.loopback0 = self.lo_interfaces[0]
    self.loopback0.admin_up()

    # Nothing configured yet.
    expect_show(not_set)

    # Loopback set as echo source, but it has no addresses so far.
    self.cli_verify_no_response(
        "bfd udp echo-source set interface %s" % self.loopback0.name)
    expect_show(status % (self.loopback0.name, "none", "none"))

    # With IPv4 configured, the expected echo address is the interface
    # address with its lowest bit flipped.
    self.loopback0.config_ip4()
    (ip4_word,) = unpack("!L", self.loopback0.local_ip4n)
    echo_ip4 = inet_ntop(AF_INET, pack("!L", ip4_word ^ 1))
    expect_show(status % (self.loopback0.name, echo_ip4, "none"))

    # Same derivation for IPv6 (lowest bit of the last 32-bit word); the
    # expected address is computed before the address is configured.
    word0, word1, word2, word3 = unpack("!LLLL",
                                        self.loopback0.local_ip6n)
    echo_ip6 = inet_ntop(AF_INET6,
                         pack("!LLLL", word0, word1, word2, word3 ^ 1))
    self.loopback0.config_ip6()
    expect_show(status % (self.loopback0.name, echo_ip4, echo_ip6))

    # Removing the echo source returns the CLI to its initial answer.
    self.cli_verify_no_response("bfd udp echo-source del")
    expect_show(not_set)
# When this file is executed directly, run its test cases through unittest
# using VppTestRunner as the test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)