4 from __future__ import division
10 from random import randint, shuffle, getrandbits
11 from socket import AF_INET, AF_INET6, inet_ntop
12 from struct import pack, unpack
16 from scapy.layers.inet import UDP, IP
17 from scapy.layers.inet6 import IPv6
18 from scapy.layers.l2 import Ether, GRE
19 from scapy.packet import Raw
21 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
22 BFDDiagCode, BFDState, BFD_vpp_echo
23 from framework import VppTestCase, VppTestRunner, running_extended_tests
25 from vpp_ip import DpoProto
26 from vpp_ip_route import VppIpRoute, VppRoutePath
27 from vpp_lo_interface import VppLoInterface
28 from vpp_papi_provider import UnexpectedApiReturnValueError
29 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
30 from vpp_gre_interface import VppGreInterface
35 class AuthKeyFactory(object):
36 """Factory class for creating auth keys with unique conf key ID"""
39 self._conf_key_ids = {}
41 def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
42 """ create a random key with unique conf key id """
43 conf_key_id = randint(0, 0xFFFFFFFF)
44 while conf_key_id in self._conf_key_ids:
45 conf_key_id = randint(0, 0xFFFFFFFF)
46 self._conf_key_ids[conf_key_id] = 1
47 key = scapy.compat.raw(
48 bytearray([randint(0, 255) for _ in range(randint(1, 20))]))
49 return VppBFDAuthKey(test=test, auth_type=auth_type,
50 conf_key_id=conf_key_id, key=key)
53 @unittest.skipUnless(running_extended_tests, "part of extended tests")
54 class BFDAPITestCase(VppTestCase):
55 """Bidirectional Forwarding Detection (BFD) - API"""
62 super(BFDAPITestCase, cls).setUpClass()
63 cls.vapi.cli("set log class bfd level debug")
65 cls.create_pg_interfaces(range(2))
66 for i in cls.pg_interfaces:
72 super(BFDAPITestCase, cls).tearDownClass()
76 def tearDownClass(cls):
77 super(BFDAPITestCase, cls).tearDownClass()
80 super(BFDAPITestCase, self).setUp()
81 self.factory = AuthKeyFactory()
83 def test_add_bfd(self):
84 """ create a BFD session """
85 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
86 session.add_vpp_config()
87 self.logger.debug("Session state is %s", session.state)
88 session.remove_vpp_config()
89 session.add_vpp_config()
90 self.logger.debug("Session state is %s", session.state)
91 session.remove_vpp_config()
93 def test_double_add(self):
94 """ create the same BFD session twice (negative case) """
95 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
96 session.add_vpp_config()
98 with self.vapi.assert_negative_api_retval():
99 session.add_vpp_config()
101 session.remove_vpp_config()
103 def test_add_bfd6(self):
104 """ create IPv6 BFD session """
105 session = VppBFDUDPSession(
106 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
107 session.add_vpp_config()
108 self.logger.debug("Session state is %s", session.state)
109 session.remove_vpp_config()
110 session.add_vpp_config()
111 self.logger.debug("Session state is %s", session.state)
112 session.remove_vpp_config()
114 def test_mod_bfd(self):
115 """ modify BFD session parameters """
116 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
117 desired_min_tx=50000,
118 required_min_rx=10000,
120 session.add_vpp_config()
121 s = session.get_bfd_udp_session_dump_entry()
122 self.assert_equal(session.desired_min_tx,
124 "desired min transmit interval")
125 self.assert_equal(session.required_min_rx,
127 "required min receive interval")
128 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
129 session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
130 required_min_rx=session.required_min_rx * 2,
131 detect_mult=session.detect_mult * 2)
132 s = session.get_bfd_udp_session_dump_entry()
133 self.assert_equal(session.desired_min_tx,
135 "desired min transmit interval")
136 self.assert_equal(session.required_min_rx,
138 "required min receive interval")
139 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
141 def test_add_sha1_keys(self):
142 """ add SHA1 keys """
144 keys = [self.factory.create_random_key(
145 self) for i in range(0, key_count)]
147 self.assertFalse(key.query_vpp_config())
151 self.assertTrue(key.query_vpp_config())
153 indexes = range(key_count)
158 key.remove_vpp_config()
160 for j in range(key_count):
163 self.assertFalse(key.query_vpp_config())
165 self.assertTrue(key.query_vpp_config())
166 # should be removed now
168 self.assertFalse(key.query_vpp_config())
169 # add back and remove again
173 self.assertTrue(key.query_vpp_config())
175 key.remove_vpp_config()
177 self.assertFalse(key.query_vpp_config())
179 def test_add_bfd_sha1(self):
180 """ create a BFD session (SHA1) """
181 key = self.factory.create_random_key(self)
183 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
185 session.add_vpp_config()
186 self.logger.debug("Session state is %s", session.state)
187 session.remove_vpp_config()
188 session.add_vpp_config()
189 self.logger.debug("Session state is %s", session.state)
190 session.remove_vpp_config()
192 def test_double_add_sha1(self):
193 """ create the same BFD session twice (negative case) (SHA1) """
194 key = self.factory.create_random_key(self)
196 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
198 session.add_vpp_config()
199 with self.assertRaises(Exception):
200 session.add_vpp_config()
202 def test_add_auth_nonexistent_key(self):
203 """ create BFD session using non-existent SHA1 (negative case) """
204 session = VppBFDUDPSession(
205 self, self.pg0, self.pg0.remote_ip4,
206 sha1_key=self.factory.create_random_key(self))
207 with self.assertRaises(Exception):
208 session.add_vpp_config()
210 def test_shared_sha1_key(self):
211 """ share single SHA1 key between multiple BFD sessions """
212 key = self.factory.create_random_key(self)
215 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
217 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
218 sha1_key=key, af=AF_INET6),
219 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
221 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
222 sha1_key=key, af=AF_INET6)]
227 e = key.get_bfd_auth_keys_dump_entry()
228 self.assert_equal(e.use_count, len(sessions) - removed,
229 "Use count for shared key")
230 s.remove_vpp_config()
232 e = key.get_bfd_auth_keys_dump_entry()
233 self.assert_equal(e.use_count, len(sessions) - removed,
234 "Use count for shared key")
236 def test_activate_auth(self):
237 """ activate SHA1 authentication """
238 key = self.factory.create_random_key(self)
240 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
241 session.add_vpp_config()
242 session.activate_auth(key)
244 def test_deactivate_auth(self):
245 """ deactivate SHA1 authentication """
246 key = self.factory.create_random_key(self)
248 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
249 session.add_vpp_config()
250 session.activate_auth(key)
251 session.deactivate_auth()
253 def test_change_key(self):
254 """ change SHA1 key """
255 key1 = self.factory.create_random_key(self)
256 key2 = self.factory.create_random_key(self)
257 while key2.conf_key_id == key1.conf_key_id:
258 key2 = self.factory.create_random_key(self)
259 key1.add_vpp_config()
260 key2.add_vpp_config()
261 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
263 session.add_vpp_config()
264 session.activate_auth(key2)
266 def test_set_del_udp_echo_source(self):
267 """ set/del udp echo source """
268 self.create_loopback_interfaces(1)
269 self.loopback0 = self.lo_interfaces[0]
270 self.loopback0.admin_up()
271 echo_source = self.vapi.bfd_udp_get_echo_source()
272 self.assertFalse(echo_source.is_set)
273 self.assertFalse(echo_source.have_usable_ip4)
274 self.assertFalse(echo_source.have_usable_ip6)
276 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
277 echo_source = self.vapi.bfd_udp_get_echo_source()
278 self.assertTrue(echo_source.is_set)
279 self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
280 self.assertFalse(echo_source.have_usable_ip4)
281 self.assertFalse(echo_source.have_usable_ip6)
283 self.loopback0.config_ip4()
284 unpacked = unpack("!L", self.loopback0.local_ip4n)
285 echo_ip4 = pack("!L", unpacked[0] ^ 1)
286 echo_source = self.vapi.bfd_udp_get_echo_source()
287 self.assertTrue(echo_source.is_set)
288 self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
289 self.assertTrue(echo_source.have_usable_ip4)
290 self.assertEqual(echo_source.ip4_addr, echo_ip4)
291 self.assertFalse(echo_source.have_usable_ip6)
293 self.loopback0.config_ip6()
294 unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
295 echo_ip6 = pack("!LLLL", unpacked[0], unpacked[1], unpacked[2],
297 echo_source = self.vapi.bfd_udp_get_echo_source()
298 self.assertTrue(echo_source.is_set)
299 self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
300 self.assertTrue(echo_source.have_usable_ip4)
301 self.assertEqual(echo_source.ip4_addr, echo_ip4)
302 self.assertTrue(echo_source.have_usable_ip6)
303 self.assertEqual(echo_source.ip6_addr, echo_ip6)
305 self.vapi.bfd_udp_del_echo_source()
306 echo_source = self.vapi.bfd_udp_get_echo_source()
307 self.assertFalse(echo_source.is_set)
308 self.assertFalse(echo_source.have_usable_ip4)
309 self.assertFalse(echo_source.have_usable_ip6)
312 class BFDTestSession(object):
313 """ BFD session as seen from test framework side """
315 def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
316 bfd_key_id=None, our_seq_number=None,
317 tunnel_header=None, phy_interface=None):
320 self.sha1_key = sha1_key
321 self.bfd_key_id = bfd_key_id
322 self.interface = interface
324 self.phy_interface = phy_interface
326 self.phy_interface = self.interface
327 self.udp_sport = randint(49152, 65535)
328 if our_seq_number is None:
329 self.our_seq_number = randint(0, 40000000)
331 self.our_seq_number = our_seq_number
332 self.vpp_seq_number = None
333 self.my_discriminator = 0
334 self.desired_min_tx = 300000
335 self.required_min_rx = 300000
336 self.required_min_echo_rx = None
337 self.detect_mult = detect_mult
338 self.diag = BFDDiagCode.no_diagnostic
339 self.your_discriminator = None
340 self.state = BFDState.down
341 self.auth_type = BFDAuthType.no_auth
342 self.tunnel_header = tunnel_header
344 def inc_seq_num(self):
345 """ increment sequence number, wrapping if needed """
346 if self.our_seq_number == 0xFFFFFFFF:
347 self.our_seq_number = 0
349 self.our_seq_number += 1
351 def update(self, my_discriminator=None, your_discriminator=None,
352 desired_min_tx=None, required_min_rx=None,
353 required_min_echo_rx=None, detect_mult=None,
354 diag=None, state=None, auth_type=None):
355 """ update BFD parameters associated with session """
356 if my_discriminator is not None:
357 self.my_discriminator = my_discriminator
358 if your_discriminator is not None:
359 self.your_discriminator = your_discriminator
360 if required_min_rx is not None:
361 self.required_min_rx = required_min_rx
362 if required_min_echo_rx is not None:
363 self.required_min_echo_rx = required_min_echo_rx
364 if desired_min_tx is not None:
365 self.desired_min_tx = desired_min_tx
366 if detect_mult is not None:
367 self.detect_mult = detect_mult
370 if state is not None:
372 if auth_type is not None:
373 self.auth_type = auth_type
375 def fill_packet_fields(self, packet):
376 """ set packet fields with known values in packet """
378 if self.my_discriminator:
379 self.test.logger.debug("BFD: setting packet.my_discriminator=%s",
380 self.my_discriminator)
381 bfd.my_discriminator = self.my_discriminator
382 if self.your_discriminator:
383 self.test.logger.debug("BFD: setting packet.your_discriminator=%s",
384 self.your_discriminator)
385 bfd.your_discriminator = self.your_discriminator
386 if self.required_min_rx:
387 self.test.logger.debug(
388 "BFD: setting packet.required_min_rx_interval=%s",
389 self.required_min_rx)
390 bfd.required_min_rx_interval = self.required_min_rx
391 if self.required_min_echo_rx:
392 self.test.logger.debug(
393 "BFD: setting packet.required_min_echo_rx=%s",
394 self.required_min_echo_rx)
395 bfd.required_min_echo_rx_interval = self.required_min_echo_rx
396 if self.desired_min_tx:
397 self.test.logger.debug(
398 "BFD: setting packet.desired_min_tx_interval=%s",
400 bfd.desired_min_tx_interval = self.desired_min_tx
402 self.test.logger.debug(
403 "BFD: setting packet.detect_mult=%s", self.detect_mult)
404 bfd.detect_mult = self.detect_mult
406 self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
409 self.test.logger.debug("BFD: setting packet.state=%s", self.state)
410 bfd.state = self.state
412 # this is used by a negative test-case
413 self.test.logger.debug("BFD: setting packet.auth_type=%s",
415 bfd.auth_type = self.auth_type
417 def create_packet(self):
418 """ create a BFD packet, reflecting the current state of session """
421 bfd.auth_type = self.sha1_key.auth_type
422 bfd.auth_len = BFD.sha1_auth_len
423 bfd.auth_key_id = self.bfd_key_id
424 bfd.auth_seq_num = self.our_seq_number
425 bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
428 packet = Ether(src=self.phy_interface.remote_mac,
429 dst=self.phy_interface.local_mac)
430 if self.tunnel_header:
431 packet = packet / self.tunnel_header
432 if self.af == AF_INET6:
434 IPv6(src=self.interface.remote_ip6,
435 dst=self.interface.local_ip6,
437 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
441 IP(src=self.interface.remote_ip4,
442 dst=self.interface.local_ip4,
444 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
446 self.test.logger.debug("BFD: Creating packet")
447 self.fill_packet_fields(packet)
449 hash_material = scapy.compat.raw(
450 packet[BFD])[:32] + self.sha1_key.key + \
451 "\0" * (20 - len(self.sha1_key.key))
452 self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
453 hashlib.sha1(hash_material).hexdigest())
454 packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
457 def send_packet(self, packet=None, interface=None):
458 """ send packet on interface, creating the packet if needed """
460 packet = self.create_packet()
461 if interface is None:
462 interface = self.phy_interface
463 self.test.logger.debug(ppp("Sending packet:", packet))
464 interface.add_stream(packet)
467 def verify_sha1_auth(self, packet):
468 """ Verify correctness of authentication in BFD layer. """
470 self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
471 self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
473 self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
474 self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
475 if self.vpp_seq_number is None:
476 self.vpp_seq_number = bfd.auth_seq_num
477 self.test.logger.debug("Received initial sequence number: %s" %
480 recvd_seq_num = bfd.auth_seq_num
481 self.test.logger.debug("Received followup sequence number: %s" %
483 if self.vpp_seq_number < 0xffffffff:
484 if self.sha1_key.auth_type == \
485 BFDAuthType.meticulous_keyed_sha1:
486 self.test.assert_equal(recvd_seq_num,
487 self.vpp_seq_number + 1,
488 "BFD sequence number")
490 self.test.assert_in_range(recvd_seq_num,
492 self.vpp_seq_number + 1,
493 "BFD sequence number")
495 if self.sha1_key.auth_type == \
496 BFDAuthType.meticulous_keyed_sha1:
497 self.test.assert_equal(recvd_seq_num, 0,
498 "BFD sequence number")
500 self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
501 "BFD sequence number not one of "
502 "(%s, 0)" % self.vpp_seq_number)
503 self.vpp_seq_number = recvd_seq_num
504 # last 20 bytes represent the hash - so replace them with the key,
505 # pad the result with zeros and hash the result
506 hash_material = bfd.original[:-20] + self.sha1_key.key + \
507 b"\0" * (20 - len(self.sha1_key.key))
508 expected_hash = hashlib.sha1(hash_material).hexdigest()
509 self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
510 expected_hash, "Auth key hash")
512 def verify_bfd(self, packet):
513 """ Verify correctness of BFD layer. """
515 self.test.assert_equal(bfd.version, 1, "BFD version")
516 self.test.assert_equal(bfd.your_discriminator,
517 self.my_discriminator,
518 "BFD - your discriminator")
520 self.verify_sha1_auth(packet)
523 def bfd_session_up(test):
524 """ Bring BFD session up """
525 test.logger.info("BFD: Waiting for slow hello")
526 p = wait_for_bfd_packet(test, 2, is_tunnel=test.vpp_session.is_tunnel)
528 if hasattr(test, 'vpp_clock_offset'):
529 old_offset = test.vpp_clock_offset
530 test.vpp_clock_offset = time.time() - p.time
531 test.logger.debug("BFD: Calculated vpp clock offset: %s",
532 test.vpp_clock_offset)
534 test.assertAlmostEqual(
535 old_offset, test.vpp_clock_offset, delta=0.5,
536 msg="vpp clock offset not stable (new: %s, old: %s)" %
537 (test.vpp_clock_offset, old_offset))
538 test.logger.info("BFD: Sending Init")
539 test.test_session.update(my_discriminator=randint(0, 40000000),
540 your_discriminator=p[BFD].my_discriminator,
542 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
543 BFDAuthType.meticulous_keyed_sha1:
544 test.test_session.inc_seq_num()
545 test.test_session.send_packet()
546 test.logger.info("BFD: Waiting for event")
547 e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
548 verify_event(test, e, expected_state=BFDState.up)
549 test.logger.info("BFD: Session is Up")
550 test.test_session.update(state=BFDState.up)
551 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
552 BFDAuthType.meticulous_keyed_sha1:
553 test.test_session.inc_seq_num()
554 test.test_session.send_packet()
555 test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
558 def bfd_session_down(test):
559 """ Bring BFD session down """
560 test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
561 test.test_session.update(state=BFDState.down)
562 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
563 BFDAuthType.meticulous_keyed_sha1:
564 test.test_session.inc_seq_num()
565 test.test_session.send_packet()
566 test.logger.info("BFD: Waiting for event")
567 e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
568 verify_event(test, e, expected_state=BFDState.down)
569 test.logger.info("BFD: Session is Down")
570 test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
573 def verify_bfd_session_config(test, session, state=None):
574 dump = session.get_bfd_udp_session_dump_entry()
575 test.assertIsNotNone(dump)
576 # since dump is not none, we have verified that sw_if_index and addresses
577 # are valid (in get_bfd_udp_session_dump_entry)
579 test.assert_equal(dump.state, state, "session state")
580 test.assert_equal(dump.required_min_rx, session.required_min_rx,
581 "required min rx interval")
582 test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
583 "desired min tx interval")
584 test.assert_equal(dump.detect_mult, session.detect_mult,
586 if session.sha1_key is None:
587 test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
589 test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
590 test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
592 test.assert_equal(dump.conf_key_id,
593 session.sha1_key.conf_key_id,
597 def verify_ip(test, packet):
598 """ Verify correctness of IP layer. """
599 if test.vpp_session.af == AF_INET6:
601 local_ip = test.vpp_session.interface.local_ip6
602 remote_ip = test.vpp_session.interface.remote_ip6
603 test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
606 local_ip = test.vpp_session.interface.local_ip4
607 remote_ip = test.vpp_session.interface.remote_ip4
608 test.assert_equal(ip.ttl, 255, "IPv4 TTL")
609 test.assert_equal(ip.src, local_ip, "IP source address")
610 test.assert_equal(ip.dst, remote_ip, "IP destination address")
613 def verify_udp(test, packet):
614 """ Verify correctness of UDP layer. """
616 test.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
617 test.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
621 def verify_event(test, event, expected_state):
622 """ Verify correctness of event values. """
624 test.logger.debug("BFD: Event: %s" % moves.reprlib.repr(e))
625 test.assert_equal(e.sw_if_index,
626 test.vpp_session.interface.sw_if_index,
627 "BFD interface index")
629 if test.vpp_session.af == AF_INET6:
631 test.assert_equal(e.is_ipv6, is_ipv6, "is_ipv6")
632 if test.vpp_session.af == AF_INET:
633 test.assert_equal(e.local_addr[:4], test.vpp_session.local_addr_n,
634 "Local IPv4 address")
635 test.assert_equal(e.peer_addr[:4], test.vpp_session.peer_addr_n,
638 test.assert_equal(e.local_addr, test.vpp_session.local_addr_n,
639 "Local IPv6 address")
640 test.assert_equal(e.peer_addr, test.vpp_session.peer_addr_n,
642 test.assert_equal(e.state, expected_state, BFDState)
645 def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None, is_tunnel=False):
646 """ wait for BFD packet and verify its correctness
648 :param timeout: how long to wait
649 :param pcap_time_min: ignore packets with pcap timestamp lower than this
651 :returns: tuple (packet, time spent waiting for packet)
653 test.logger.info("BFD: Waiting for BFD packet")
654 deadline = time.time() + timeout
659 test.assert_in_range(counter, 0, 100, "number of packets ignored")
660 time_left = deadline - time.time()
662 raise CaptureTimeoutError("Packet did not arrive within timeout")
663 p = test.pg0.wait_for_packet(timeout=time_left)
664 test.logger.debug(ppp("BFD: Got packet:", p))
665 if pcap_time_min is not None and p.time < pcap_time_min:
666 test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
667 "pcap time min %s):" %
668 (p.time, pcap_time_min), p))
672 # strip an IP layer and move to the next
677 raise Exception(ppp("Unexpected or invalid BFD packet:", p))
679 raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
682 test.test_session.verify_bfd(p)
686 @unittest.skipUnless(running_extended_tests, "part of extended tests")
687 class BFD4TestCase(VppTestCase):
688 """Bidirectional Forwarding Detection (BFD)"""
691 vpp_clock_offset = None
697 super(BFD4TestCase, cls).setUpClass()
698 cls.vapi.cli("set log class bfd level debug")
700 cls.create_pg_interfaces([0])
701 cls.create_loopback_interfaces(1)
702 cls.loopback0 = cls.lo_interfaces[0]
703 cls.loopback0.config_ip4()
704 cls.loopback0.admin_up()
706 cls.pg0.configure_ipv4_neighbors()
708 cls.pg0.resolve_arp()
711 super(BFD4TestCase, cls).tearDownClass()
715 def tearDownClass(cls):
716 super(BFD4TestCase, cls).tearDownClass()
719 super(BFD4TestCase, self).setUp()
720 self.factory = AuthKeyFactory()
721 self.vapi.want_bfd_events()
722 self.pg0.enable_capture()
724 self.vpp_session = VppBFDUDPSession(self, self.pg0,
726 self.vpp_session.add_vpp_config()
727 self.vpp_session.admin_up()
728 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
730 self.vapi.want_bfd_events(enable_disable=0)
734 if not self.vpp_dead:
735 self.vapi.want_bfd_events(enable_disable=0)
736 self.vapi.collect_events() # clear the event queue
737 super(BFD4TestCase, self).tearDown()
739 def test_session_up(self):
740 """ bring BFD session up """
743 def test_session_up_by_ip(self):
744 """ bring BFD session up - first frame looked up by address pair """
745 self.logger.info("BFD: Sending Slow control frame")
746 self.test_session.update(my_discriminator=randint(0, 40000000))
747 self.test_session.send_packet()
748 self.pg0.enable_capture()
749 p = self.pg0.wait_for_packet(1)
750 self.assert_equal(p[BFD].your_discriminator,
751 self.test_session.my_discriminator,
752 "BFD - your discriminator")
753 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
754 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
756 self.logger.info("BFD: Waiting for event")
757 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
758 verify_event(self, e, expected_state=BFDState.init)
759 self.logger.info("BFD: Sending Up")
760 self.test_session.send_packet()
761 self.logger.info("BFD: Waiting for event")
762 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
763 verify_event(self, e, expected_state=BFDState.up)
764 self.logger.info("BFD: Session is Up")
765 self.test_session.update(state=BFDState.up)
766 self.test_session.send_packet()
767 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
769 def test_session_down(self):
770 """ bring BFD session down """
772 bfd_session_down(self)
774 @unittest.skipUnless(running_extended_tests, "part of extended tests")
775 def test_hold_up(self):
776 """ hold BFD session up """
778 for dummy in range(self.test_session.detect_mult * 2):
779 wait_for_bfd_packet(self)
780 self.test_session.send_packet()
781 self.assert_equal(len(self.vapi.collect_events()), 0,
782 "number of bfd events")
784 @unittest.skipUnless(running_extended_tests, "part of extended tests")
785 def test_slow_timer(self):
786 """ verify slow periodic control frames while session down """
788 self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
789 prev_packet = wait_for_bfd_packet(self, 2)
790 for dummy in range(packet_count):
791 next_packet = wait_for_bfd_packet(self, 2)
792 time_diff = next_packet.time - prev_packet.time
793 # spec says the range should be <0.75, 1>, allow extra 0.05 margin
794 # to work around timing issues
795 self.assert_in_range(
796 time_diff, 0.70, 1.05, "time between slow packets")
797 prev_packet = next_packet
799 @unittest.skipUnless(running_extended_tests, "part of extended tests")
800 def test_zero_remote_min_rx(self):
801 """ no packets when zero remote required min rx interval """
803 self.test_session.update(required_min_rx=0)
804 self.test_session.send_packet()
805 for dummy in range(self.test_session.detect_mult):
806 self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
807 "sleep before transmitting bfd packet")
808 self.test_session.send_packet()
810 p = wait_for_bfd_packet(self, timeout=0)
811 self.logger.error(ppp("Received unexpected packet:", p))
812 except CaptureTimeoutError:
815 len(self.vapi.collect_events()), 0, "number of bfd events")
816 self.test_session.update(required_min_rx=300000)
817 for dummy in range(3):
818 self.test_session.send_packet()
820 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
822 len(self.vapi.collect_events()), 0, "number of bfd events")
824 @unittest.skipUnless(running_extended_tests, "part of extended tests")
825 def test_conn_down(self):
826 """ verify session goes down after inactivity """
828 detection_time = self.test_session.detect_mult *\
829 self.vpp_session.required_min_rx / USEC_IN_SEC
830 self.sleep(detection_time, "waiting for BFD session time-out")
831 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
832 verify_event(self, e, expected_state=BFDState.down)
834 @unittest.skipUnless(running_extended_tests, "part of extended tests")
835 def test_large_required_min_rx(self):
836 """ large remote required min rx interval """
838 p = wait_for_bfd_packet(self)
840 self.test_session.update(required_min_rx=interval)
841 self.test_session.send_packet()
842 time_mark = time.time()
844 # busy wait here, trying to collect a packet or event, vpp is not
845 # allowed to send packets and the session will timeout first - so the
846 # Up->Down event must arrive before any packets do
847 while time.time() < time_mark + interval / USEC_IN_SEC:
849 p = wait_for_bfd_packet(self, timeout=0)
850 # if vpp managed to send a packet before we did the session
851 # session update, then that's fine, ignore it
852 if p.time < time_mark - self.vpp_clock_offset:
854 self.logger.error(ppp("Received unexpected packet:", p))
856 except CaptureTimeoutError:
858 events = self.vapi.collect_events()
860 verify_event(self, events[0], BFDState.down)
862 self.assert_equal(count, 0, "number of packets received")
864 @unittest.skipUnless(running_extended_tests, "part of extended tests")
865 def test_immediate_remote_min_rx_reduction(self):
866 """ immediately honor remote required min rx reduction """
867 self.vpp_session.remove_vpp_config()
868 self.vpp_session = VppBFDUDPSession(
869 self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
870 self.pg0.enable_capture()
871 self.vpp_session.add_vpp_config()
872 self.test_session.update(desired_min_tx=1000000,
873 required_min_rx=1000000)
875 reference_packet = wait_for_bfd_packet(self)
876 time_mark = time.time()
878 self.test_session.update(required_min_rx=interval)
879 self.test_session.send_packet()
880 extra_time = time.time() - time_mark
881 p = wait_for_bfd_packet(self)
882 # first packet is allowed to be late by time we spent doing the update
883 # calculated in extra_time
884 self.assert_in_range(p.time - reference_packet.time,
885 .95 * 0.75 * interval / USEC_IN_SEC,
886 1.05 * interval / USEC_IN_SEC + extra_time,
887 "time between BFD packets")
889 for dummy in range(3):
890 p = wait_for_bfd_packet(self)
891 diff = p.time - reference_packet.time
892 self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
893 1.05 * interval / USEC_IN_SEC,
894 "time between BFD packets")
897 @unittest.skipUnless(running_extended_tests, "part of extended tests")
898 def test_modify_req_min_rx_double(self):
899 """ modify session - double required min rx """
901 p = wait_for_bfd_packet(self)
902 self.test_session.update(desired_min_tx=10000,
903 required_min_rx=10000)
904 self.test_session.send_packet()
905 # double required min rx
906 self.vpp_session.modify_parameters(
907 required_min_rx=2 * self.vpp_session.required_min_rx)
908 p = wait_for_bfd_packet(
909 self, pcap_time_min=time.time() - self.vpp_clock_offset)
910 # poll bit needs to be set
911 self.assertIn("P", p.sprintf("%BFD.flags%"),
912 "Poll bit not set in BFD packet")
913 # finish poll sequence with final packet
914 final = self.test_session.create_packet()
915 final[BFD].flags = "F"
916 timeout = self.test_session.detect_mult * \
917 max(self.test_session.desired_min_tx,
918 self.vpp_session.required_min_rx) / USEC_IN_SEC
919 self.test_session.send_packet(final)
920 time_mark = time.time()
921 e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
922 verify_event(self, e, expected_state=BFDState.down)
923 time_to_event = time.time() - time_mark
924 self.assert_in_range(time_to_event, .9 * timeout,
925 1.1 * timeout, "session timeout")
927 @unittest.skipUnless(running_extended_tests, "part of extended tests")
928 def test_modify_req_min_rx_halve(self):
929 """ modify session - halve required min rx """
930 self.vpp_session.modify_parameters(
931 required_min_rx=2 * self.vpp_session.required_min_rx)
933 p = wait_for_bfd_packet(self)
934 self.test_session.update(desired_min_tx=10000,
935 required_min_rx=10000)
936 self.test_session.send_packet()
937 p = wait_for_bfd_packet(
938 self, pcap_time_min=time.time() - self.vpp_clock_offset)
939 # halve required min rx
940 old_required_min_rx = self.vpp_session.required_min_rx
941 self.vpp_session.modify_parameters(
942 required_min_rx=self.vpp_session.required_min_rx // 2)
943 # now we wait 0.8*3*old-req-min-rx and the session should still be up
944 self.sleep(0.8 * self.vpp_session.detect_mult *
945 old_required_min_rx / USEC_IN_SEC,
946 "wait before finishing poll sequence")
947 self.assert_equal(len(self.vapi.collect_events()), 0,
948 "number of bfd events")
949 p = wait_for_bfd_packet(self)
950 # poll bit needs to be set
951 self.assertIn("P", p.sprintf("%BFD.flags%"),
952 "Poll bit not set in BFD packet")
953 # finish poll sequence with final packet
954 final = self.test_session.create_packet()
955 final[BFD].flags = "F"
956 self.test_session.send_packet(final)
957 # now the session should time out under new conditions
958 detection_time = self.test_session.detect_mult *\
959 self.vpp_session.required_min_rx / USEC_IN_SEC
961 e = self.vapi.wait_for_event(
962 2 * detection_time, "bfd_udp_session_details")
964 self.assert_in_range(after - before,
965 0.9 * detection_time,
966 1.1 * detection_time,
967 "time before bfd session goes down")
968 verify_event(self, e, expected_state=BFDState.down)
970 @unittest.skipUnless(running_extended_tests, "part of extended tests")
971 def test_modify_detect_mult(self):
972 """ modify detect multiplier """
974 p = wait_for_bfd_packet(self)
975 self.vpp_session.modify_parameters(detect_mult=1)
976 p = wait_for_bfd_packet(
977 self, pcap_time_min=time.time() - self.vpp_clock_offset)
978 self.assert_equal(self.vpp_session.detect_mult,
981 # poll bit must not be set
982 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
983 "Poll bit not set in BFD packet")
984 self.vpp_session.modify_parameters(detect_mult=10)
985 p = wait_for_bfd_packet(
986 self, pcap_time_min=time.time() - self.vpp_clock_offset)
987 self.assert_equal(self.vpp_session.detect_mult,
990 # poll bit must not be set
991 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
992 "Poll bit not set in BFD packet")
994 @unittest.skipUnless(running_extended_tests, "part of extended tests")
995 def test_queued_poll(self):
996 """ test poll sequence queueing """
998 p = wait_for_bfd_packet(self)
999 self.vpp_session.modify_parameters(
1000 required_min_rx=2 * self.vpp_session.required_min_rx)
1001 p = wait_for_bfd_packet(self)
1002 poll_sequence_start = time.time()
1003 poll_sequence_length_min = 0.5
1004 send_final_after = time.time() + poll_sequence_length_min
1005 # poll bit needs to be set
1006 self.assertIn("P", p.sprintf("%BFD.flags%"),
1007 "Poll bit not set in BFD packet")
1008 self.assert_equal(p[BFD].required_min_rx_interval,
1009 self.vpp_session.required_min_rx,
1010 "BFD required min rx interval")
1011 self.vpp_session.modify_parameters(
1012 required_min_rx=2 * self.vpp_session.required_min_rx)
1013 # 2nd poll sequence should be queued now
1014 # don't send the reply back yet, wait for some time to emulate
1015 # longer round-trip time
1017 while time.time() < send_final_after:
1018 self.test_session.send_packet()
1019 p = wait_for_bfd_packet(self)
1020 self.assert_equal(len(self.vapi.collect_events()), 0,
1021 "number of bfd events")
1022 self.assert_equal(p[BFD].required_min_rx_interval,
1023 self.vpp_session.required_min_rx,
1024 "BFD required min rx interval")
1026 # poll bit must be set
1027 self.assertIn("P", p.sprintf("%BFD.flags%"),
1028 "Poll bit not set in BFD packet")
1029 final = self.test_session.create_packet()
1030 final[BFD].flags = "F"
1031 self.test_session.send_packet(final)
1032 # finish 1st with final
1033 poll_sequence_length = time.time() - poll_sequence_start
1034 # vpp must wait for some time before starting new poll sequence
1035 poll_no_2_started = False
1036 for dummy in range(2 * packet_count):
1037 p = wait_for_bfd_packet(self)
1038 self.assert_equal(len(self.vapi.collect_events()), 0,
1039 "number of bfd events")
1040 if "P" in p.sprintf("%BFD.flags%"):
1041 poll_no_2_started = True
1042 if time.time() < poll_sequence_start + poll_sequence_length:
1043 raise Exception("VPP started 2nd poll sequence too soon")
1044 final = self.test_session.create_packet()
1045 final[BFD].flags = "F"
1046 self.test_session.send_packet(final)
1049 self.test_session.send_packet()
1050 self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
1051 # finish 2nd with final
1052 final = self.test_session.create_packet()
1053 final[BFD].flags = "F"
1054 self.test_session.send_packet(final)
1055 p = wait_for_bfd_packet(self)
1056 # poll bit must not be set
1057 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
1058 "Poll bit set in BFD packet")
1060 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1061 def test_poll_response(self):
1062 """ test correct response to control frame with poll bit set """
1063 bfd_session_up(self)
1064 poll = self.test_session.create_packet()
1065 poll[BFD].flags = "P"
1066 self.test_session.send_packet(poll)
1067 final = wait_for_bfd_packet(
1068 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1069 self.assertIn("F", final.sprintf("%BFD.flags%"))
1071 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1072 def test_no_periodic_if_remote_demand(self):
1073 """ no periodic frames outside poll sequence if remote demand set """
1074 bfd_session_up(self)
1075 demand = self.test_session.create_packet()
1076 demand[BFD].flags = "D"
1077 self.test_session.send_packet(demand)
1078 transmit_time = 0.9 \
1079 * max(self.vpp_session.required_min_rx,
1080 self.test_session.desired_min_tx) \
1083 for dummy in range(self.test_session.detect_mult * 2):
1084 self.sleep(transmit_time)
1085 self.test_session.send_packet(demand)
1087 p = wait_for_bfd_packet(self, timeout=0)
1088 self.logger.error(ppp("Received unexpected packet:", p))
1090 except CaptureTimeoutError:
1092 events = self.vapi.collect_events()
1094 self.logger.error("Received unexpected event: %s", e)
1095 self.assert_equal(count, 0, "number of packets received")
1096 self.assert_equal(len(events), 0, "number of events received")
1098 def test_echo_looped_back(self):
1099 """ echo packets looped back """
1100 # don't need a session in this case..
1101 self.vpp_session.remove_vpp_config()
1102 self.pg0.enable_capture()
1103 echo_packet_count = 10
1104 # random source port low enough to increment a few times..
1105 udp_sport_tx = randint(1, 50000)
1106 udp_sport_rx = udp_sport_tx
1107 echo_packet = (Ether(src=self.pg0.remote_mac,
1108 dst=self.pg0.local_mac) /
1109 IP(src=self.pg0.remote_ip4,
1110 dst=self.pg0.remote_ip4) /
1111 UDP(dport=BFD.udp_dport_echo) /
1112 Raw("this should be looped back"))
1113 for dummy in range(echo_packet_count):
1114 self.sleep(.01, "delay between echo packets")
1115 echo_packet[UDP].sport = udp_sport_tx
1117 self.logger.debug(ppp("Sending packet:", echo_packet))
1118 self.pg0.add_stream(echo_packet)
1120 for dummy in range(echo_packet_count):
1121 p = self.pg0.wait_for_packet(1)
1122 self.logger.debug(ppp("Got packet:", p))
1124 self.assert_equal(self.pg0.remote_mac,
1125 ether.dst, "Destination MAC")
1126 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1128 self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1129 self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1131 self.assert_equal(udp.dport, BFD.udp_dport_echo,
1132 "UDP destination port")
1133 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1135 # need to compare the hex payload here, otherwise BFD_vpp_echo
1137 self.assertEqual(scapy.compat.raw(p[UDP].payload),
1138 scapy.compat.raw(echo_packet[UDP].payload),
1139 "Received packet is not the echo packet sent")
1140 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1141 "ECHO packet identifier for test purposes)")
1143 def test_echo(self):
1144 """ echo function """
1145 bfd_session_up(self)
1146 self.test_session.update(required_min_echo_rx=150000)
1147 self.test_session.send_packet()
1148 detection_time = self.test_session.detect_mult *\
1149 self.vpp_session.required_min_rx / USEC_IN_SEC
1150 # echo shouldn't work without echo source set
1151 for dummy in range(10):
1152 sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1153 self.sleep(sleep, "delay before sending bfd packet")
1154 self.test_session.send_packet()
1155 p = wait_for_bfd_packet(
1156 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1157 self.assert_equal(p[BFD].required_min_rx_interval,
1158 self.vpp_session.required_min_rx,
1159 "BFD required min rx interval")
1160 self.test_session.send_packet()
1161 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1163 # should be turned on - loopback echo packets
1164 for dummy in range(3):
1165 loop_until = time.time() + 0.75 * detection_time
1166 while time.time() < loop_until:
1167 p = self.pg0.wait_for_packet(1)
1168 self.logger.debug(ppp("Got packet:", p))
1169 if p[UDP].dport == BFD.udp_dport_echo:
1171 p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
1172 self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
1173 "BFD ECHO src IP equal to loopback IP")
1174 self.logger.debug(ppp("Looping back packet:", p))
1175 self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1176 "ECHO packet destination MAC address")
1177 p[Ether].dst = self.pg0.local_mac
1178 self.pg0.add_stream(p)
1181 elif p.haslayer(BFD):
1183 self.assertGreaterEqual(
1184 p[BFD].required_min_rx_interval,
1186 if "P" in p.sprintf("%BFD.flags%"):
1187 final = self.test_session.create_packet()
1188 final[BFD].flags = "F"
1189 self.test_session.send_packet(final)
1191 raise Exception(ppp("Received unknown packet:", p))
1193 self.assert_equal(len(self.vapi.collect_events()), 0,
1194 "number of bfd events")
1195 self.test_session.send_packet()
1196 self.assertTrue(echo_seen, "No echo packets received")
1198 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1199 def test_echo_fail(self):
1200 """ session goes down if echo function fails """
1201 bfd_session_up(self)
1202 self.test_session.update(required_min_echo_rx=150000)
1203 self.test_session.send_packet()
1204 detection_time = self.test_session.detect_mult *\
1205 self.vpp_session.required_min_rx / USEC_IN_SEC
1206 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1207 # echo function should be used now, but we will drop the echo packets
1208 verified_diag = False
1209 for dummy in range(3):
1210 loop_until = time.time() + 0.75 * detection_time
1211 while time.time() < loop_until:
1212 p = self.pg0.wait_for_packet(1)
1213 self.logger.debug(ppp("Got packet:", p))
1214 if p[UDP].dport == BFD.udp_dport_echo:
1217 elif p.haslayer(BFD):
1218 if "P" in p.sprintf("%BFD.flags%"):
1219 self.assertGreaterEqual(
1220 p[BFD].required_min_rx_interval,
1222 final = self.test_session.create_packet()
1223 final[BFD].flags = "F"
1224 self.test_session.send_packet(final)
1225 if p[BFD].state == BFDState.down:
1226 self.assert_equal(p[BFD].diag,
1227 BFDDiagCode.echo_function_failed,
1229 verified_diag = True
1231 raise Exception(ppp("Received unknown packet:", p))
1232 self.test_session.send_packet()
1233 events = self.vapi.collect_events()
1234 self.assert_equal(len(events), 1, "number of bfd events")
1235 self.assert_equal(events[0].state, BFDState.down, BFDState)
1236 self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1238 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1239 def test_echo_stop(self):
1240 """ echo function stops if peer sets required min echo rx zero """
1241 bfd_session_up(self)
1242 self.test_session.update(required_min_echo_rx=150000)
1243 self.test_session.send_packet()
1244 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1245 # wait for first echo packet
1247 p = self.pg0.wait_for_packet(1)
1248 self.logger.debug(ppp("Got packet:", p))
1249 if p[UDP].dport == BFD.udp_dport_echo:
1250 self.logger.debug(ppp("Looping back packet:", p))
1251 p[Ether].dst = self.pg0.local_mac
1252 self.pg0.add_stream(p)
1255 elif p.haslayer(BFD):
1259 raise Exception(ppp("Received unknown packet:", p))
1260 self.test_session.update(required_min_echo_rx=0)
1261 self.test_session.send_packet()
1262 # echo packets shouldn't arrive anymore
1263 for dummy in range(5):
1264 wait_for_bfd_packet(
1265 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1266 self.test_session.send_packet()
1267 events = self.vapi.collect_events()
1268 self.assert_equal(len(events), 0, "number of bfd events")
1270 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1271 def test_echo_source_removed(self):
1272 """ echo function stops if echo source is removed """
1273 bfd_session_up(self)
1274 self.test_session.update(required_min_echo_rx=150000)
1275 self.test_session.send_packet()
1276 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1277 # wait for first echo packet
1279 p = self.pg0.wait_for_packet(1)
1280 self.logger.debug(ppp("Got packet:", p))
1281 if p[UDP].dport == BFD.udp_dport_echo:
1282 self.logger.debug(ppp("Looping back packet:", p))
1283 p[Ether].dst = self.pg0.local_mac
1284 self.pg0.add_stream(p)
1287 elif p.haslayer(BFD):
1291 raise Exception(ppp("Received unknown packet:", p))
1292 self.vapi.bfd_udp_del_echo_source()
1293 self.test_session.send_packet()
1294 # echo packets shouldn't arrive anymore
1295 for dummy in range(5):
1296 wait_for_bfd_packet(
1297 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1298 self.test_session.send_packet()
1299 events = self.vapi.collect_events()
1300 self.assert_equal(len(events), 0, "number of bfd events")
1302 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1303 def test_stale_echo(self):
1304 """ stale echo packets don't keep a session up """
1305 bfd_session_up(self)
1306 self.test_session.update(required_min_echo_rx=150000)
1307 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1308 self.test_session.send_packet()
1309 # should be turned on - loopback echo packets
1313 for dummy in range(10 * self.vpp_session.detect_mult):
1314 p = self.pg0.wait_for_packet(1)
1315 if p[UDP].dport == BFD.udp_dport_echo:
1316 if echo_packet is None:
1317 self.logger.debug(ppp("Got first echo packet:", p))
1319 timeout_at = time.time() + self.vpp_session.detect_mult * \
1320 self.test_session.required_min_echo_rx / USEC_IN_SEC
1322 self.logger.debug(ppp("Got followup echo packet:", p))
1323 self.logger.debug(ppp("Looping back first echo packet:", p))
1324 echo_packet[Ether].dst = self.pg0.local_mac
1325 self.pg0.add_stream(echo_packet)
1327 elif p.haslayer(BFD):
1328 self.logger.debug(ppp("Got packet:", p))
1329 if "P" in p.sprintf("%BFD.flags%"):
1330 final = self.test_session.create_packet()
1331 final[BFD].flags = "F"
1332 self.test_session.send_packet(final)
1333 if p[BFD].state == BFDState.down:
1334 self.assertIsNotNone(
1336 "Session went down before first echo packet received")
1338 self.assertGreaterEqual(
1340 "Session timeout at %s, but is expected at %s" %
1342 self.assert_equal(p[BFD].diag,
1343 BFDDiagCode.echo_function_failed,
1345 events = self.vapi.collect_events()
1346 self.assert_equal(len(events), 1, "number of bfd events")
1347 self.assert_equal(events[0].state, BFDState.down, BFDState)
1351 raise Exception(ppp("Received unknown packet:", p))
1352 self.test_session.send_packet()
1353 self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1355 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1356 def test_invalid_echo_checksum(self):
1357 """ echo packets with invalid checksum don't keep a session up """
1358 bfd_session_up(self)
1359 self.test_session.update(required_min_echo_rx=150000)
1360 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1361 self.test_session.send_packet()
1362 # should be turned on - loopback echo packets
1365 for dummy in range(10 * self.vpp_session.detect_mult):
1366 p = self.pg0.wait_for_packet(1)
1367 if p[UDP].dport == BFD.udp_dport_echo:
1368 self.logger.debug(ppp("Got echo packet:", p))
1369 if timeout_at is None:
1370 timeout_at = time.time() + self.vpp_session.detect_mult * \
1371 self.test_session.required_min_echo_rx / USEC_IN_SEC
1372 p[BFD_vpp_echo].checksum = getrandbits(64)
1373 p[Ether].dst = self.pg0.local_mac
1374 self.logger.debug(ppp("Looping back modified echo packet:", p))
1375 self.pg0.add_stream(p)
1377 elif p.haslayer(BFD):
1378 self.logger.debug(ppp("Got packet:", p))
1379 if "P" in p.sprintf("%BFD.flags%"):
1380 final = self.test_session.create_packet()
1381 final[BFD].flags = "F"
1382 self.test_session.send_packet(final)
1383 if p[BFD].state == BFDState.down:
1384 self.assertIsNotNone(
1386 "Session went down before first echo packet received")
1388 self.assertGreaterEqual(
1390 "Session timeout at %s, but is expected at %s" %
1392 self.assert_equal(p[BFD].diag,
1393 BFDDiagCode.echo_function_failed,
1395 events = self.vapi.collect_events()
1396 self.assert_equal(len(events), 1, "number of bfd events")
1397 self.assert_equal(events[0].state, BFDState.down, BFDState)
1401 raise Exception(ppp("Received unknown packet:", p))
1402 self.test_session.send_packet()
1403 self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1405 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1406 def test_admin_up_down(self):
1407 """ put session admin-up and admin-down """
1408 bfd_session_up(self)
1409 self.vpp_session.admin_down()
1410 self.pg0.enable_capture()
1411 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1412 verify_event(self, e, expected_state=BFDState.admin_down)
1413 for dummy in range(2):
1414 p = wait_for_bfd_packet(self)
1415 self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1416 # try to bring session up - shouldn't be possible
1417 self.test_session.update(state=BFDState.init)
1418 self.test_session.send_packet()
1419 for dummy in range(2):
1420 p = wait_for_bfd_packet(self)
1421 self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1422 self.vpp_session.admin_up()
1423 self.test_session.update(state=BFDState.down)
1424 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1425 verify_event(self, e, expected_state=BFDState.down)
1426 p = wait_for_bfd_packet(
1427 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1428 self.assert_equal(p[BFD].state, BFDState.down, BFDState)
1429 self.test_session.send_packet()
1430 p = wait_for_bfd_packet(
1431 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1432 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1433 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1434 verify_event(self, e, expected_state=BFDState.init)
1435 self.test_session.update(state=BFDState.up)
1436 self.test_session.send_packet()
1437 p = wait_for_bfd_packet(
1438 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1439 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1440 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1441 verify_event(self, e, expected_state=BFDState.up)
1443 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1444 def test_config_change_remote_demand(self):
1445 """ configuration change while peer in demand mode """
1446 bfd_session_up(self)
1447 demand = self.test_session.create_packet()
1448 demand[BFD].flags = "D"
1449 self.test_session.send_packet(demand)
1450 self.vpp_session.modify_parameters(
1451 required_min_rx=2 * self.vpp_session.required_min_rx)
1452 p = wait_for_bfd_packet(
1453 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1454 # poll bit must be set
1455 self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1456 # terminate poll sequence
1457 final = self.test_session.create_packet()
1458 final[BFD].flags = "D+F"
1459 self.test_session.send_packet(final)
1460 # vpp should be quiet now again
1461 transmit_time = 0.9 \
1462 * max(self.vpp_session.required_min_rx,
1463 self.test_session.desired_min_tx) \
1466 for dummy in range(self.test_session.detect_mult * 2):
1467 self.sleep(transmit_time)
1468 self.test_session.send_packet(demand)
1470 p = wait_for_bfd_packet(self, timeout=0)
1471 self.logger.error(ppp("Received unexpected packet:", p))
1473 except CaptureTimeoutError:
1475 events = self.vapi.collect_events()
1477 self.logger.error("Received unexpected event: %s", e)
1478 self.assert_equal(count, 0, "number of packets received")
1479 self.assert_equal(len(events), 0, "number of events received")
1481 def test_intf_deleted(self):
1482 """ interface with bfd session deleted """
1483 intf = VppLoInterface(self)
1486 sw_if_index = intf.sw_if_index
1487 vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip4)
1488 vpp_session.add_vpp_config()
1489 vpp_session.admin_up()
1490 intf.remove_vpp_config()
1491 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1492 self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1493 self.assertFalse(vpp_session.query_vpp_config())
1496 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1497 class BFD6TestCase(VppTestCase):
1498 """Bidirectional Forwarding Detection (BFD) (IPv6) """
1501 vpp_clock_offset = None
1506 def setUpClass(cls):
1507 super(BFD6TestCase, cls).setUpClass()
1508 cls.vapi.cli("set log class bfd level debug")
1510 cls.create_pg_interfaces([0])
1511 cls.pg0.config_ip6()
1512 cls.pg0.configure_ipv6_neighbors()
1514 cls.pg0.resolve_ndp()
1515 cls.create_loopback_interfaces(1)
1516 cls.loopback0 = cls.lo_interfaces[0]
1517 cls.loopback0.config_ip6()
1518 cls.loopback0.admin_up()
1521 super(BFD6TestCase, cls).tearDownClass()
1525 def tearDownClass(cls):
1526 super(BFD6TestCase, cls).tearDownClass()
1529 super(BFD6TestCase, self).setUp()
1530 self.factory = AuthKeyFactory()
1531 self.vapi.want_bfd_events()
1532 self.pg0.enable_capture()
1534 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1535 self.pg0.remote_ip6,
1537 self.vpp_session.add_vpp_config()
1538 self.vpp_session.admin_up()
1539 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1540 self.logger.debug(self.vapi.cli("show adj nbr"))
1542 self.vapi.want_bfd_events(enable_disable=0)
1546 if not self.vpp_dead:
1547 self.vapi.want_bfd_events(enable_disable=0)
1548 self.vapi.collect_events() # clear the event queue
1549 super(BFD6TestCase, self).tearDown()
1551 def test_session_up(self):
1552 """ bring BFD session up """
1553 bfd_session_up(self)
1555 def test_session_up_by_ip(self):
1556 """ bring BFD session up - first frame looked up by address pair """
1557 self.logger.info("BFD: Sending Slow control frame")
1558 self.test_session.update(my_discriminator=randint(0, 40000000))
1559 self.test_session.send_packet()
1560 self.pg0.enable_capture()
1561 p = self.pg0.wait_for_packet(1)
1562 self.assert_equal(p[BFD].your_discriminator,
1563 self.test_session.my_discriminator,
1564 "BFD - your discriminator")
1565 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1566 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
1568 self.logger.info("BFD: Waiting for event")
1569 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1570 verify_event(self, e, expected_state=BFDState.init)
1571 self.logger.info("BFD: Sending Up")
1572 self.test_session.send_packet()
1573 self.logger.info("BFD: Waiting for event")
1574 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1575 verify_event(self, e, expected_state=BFDState.up)
1576 self.logger.info("BFD: Session is Up")
1577 self.test_session.update(state=BFDState.up)
1578 self.test_session.send_packet()
1579 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1581 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1582 def test_hold_up(self):
1583 """ hold BFD session up """
1584 bfd_session_up(self)
1585 for dummy in range(self.test_session.detect_mult * 2):
1586 wait_for_bfd_packet(self)
1587 self.test_session.send_packet()
1588 self.assert_equal(len(self.vapi.collect_events()), 0,
1589 "number of bfd events")
1590 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1592 def test_echo_looped_back(self):
1593 """ echo packets looped back """
1594 # don't need a session in this case..
1595 self.vpp_session.remove_vpp_config()
1596 self.pg0.enable_capture()
1597 echo_packet_count = 10
1598 # random source port low enough to increment a few times..
1599 udp_sport_tx = randint(1, 50000)
1600 udp_sport_rx = udp_sport_tx
1601 echo_packet = (Ether(src=self.pg0.remote_mac,
1602 dst=self.pg0.local_mac) /
1603 IPv6(src=self.pg0.remote_ip6,
1604 dst=self.pg0.remote_ip6) /
1605 UDP(dport=BFD.udp_dport_echo) /
1606 Raw("this should be looped back"))
1607 for dummy in range(echo_packet_count):
1608 self.sleep(.01, "delay between echo packets")
1609 echo_packet[UDP].sport = udp_sport_tx
1611 self.logger.debug(ppp("Sending packet:", echo_packet))
1612 self.pg0.add_stream(echo_packet)
1614 for dummy in range(echo_packet_count):
1615 p = self.pg0.wait_for_packet(1)
1616 self.logger.debug(ppp("Got packet:", p))
1618 self.assert_equal(self.pg0.remote_mac,
1619 ether.dst, "Destination MAC")
1620 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1622 self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
1623 self.assert_equal(self.pg0.remote_ip6, ip.src, "Destination IP")
1625 self.assert_equal(udp.dport, BFD.udp_dport_echo,
1626 "UDP destination port")
1627 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1629 # need to compare the hex payload here, otherwise BFD_vpp_echo
1631 self.assertEqual(scapy.compat.raw(p[UDP].payload),
1632 scapy.compat.raw(echo_packet[UDP].payload),
1633 "Received packet is not the echo packet sent")
1634 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1635 "ECHO packet identifier for test purposes)")
1636 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1637 "ECHO packet identifier for test purposes)")
1639 def test_echo(self):
1640 """ echo function """
1641 bfd_session_up(self)
1642 self.test_session.update(required_min_echo_rx=150000)
1643 self.test_session.send_packet()
1644 detection_time = self.test_session.detect_mult *\
1645 self.vpp_session.required_min_rx / USEC_IN_SEC
1646 # echo shouldn't work without echo source set
1647 for dummy in range(10):
1648 sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1649 self.sleep(sleep, "delay before sending bfd packet")
1650 self.test_session.send_packet()
1651 p = wait_for_bfd_packet(
1652 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1653 self.assert_equal(p[BFD].required_min_rx_interval,
1654 self.vpp_session.required_min_rx,
1655 "BFD required min rx interval")
1656 self.test_session.send_packet()
1657 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1659 # should be turned on - loopback echo packets
1660 for dummy in range(3):
1661 loop_until = time.time() + 0.75 * detection_time
1662 while time.time() < loop_until:
1663 p = self.pg0.wait_for_packet(1)
1664 self.logger.debug(ppp("Got packet:", p))
1665 if p[UDP].dport == BFD.udp_dport_echo:
1667 p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
1668 self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
1669 "BFD ECHO src IP equal to loopback IP")
1670 self.logger.debug(ppp("Looping back packet:", p))
1671 self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1672 "ECHO packet destination MAC address")
1673 p[Ether].dst = self.pg0.local_mac
1674 self.pg0.add_stream(p)
1677 elif p.haslayer(BFD):
1679 self.assertGreaterEqual(
1680 p[BFD].required_min_rx_interval,
1682 if "P" in p.sprintf("%BFD.flags%"):
1683 final = self.test_session.create_packet()
1684 final[BFD].flags = "F"
1685 self.test_session.send_packet(final)
1687 raise Exception(ppp("Received unknown packet:", p))
1689 self.assert_equal(len(self.vapi.collect_events()), 0,
1690 "number of bfd events")
1691 self.test_session.send_packet()
1692 self.assertTrue(echo_seen, "No echo packets received")
1694 def test_intf_deleted(self):
1695 """ interface with bfd session deleted """
1696 intf = VppLoInterface(self)
1699 sw_if_index = intf.sw_if_index
1700 vpp_session = VppBFDUDPSession(
1701 self, intf, intf.remote_ip6, af=AF_INET6)
1702 vpp_session.add_vpp_config()
1703 vpp_session.admin_up()
1704 intf.remove_vpp_config()
1705 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1706 self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1707 self.assertFalse(vpp_session.query_vpp_config())
1710 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1711 class BFDFIBTestCase(VppTestCase):
1712 """ BFD-FIB interactions (IPv6) """
1718 def setUpClass(cls):
1719 super(BFDFIBTestCase, cls).setUpClass()
1722 def tearDownClass(cls):
1723 super(BFDFIBTestCase, cls).tearDownClass()
1726 super(BFDFIBTestCase, self).setUp()
1727 self.create_pg_interfaces(range(1))
1729 self.vapi.want_bfd_events()
1730 self.pg0.enable_capture()
1732 for i in self.pg_interfaces:
1735 i.configure_ipv6_neighbors()
1738 if not self.vpp_dead:
1739 self.vapi.want_bfd_events(enable_disable=0)
1741 super(BFDFIBTestCase, self).tearDown()
1744 def pkt_is_not_data_traffic(p):
1745 """ not data traffic implies BFD or the usual IPv6 ND/RA"""
1746 if p.haslayer(BFD) or is_ipv6_misc(p):
1750 def test_session_with_fib(self):
1751 """ BFD-FIB interactions """
1753 # packets to match against both of the routes
1754 p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1755 IPv6(src="3001::1", dst="2001::1") /
1756 UDP(sport=1234, dport=1234) /
1758 (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1759 IPv6(src="3001::1", dst="2002::1") /
1760 UDP(sport=1234, dport=1234) /
1763 # A recursive and a non-recursive route via a next-hop that
1764 # will have a BFD session
1765 ip_2001_s_64 = VppIpRoute(self, "2001::", 64,
1766 [VppRoutePath(self.pg0.remote_ip6,
1767 self.pg0.sw_if_index)])
1768 ip_2002_s_64 = VppIpRoute(self, "2002::", 64,
1769 [VppRoutePath(self.pg0.remote_ip6,
1771 ip_2001_s_64.add_vpp_config()
1772 ip_2002_s_64.add_vpp_config()
1774 # bring the session up now the routes are present
1775 self.vpp_session = VppBFDUDPSession(self,
1777 self.pg0.remote_ip6,
1779 self.vpp_session.add_vpp_config()
1780 self.vpp_session.admin_up()
1781 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1783 # session is up - traffic passes
1784 bfd_session_up(self)
1786 self.pg0.add_stream(p)
1789 captured = self.pg0.wait_for_packet(
1791 filter_out_fn=self.pkt_is_not_data_traffic)
1792 self.assertEqual(captured[IPv6].dst,
1795 # session is up - traffic is dropped
1796 bfd_session_down(self)
1798 self.pg0.add_stream(p)
1800 with self.assertRaises(CaptureTimeoutError):
1801 self.pg0.wait_for_packet(1, self.pkt_is_not_data_traffic)
1803 # session is up - traffic passes
1804 bfd_session_up(self)
1806 self.pg0.add_stream(p)
1809 captured = self.pg0.wait_for_packet(
1811 filter_out_fn=self.pkt_is_not_data_traffic)
1812 self.assertEqual(captured[IPv6].dst,
1816 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1817 class BFDTunTestCase(VppTestCase):
1818 """ BFD over GRE tunnel """
1824 def setUpClass(cls):
1825 super(BFDTunTestCase, cls).setUpClass()
1828 def tearDownClass(cls):
1829 super(BFDTunTestCase, cls).tearDownClass()
1832 super(BFDTunTestCase, self).setUp()
1833 self.create_pg_interfaces(range(1))
1835 self.vapi.want_bfd_events()
1836 self.pg0.enable_capture()
1838 for i in self.pg_interfaces:
1844 if not self.vpp_dead:
1845 self.vapi.want_bfd_events(enable_disable=0)
1847 super(BFDTunTestCase, self).tearDown()
1850 def pkt_is_not_data_traffic(p):
1851 """ not data traffic implies BFD or the usual IPv6 ND/RA"""
1852 if p.haslayer(BFD) or is_ipv6_misc(p):
1856 def test_bfd_o_gre(self):
1859 # A GRE interface over which to run a BFD session
1860 gre_if = VppGreInterface(self,
1862 self.pg0.remote_ip4)
1863 gre_if.add_vpp_config()
1867 # bring the session up now the routes are present
1868 self.vpp_session = VppBFDUDPSession(self,
1872 self.vpp_session.add_vpp_config()
1873 self.vpp_session.admin_up()
1875 self.test_session = BFDTestSession(
1876 self, gre_if, AF_INET,
1877 tunnel_header=(IP(src=self.pg0.remote_ip4,
1878 dst=self.pg0.local_ip4) /
1880 phy_interface=self.pg0)
1882 # packets to match against both of the routes
1883 p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1884 IP(src=self.pg0.remote_ip4, dst=gre_if.remote_ip4) /
1885 UDP(sport=1234, dport=1234) /
1888 # session is up - traffic passes
1889 bfd_session_up(self)
1891 self.send_and_expect(self.pg0, p, self.pg0)
1893 # bring session down
1894 bfd_session_down(self)
1897 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1898 class BFDSHA1TestCase(VppTestCase):
1899 """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1902 vpp_clock_offset = None
1907 def setUpClass(cls):
1908 super(BFDSHA1TestCase, cls).setUpClass()
1909 cls.vapi.cli("set log class bfd level debug")
1911 cls.create_pg_interfaces([0])
1912 cls.pg0.config_ip4()
1914 cls.pg0.resolve_arp()
1917 super(BFDSHA1TestCase, cls).tearDownClass()
1921 def tearDownClass(cls):
1922 super(BFDSHA1TestCase, cls).tearDownClass()
1925 super(BFDSHA1TestCase, self).setUp()
1926 self.factory = AuthKeyFactory()
1927 self.vapi.want_bfd_events()
1928 self.pg0.enable_capture()
1931 if not self.vpp_dead:
1932 self.vapi.want_bfd_events(enable_disable=0)
1933 self.vapi.collect_events() # clear the event queue
1934 super(BFDSHA1TestCase, self).tearDown()
1936 def test_session_up(self):
1937 """ bring BFD session up """
1938 key = self.factory.create_random_key(self)
1939 key.add_vpp_config()
1940 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1941 self.pg0.remote_ip4,
1943 self.vpp_session.add_vpp_config()
1944 self.vpp_session.admin_up()
1945 self.test_session = BFDTestSession(
1946 self, self.pg0, AF_INET, sha1_key=key,
1947 bfd_key_id=self.vpp_session.bfd_key_id)
1948 bfd_session_up(self)
1950 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1951 def test_hold_up(self):
1952 """ hold BFD session up """
1953 key = self.factory.create_random_key(self)
1954 key.add_vpp_config()
1955 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1956 self.pg0.remote_ip4,
1958 self.vpp_session.add_vpp_config()
1959 self.vpp_session.admin_up()
1960 self.test_session = BFDTestSession(
1961 self, self.pg0, AF_INET, sha1_key=key,
1962 bfd_key_id=self.vpp_session.bfd_key_id)
1963 bfd_session_up(self)
1964 for dummy in range(self.test_session.detect_mult * 2):
1965 wait_for_bfd_packet(self)
1966 self.test_session.send_packet()
1967 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1969 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1970 def test_hold_up_meticulous(self):
1971 """ hold BFD session up - meticulous auth """
1972 key = self.factory.create_random_key(
1973 self, BFDAuthType.meticulous_keyed_sha1)
1974 key.add_vpp_config()
1975 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1976 self.pg0.remote_ip4, sha1_key=key)
1977 self.vpp_session.add_vpp_config()
1978 self.vpp_session.admin_up()
1979 # specify sequence number so that it wraps
1980 self.test_session = BFDTestSession(
1981 self, self.pg0, AF_INET, sha1_key=key,
1982 bfd_key_id=self.vpp_session.bfd_key_id,
1983 our_seq_number=0xFFFFFFFF - 4)
1984 bfd_session_up(self)
1985 for dummy in range(30):
1986 wait_for_bfd_packet(self)
1987 self.test_session.inc_seq_num()
1988 self.test_session.send_packet()
1989 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1991 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1992 def test_send_bad_seq_number(self):
1993 """ session is not kept alive by msgs with bad sequence numbers"""
1994 key = self.factory.create_random_key(
1995 self, BFDAuthType.meticulous_keyed_sha1)
1996 key.add_vpp_config()
1997 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1998 self.pg0.remote_ip4, sha1_key=key)
1999 self.vpp_session.add_vpp_config()
2000 self.test_session = BFDTestSession(
2001 self, self.pg0, AF_INET, sha1_key=key,
2002 bfd_key_id=self.vpp_session.bfd_key_id)
2003 bfd_session_up(self)
2004 detection_time = self.test_session.detect_mult *\
2005 self.vpp_session.required_min_rx / USEC_IN_SEC
2006 send_until = time.time() + 2 * detection_time
2007 while time.time() < send_until:
2008 self.test_session.send_packet()
2009 self.sleep(0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
2010 "time between bfd packets")
2011 e = self.vapi.collect_events()
2012 # session should be down now, because the sequence numbers weren't
2014 self.assert_equal(len(e), 1, "number of bfd events")
2015 verify_event(self, e[0], expected_state=BFDState.down)
2017 def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
2018 legitimate_test_session,
2020 rogue_bfd_values=None):
2021 """ execute a rogue session interaction scenario
2023 1. create vpp session, add config
2024 2. bring the legitimate session up
2025 3. copy the bfd values from legitimate session to rogue session
2026 4. apply rogue_bfd_values to rogue session
2027 5. set rogue session state to down
2028 6. send message to take the session down from the rogue session
2029 7. assert that the legitimate session is unaffected
2032 self.vpp_session = vpp_bfd_udp_session
2033 self.vpp_session.add_vpp_config()
2034 self.test_session = legitimate_test_session
2035 # bring vpp session up
2036 bfd_session_up(self)
2037 # send packet from rogue session
2038 rogue_test_session.update(
2039 my_discriminator=self.test_session.my_discriminator,
2040 your_discriminator=self.test_session.your_discriminator,
2041 desired_min_tx=self.test_session.desired_min_tx,
2042 required_min_rx=self.test_session.required_min_rx,
2043 detect_mult=self.test_session.detect_mult,
2044 diag=self.test_session.diag,
2045 state=self.test_session.state,
2046 auth_type=self.test_session.auth_type)
2047 if rogue_bfd_values:
2048 rogue_test_session.update(**rogue_bfd_values)
2049 rogue_test_session.update(state=BFDState.down)
2050 rogue_test_session.send_packet()
2051 wait_for_bfd_packet(self)
2052 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2054 @unittest.skipUnless(running_extended_tests, "part of extended tests")
2055 def test_mismatch_auth(self):
2056 """ session is not brought down by unauthenticated msg """
2057 key = self.factory.create_random_key(self)
2058 key.add_vpp_config()
2059 vpp_session = VppBFDUDPSession(
2060 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2061 legitimate_test_session = BFDTestSession(
2062 self, self.pg0, AF_INET, sha1_key=key,
2063 bfd_key_id=vpp_session.bfd_key_id)
2064 rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
2065 self.execute_rogue_session_scenario(vpp_session,
2066 legitimate_test_session,
2069 @unittest.skipUnless(running_extended_tests, "part of extended tests")
2070 def test_mismatch_bfd_key_id(self):
2071 """ session is not brought down by msg with non-existent key-id """
2072 key = self.factory.create_random_key(self)
2073 key.add_vpp_config()
2074 vpp_session = VppBFDUDPSession(
2075 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2076 # pick a different random bfd key id
2078 while x == vpp_session.bfd_key_id:
2080 legitimate_test_session = BFDTestSession(
2081 self, self.pg0, AF_INET, sha1_key=key,
2082 bfd_key_id=vpp_session.bfd_key_id)
2083 rogue_test_session = BFDTestSession(
2084 self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
2085 self.execute_rogue_session_scenario(vpp_session,
2086 legitimate_test_session,
2089 @unittest.skipUnless(running_extended_tests, "part of extended tests")
2090 def test_mismatched_auth_type(self):
2091 """ session is not brought down by msg with wrong auth type """
2092 key = self.factory.create_random_key(self)
2093 key.add_vpp_config()
2094 vpp_session = VppBFDUDPSession(
2095 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2096 legitimate_test_session = BFDTestSession(
2097 self, self.pg0, AF_INET, sha1_key=key,
2098 bfd_key_id=vpp_session.bfd_key_id)
2099 rogue_test_session = BFDTestSession(
2100 self, self.pg0, AF_INET, sha1_key=key,
2101 bfd_key_id=vpp_session.bfd_key_id)
2102 self.execute_rogue_session_scenario(
2103 vpp_session, legitimate_test_session, rogue_test_session,
2104 {'auth_type': BFDAuthType.keyed_md5})
2106 @unittest.skipUnless(running_extended_tests, "part of extended tests")
2107 def test_restart(self):
2108 """ simulate remote peer restart and resynchronization """
2109 key = self.factory.create_random_key(
2110 self, BFDAuthType.meticulous_keyed_sha1)
2111 key.add_vpp_config()
2112 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2113 self.pg0.remote_ip4, sha1_key=key)
2114 self.vpp_session.add_vpp_config()
2115 self.test_session = BFDTestSession(
2116 self, self.pg0, AF_INET, sha1_key=key,
2117 bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
2118 bfd_session_up(self)
2119 # don't send any packets for 2*detection_time
2120 detection_time = self.test_session.detect_mult *\
2121 self.vpp_session.required_min_rx / USEC_IN_SEC
2122 self.sleep(2 * detection_time, "simulating peer restart")
2123 events = self.vapi.collect_events()
2124 self.assert_equal(len(events), 1, "number of bfd events")
2125 verify_event(self, events[0], expected_state=BFDState.down)
2126 self.test_session.update(state=BFDState.down)
2127 # reset sequence number
2128 self.test_session.our_seq_number = 0
2129 self.test_session.vpp_seq_number = None
2130 # now throw away any pending packets
2131 self.pg0.enable_capture()
2132 bfd_session_up(self)
2135 @unittest.skipUnless(running_extended_tests, "part of extended tests")
2136 class BFDAuthOnOffTestCase(VppTestCase):
2137 """Bidirectional Forwarding Detection (BFD) (changing auth) """
2144 def setUpClass(cls):
2145 super(BFDAuthOnOffTestCase, cls).setUpClass()
2146 cls.vapi.cli("set log class bfd level debug")
2148 cls.create_pg_interfaces([0])
2149 cls.pg0.config_ip4()
2151 cls.pg0.resolve_arp()
2154 super(BFDAuthOnOffTestCase, cls).tearDownClass()
2158 def tearDownClass(cls):
2159 super(BFDAuthOnOffTestCase, cls).tearDownClass()
2162 super(BFDAuthOnOffTestCase, self).setUp()
2163 self.factory = AuthKeyFactory()
2164 self.vapi.want_bfd_events()
2165 self.pg0.enable_capture()
2168 if not self.vpp_dead:
2169 self.vapi.want_bfd_events(enable_disable=0)
2170 self.vapi.collect_events() # clear the event queue
2171 super(BFDAuthOnOffTestCase, self).tearDown()
2173 def test_auth_on_immediate(self):
2174 """ turn auth on without disturbing session state (immediate) """
2175 key = self.factory.create_random_key(self)
2176 key.add_vpp_config()
2177 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2178 self.pg0.remote_ip4)
2179 self.vpp_session.add_vpp_config()
2180 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
2181 bfd_session_up(self)
2182 for dummy in range(self.test_session.detect_mult * 2):
2183 p = wait_for_bfd_packet(self)
2184 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2185 self.test_session.send_packet()
2186 self.vpp_session.activate_auth(key)
2187 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2188 self.test_session.sha1_key = key
2189 for dummy in range(self.test_session.detect_mult * 2):
2190 p = wait_for_bfd_packet(self)
2191 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2192 self.test_session.send_packet()
2193 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2194 self.assert_equal(len(self.vapi.collect_events()), 0,
2195 "number of bfd events")
2197 def test_auth_off_immediate(self):
2198 """ turn auth off without disturbing session state (immediate) """
2199 key = self.factory.create_random_key(self)
2200 key.add_vpp_config()
2201 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2202 self.pg0.remote_ip4, sha1_key=key)
2203 self.vpp_session.add_vpp_config()
2204 self.test_session = BFDTestSession(
2205 self, self.pg0, AF_INET, sha1_key=key,
2206 bfd_key_id=self.vpp_session.bfd_key_id)
2207 bfd_session_up(self)
2208 # self.vapi.want_bfd_events(enable_disable=0)
2209 for dummy in range(self.test_session.detect_mult * 2):
2210 p = wait_for_bfd_packet(self)
2211 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2212 self.test_session.inc_seq_num()
2213 self.test_session.send_packet()
2214 self.vpp_session.deactivate_auth()
2215 self.test_session.bfd_key_id = None
2216 self.test_session.sha1_key = None
2217 for dummy in range(self.test_session.detect_mult * 2):
2218 p = wait_for_bfd_packet(self)
2219 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2220 self.test_session.inc_seq_num()
2221 self.test_session.send_packet()
2222 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2223 self.assert_equal(len(self.vapi.collect_events()), 0,
2224 "number of bfd events")
2226 def test_auth_change_key_immediate(self):
2227 """ change auth key without disturbing session state (immediate) """
2228 key1 = self.factory.create_random_key(self)
2229 key1.add_vpp_config()
2230 key2 = self.factory.create_random_key(self)
2231 key2.add_vpp_config()
2232 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2233 self.pg0.remote_ip4, sha1_key=key1)
2234 self.vpp_session.add_vpp_config()
2235 self.test_session = BFDTestSession(
2236 self, self.pg0, AF_INET, sha1_key=key1,
2237 bfd_key_id=self.vpp_session.bfd_key_id)
2238 bfd_session_up(self)
2239 for dummy in range(self.test_session.detect_mult * 2):
2240 p = wait_for_bfd_packet(self)
2241 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2242 self.test_session.send_packet()
2243 self.vpp_session.activate_auth(key2)
2244 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2245 self.test_session.sha1_key = key2
2246 for dummy in range(self.test_session.detect_mult * 2):
2247 p = wait_for_bfd_packet(self)
2248 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2249 self.test_session.send_packet()
2250 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2251 self.assert_equal(len(self.vapi.collect_events()), 0,
2252 "number of bfd events")
2254 def test_auth_on_delayed(self):
2255 """ turn auth on without disturbing session state (delayed) """
2256 key = self.factory.create_random_key(self)
2257 key.add_vpp_config()
2258 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2259 self.pg0.remote_ip4)
2260 self.vpp_session.add_vpp_config()
2261 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
2262 bfd_session_up(self)
2263 for dummy in range(self.test_session.detect_mult * 2):
2264 wait_for_bfd_packet(self)
2265 self.test_session.send_packet()
2266 self.vpp_session.activate_auth(key, delayed=True)
2267 for dummy in range(self.test_session.detect_mult * 2):
2268 p = wait_for_bfd_packet(self)
2269 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2270 self.test_session.send_packet()
2271 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2272 self.test_session.sha1_key = key
2273 self.test_session.send_packet()
2274 for dummy in range(self.test_session.detect_mult * 2):
2275 p = wait_for_bfd_packet(self)
2276 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2277 self.test_session.send_packet()
2278 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2279 self.assert_equal(len(self.vapi.collect_events()), 0,
2280 "number of bfd events")
2282 def test_auth_off_delayed(self):
2283 """ turn auth off without disturbing session state (delayed) """
2284 key = self.factory.create_random_key(self)
2285 key.add_vpp_config()
2286 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2287 self.pg0.remote_ip4, sha1_key=key)
2288 self.vpp_session.add_vpp_config()
2289 self.test_session = BFDTestSession(
2290 self, self.pg0, AF_INET, sha1_key=key,
2291 bfd_key_id=self.vpp_session.bfd_key_id)
2292 bfd_session_up(self)
2293 for dummy in range(self.test_session.detect_mult * 2):
2294 p = wait_for_bfd_packet(self)
2295 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2296 self.test_session.send_packet()
2297 self.vpp_session.deactivate_auth(delayed=True)
2298 for dummy in range(self.test_session.detect_mult * 2):
2299 p = wait_for_bfd_packet(self)
2300 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2301 self.test_session.send_packet()
2302 self.test_session.bfd_key_id = None
2303 self.test_session.sha1_key = None
2304 self.test_session.send_packet()
2305 for dummy in range(self.test_session.detect_mult * 2):
2306 p = wait_for_bfd_packet(self)
2307 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2308 self.test_session.send_packet()
2309 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2310 self.assert_equal(len(self.vapi.collect_events()), 0,
2311 "number of bfd events")
2313 def test_auth_change_key_delayed(self):
2314 """ change auth key without disturbing session state (delayed) """
2315 key1 = self.factory.create_random_key(self)
2316 key1.add_vpp_config()
2317 key2 = self.factory.create_random_key(self)
2318 key2.add_vpp_config()
2319 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2320 self.pg0.remote_ip4, sha1_key=key1)
2321 self.vpp_session.add_vpp_config()
2322 self.vpp_session.admin_up()
2323 self.test_session = BFDTestSession(
2324 self, self.pg0, AF_INET, sha1_key=key1,
2325 bfd_key_id=self.vpp_session.bfd_key_id)
2326 bfd_session_up(self)
2327 for dummy in range(self.test_session.detect_mult * 2):
2328 p = wait_for_bfd_packet(self)
2329 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2330 self.test_session.send_packet()
2331 self.vpp_session.activate_auth(key2, delayed=True)
2332 for dummy in range(self.test_session.detect_mult * 2):
2333 p = wait_for_bfd_packet(self)
2334 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2335 self.test_session.send_packet()
2336 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2337 self.test_session.sha1_key = key2
2338 self.test_session.send_packet()
2339 for dummy in range(self.test_session.detect_mult * 2):
2340 p = wait_for_bfd_packet(self)
2341 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2342 self.test_session.send_packet()
2343 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2344 self.assert_equal(len(self.vapi.collect_events()), 0,
2345 "number of bfd events")
2348 @unittest.skipUnless(running_extended_tests, "part of extended tests")
2349 class BFDCLITestCase(VppTestCase):
2350 """Bidirectional Forwarding Detection (BFD) (CLI) """
2354 def setUpClass(cls):
2355 super(BFDCLITestCase, cls).setUpClass()
2356 cls.vapi.cli("set log class bfd level debug")
2358 cls.create_pg_interfaces((0,))
2359 cls.pg0.config_ip4()
2360 cls.pg0.config_ip6()
2361 cls.pg0.resolve_arp()
2362 cls.pg0.resolve_ndp()
2365 super(BFDCLITestCase, cls).tearDownClass()
2369 def tearDownClass(cls):
2370 super(BFDCLITestCase, cls).tearDownClass()
2373 super(BFDCLITestCase, self).setUp()
2374 self.factory = AuthKeyFactory()
2375 self.pg0.enable_capture()
2379 self.vapi.want_bfd_events(enable_disable=0)
2380 except UnexpectedApiReturnValueError:
2381 # some tests aren't subscribed, so this is not an issue
2383 self.vapi.collect_events() # clear the event queue
2384 super(BFDCLITestCase, self).tearDown()
2386 def cli_verify_no_response(self, cli):
2387 """ execute a CLI, asserting that the response is empty """
2388 self.assert_equal(self.vapi.cli(cli),
2390 "CLI command response")
2392 def cli_verify_response(self, cli, expected):
2393 """ execute a CLI, asserting that the response matches expectation """
2394 self.assert_equal(self.vapi.cli(cli).strip(),
2396 "CLI command response")
2398 def test_show(self):
2399 """ show commands """
2400 k1 = self.factory.create_random_key(self)
2402 k2 = self.factory.create_random_key(
2403 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2405 s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2407 s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2410 self.logger.info(self.vapi.ppcli("show bfd keys"))
2411 self.logger.info(self.vapi.ppcli("show bfd sessions"))
2412 self.logger.info(self.vapi.ppcli("show bfd"))
2414 def test_set_del_sha1_key(self):
2415 """ set/delete SHA1 auth key """
2416 k = self.factory.create_random_key(self)
2417 self.registry.register(k, self.logger)
2418 self.cli_verify_no_response(
2419 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2421 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2422 self.assertTrue(k.query_vpp_config())
2423 self.vpp_session = VppBFDUDPSession(
2424 self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
2425 self.vpp_session.add_vpp_config()
2426 self.test_session = \
2427 BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
2428 bfd_key_id=self.vpp_session.bfd_key_id)
2429 self.vapi.want_bfd_events()
2430 bfd_session_up(self)
2431 bfd_session_down(self)
2432 # try to replace the secret for the key - should fail because the key
2434 k2 = self.factory.create_random_key(self)
2435 self.cli_verify_response(
2436 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2438 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2439 "bfd key set: `bfd_auth_set_key' API call failed, "
2440 "rv=-103:BFD object in use")
2441 # manipulating the session using old secret should still work
2442 bfd_session_up(self)
2443 bfd_session_down(self)
2444 self.vpp_session.remove_vpp_config()
2445 self.cli_verify_no_response(
2446 "bfd key del conf-key-id %s" % k.conf_key_id)
2447 self.assertFalse(k.query_vpp_config())
2449 def test_set_del_meticulous_sha1_key(self):
2450 """ set/delete meticulous SHA1 auth key """
2451 k = self.factory.create_random_key(
2452 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2453 self.registry.register(k, self.logger)
2454 self.cli_verify_no_response(
2455 "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
2457 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2458 self.assertTrue(k.query_vpp_config())
2459 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2460 self.pg0.remote_ip6, af=AF_INET6,
2462 self.vpp_session.add_vpp_config()
2463 self.vpp_session.admin_up()
2464 self.test_session = \
2465 BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
2466 bfd_key_id=self.vpp_session.bfd_key_id)
2467 self.vapi.want_bfd_events()
2468 bfd_session_up(self)
2469 bfd_session_down(self)
2470 # try to replace the secret for the key - should fail because the key
2472 k2 = self.factory.create_random_key(self)
2473 self.cli_verify_response(
2474 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2476 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2477 "bfd key set: `bfd_auth_set_key' API call failed, "
2478 "rv=-103:BFD object in use")
2479 # manipulating the session using old secret should still work
2480 bfd_session_up(self)
2481 bfd_session_down(self)
2482 self.vpp_session.remove_vpp_config()
2483 self.cli_verify_no_response(
2484 "bfd key del conf-key-id %s" % k.conf_key_id)
2485 self.assertFalse(k.query_vpp_config())
2487 def test_add_mod_del_bfd_udp(self):
2488 """ create/modify/delete IPv4 BFD UDP session """
2489 vpp_session = VppBFDUDPSession(
2490 self, self.pg0, self.pg0.remote_ip4)
2491 self.registry.register(vpp_session, self.logger)
2492 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2493 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2494 "detect-mult %s" % (self.pg0.name, self.pg0.local_ip4,
2495 self.pg0.remote_ip4,
2496 vpp_session.desired_min_tx,
2497 vpp_session.required_min_rx,
2498 vpp_session.detect_mult)
2499 self.cli_verify_no_response(cli_add_cmd)
2500 # 2nd add should fail
2501 self.cli_verify_response(
2503 "bfd udp session add: `bfd_add_add_session' API call"
2504 " failed, rv=-101:Duplicate BFD object")
2505 verify_bfd_session_config(self, vpp_session)
2506 mod_session = VppBFDUDPSession(
2507 self, self.pg0, self.pg0.remote_ip4,
2508 required_min_rx=2 * vpp_session.required_min_rx,
2509 desired_min_tx=3 * vpp_session.desired_min_tx,
2510 detect_mult=4 * vpp_session.detect_mult)
2511 self.cli_verify_no_response(
2512 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2513 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2514 (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2515 mod_session.desired_min_tx, mod_session.required_min_rx,
2516 mod_session.detect_mult))
2517 verify_bfd_session_config(self, mod_session)
2518 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2519 "peer-addr %s" % (self.pg0.name,
2520 self.pg0.local_ip4, self.pg0.remote_ip4)
2521 self.cli_verify_no_response(cli_del_cmd)
2522 # 2nd del is expected to fail
2523 self.cli_verify_response(
2524 cli_del_cmd, "bfd udp session del: `bfd_udp_del_session' API call"
2525 " failed, rv=-102:No such BFD object")
2526 self.assertFalse(vpp_session.query_vpp_config())
2528 def test_add_mod_del_bfd_udp6(self):
2529 """ create/modify/delete IPv6 BFD UDP session """
2530 vpp_session = VppBFDUDPSession(
2531 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
2532 self.registry.register(vpp_session, self.logger)
2533 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2534 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2535 "detect-mult %s" % (self.pg0.name, self.pg0.local_ip6,
2536 self.pg0.remote_ip6,
2537 vpp_session.desired_min_tx,
2538 vpp_session.required_min_rx,
2539 vpp_session.detect_mult)
2540 self.cli_verify_no_response(cli_add_cmd)
2541 # 2nd add should fail
2542 self.cli_verify_response(
2544 "bfd udp session add: `bfd_add_add_session' API call"
2545 " failed, rv=-101:Duplicate BFD object")
2546 verify_bfd_session_config(self, vpp_session)
2547 mod_session = VppBFDUDPSession(
2548 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2549 required_min_rx=2 * vpp_session.required_min_rx,
2550 desired_min_tx=3 * vpp_session.desired_min_tx,
2551 detect_mult=4 * vpp_session.detect_mult)
2552 self.cli_verify_no_response(
2553 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2554 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2555 (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2556 mod_session.desired_min_tx,
2557 mod_session.required_min_rx, mod_session.detect_mult))
2558 verify_bfd_session_config(self, mod_session)
2559 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2560 "peer-addr %s" % (self.pg0.name,
2561 self.pg0.local_ip6, self.pg0.remote_ip6)
2562 self.cli_verify_no_response(cli_del_cmd)
2563 # 2nd del is expected to fail
2564 self.cli_verify_response(
2566 "bfd udp session del: `bfd_udp_del_session' API call"
2567 " failed, rv=-102:No such BFD object")
2568 self.assertFalse(vpp_session.query_vpp_config())
2570 def test_add_mod_del_bfd_udp_auth(self):
2571 """ create/modify/delete IPv4 BFD UDP session (authenticated) """
2572 key = self.factory.create_random_key(self)
2573 key.add_vpp_config()
2574 vpp_session = VppBFDUDPSession(
2575 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2576 self.registry.register(vpp_session, self.logger)
2577 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2578 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2579 "detect-mult %s conf-key-id %s bfd-key-id %s"\
2580 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2581 vpp_session.desired_min_tx, vpp_session.required_min_rx,
2582 vpp_session.detect_mult, key.conf_key_id,
2583 vpp_session.bfd_key_id)
2584 self.cli_verify_no_response(cli_add_cmd)
2585 # 2nd add should fail
2586 self.cli_verify_response(
2588 "bfd udp session add: `bfd_add_add_session' API call"
2589 " failed, rv=-101:Duplicate BFD object")
2590 verify_bfd_session_config(self, vpp_session)
2591 mod_session = VppBFDUDPSession(
2592 self, self.pg0, self.pg0.remote_ip4, sha1_key=key,
2593 bfd_key_id=vpp_session.bfd_key_id,
2594 required_min_rx=2 * vpp_session.required_min_rx,
2595 desired_min_tx=3 * vpp_session.desired_min_tx,
2596 detect_mult=4 * vpp_session.detect_mult)
2597 self.cli_verify_no_response(
2598 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2599 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2600 (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2601 mod_session.desired_min_tx,
2602 mod_session.required_min_rx, mod_session.detect_mult))
2603 verify_bfd_session_config(self, mod_session)
2604 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2605 "peer-addr %s" % (self.pg0.name,
2606 self.pg0.local_ip4, self.pg0.remote_ip4)
2607 self.cli_verify_no_response(cli_del_cmd)
2608 # 2nd del is expected to fail
2609 self.cli_verify_response(
2611 "bfd udp session del: `bfd_udp_del_session' API call"
2612 " failed, rv=-102:No such BFD object")
2613 self.assertFalse(vpp_session.query_vpp_config())
2615 def test_add_mod_del_bfd_udp6_auth(self):
2616 """ create/modify/delete IPv6 BFD UDP session (authenticated) """
2617 key = self.factory.create_random_key(
2618 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2619 key.add_vpp_config()
2620 vpp_session = VppBFDUDPSession(
2621 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key)
2622 self.registry.register(vpp_session, self.logger)
2623 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2624 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2625 "detect-mult %s conf-key-id %s bfd-key-id %s" \
2626 % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2627 vpp_session.desired_min_tx, vpp_session.required_min_rx,
2628 vpp_session.detect_mult, key.conf_key_id,
2629 vpp_session.bfd_key_id)
2630 self.cli_verify_no_response(cli_add_cmd)
2631 # 2nd add should fail
2632 self.cli_verify_response(
2634 "bfd udp session add: `bfd_add_add_session' API call"
2635 " failed, rv=-101:Duplicate BFD object")
2636 verify_bfd_session_config(self, vpp_session)
2637 mod_session = VppBFDUDPSession(
2638 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key,
2639 bfd_key_id=vpp_session.bfd_key_id,
2640 required_min_rx=2 * vpp_session.required_min_rx,
2641 desired_min_tx=3 * vpp_session.desired_min_tx,
2642 detect_mult=4 * vpp_session.detect_mult)
2643 self.cli_verify_no_response(
2644 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2645 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2646 (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2647 mod_session.desired_min_tx,
2648 mod_session.required_min_rx, mod_session.detect_mult))
2649 verify_bfd_session_config(self, mod_session)
2650 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2651 "peer-addr %s" % (self.pg0.name,
2652 self.pg0.local_ip6, self.pg0.remote_ip6)
2653 self.cli_verify_no_response(cli_del_cmd)
2654 # 2nd del is expected to fail
2655 self.cli_verify_response(
2657 "bfd udp session del: `bfd_udp_del_session' API call"
2658 " failed, rv=-102:No such BFD object")
2659 self.assertFalse(vpp_session.query_vpp_config())
2661 def test_auth_on_off(self):
2662 """ turn authentication on and off """
2663 key = self.factory.create_random_key(
2664 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2665 key.add_vpp_config()
2666 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2667 auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2669 session.add_vpp_config()
2671 "bfd udp session auth activate interface %s local-addr %s "\
2672 "peer-addr %s conf-key-id %s bfd-key-id %s"\
2673 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2674 key.conf_key_id, auth_session.bfd_key_id)
2675 self.cli_verify_no_response(cli_activate)
2676 verify_bfd_session_config(self, auth_session)
2677 self.cli_verify_no_response(cli_activate)
2678 verify_bfd_session_config(self, auth_session)
2680 "bfd udp session auth deactivate interface %s local-addr %s "\
2682 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2683 self.cli_verify_no_response(cli_deactivate)
2684 verify_bfd_session_config(self, session)
2685 self.cli_verify_no_response(cli_deactivate)
2686 verify_bfd_session_config(self, session)
2688 def test_auth_on_off_delayed(self):
2689 """ turn authentication on and off (delayed) """
2690 key = self.factory.create_random_key(
2691 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2692 key.add_vpp_config()
2693 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2694 auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2696 session.add_vpp_config()
2698 "bfd udp session auth activate interface %s local-addr %s "\
2699 "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"\
2700 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2701 key.conf_key_id, auth_session.bfd_key_id)
2702 self.cli_verify_no_response(cli_activate)
2703 verify_bfd_session_config(self, auth_session)
2704 self.cli_verify_no_response(cli_activate)
2705 verify_bfd_session_config(self, auth_session)
2707 "bfd udp session auth deactivate interface %s local-addr %s "\
2708 "peer-addr %s delayed yes"\
2709 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2710 self.cli_verify_no_response(cli_deactivate)
2711 verify_bfd_session_config(self, session)
2712 self.cli_verify_no_response(cli_deactivate)
2713 verify_bfd_session_config(self, session)
2715 def test_admin_up_down(self):
2716 """ put session admin-up and admin-down """
2717 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2718 session.add_vpp_config()
2720 "bfd udp session set-flags admin down interface %s local-addr %s "\
2722 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2724 "bfd udp session set-flags admin up interface %s local-addr %s "\
2726 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2727 self.cli_verify_no_response(cli_down)
2728 verify_bfd_session_config(self, session, state=BFDState.admin_down)
2729 self.cli_verify_no_response(cli_up)
2730 verify_bfd_session_config(self, session, state=BFDState.down)
2732 def test_set_del_udp_echo_source(self):
2733 """ set/del udp echo source """
2734 self.create_loopback_interfaces(1)
2735 self.loopback0 = self.lo_interfaces[0]
2736 self.loopback0.admin_up()
2737 self.cli_verify_response("show bfd echo-source",
2738 "UDP echo source is not set.")
2739 cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
2740 self.cli_verify_no_response(cli_set)
2741 self.cli_verify_response("show bfd echo-source",
2742 "UDP echo source is: %s\n"
2743 "IPv4 address usable as echo source: none\n"
2744 "IPv6 address usable as echo source: none" %
2745 self.loopback0.name)
2746 self.loopback0.config_ip4()
2747 unpacked = unpack("!L", self.loopback0.local_ip4n)
2748 echo_ip4 = inet_ntop(AF_INET, pack("!L", unpacked[0] ^ 1))
2749 self.cli_verify_response("show bfd echo-source",
2750 "UDP echo source is: %s\n"
2751 "IPv4 address usable as echo source: %s\n"
2752 "IPv6 address usable as echo source: none" %
2753 (self.loopback0.name, echo_ip4))
2754 unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
2755 echo_ip6 = inet_ntop(AF_INET6, pack("!LLLL", unpacked[0], unpacked[1],
2756 unpacked[2], unpacked[3] ^ 1))
2757 self.loopback0.config_ip6()
2758 self.cli_verify_response("show bfd echo-source",
2759 "UDP echo source is: %s\n"
2760 "IPv4 address usable as echo source: %s\n"
2761 "IPv6 address usable as echo source: %s" %
2762 (self.loopback0.name, echo_ip4, echo_ip6))
2763 cli_del = "bfd udp echo-source del"
2764 self.cli_verify_no_response(cli_del)
2765 self.cli_verify_response("show bfd echo-source",
2766 "UDP echo source is not set.")
2768 if __name__ == '__main__':
2769 unittest.main(testRunner=VppTestRunner)