4 from __future__ import division
9 from struct import pack, unpack
10 from random import randint, shuffle, getrandbits
11 from socket import AF_INET, AF_INET6, inet_ntop
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether
14 from scapy.layers.inet import UDP, IP
15 from scapy.layers.inet6 import IPv6
16 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
17 BFDDiagCode, BFDState, BFD_vpp_echo
18 from framework import VppTestCase, VppTestRunner, running_extended_tests
19 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
20 from vpp_lo_interface import VppLoInterface
22 from vpp_papi_provider import UnexpectedApiReturnValueError
23 from vpp_ip import DpoProto
24 from vpp_ip_route import VppIpRoute, VppRoutePath
29 class AuthKeyFactory(object):
30 """Factory class for creating auth keys with unique conf key ID"""
33 self._conf_key_ids = {}
35 def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
36 """ create a random key with unique conf key id """
37 conf_key_id = randint(0, 0xFFFFFFFF)
38 while conf_key_id in self._conf_key_ids:
39 conf_key_id = randint(0, 0xFFFFFFFF)
40 self._conf_key_ids[conf_key_id] = 1
41 key = str(bytearray([randint(0, 255) for _ in range(randint(1, 20))]))
42 return VppBFDAuthKey(test=test, auth_type=auth_type,
43 conf_key_id=conf_key_id, key=key)
46 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
47 class BFDAPITestCase(VppTestCase):
48 """Bidirectional Forwarding Detection (BFD) - API"""
55 super(BFDAPITestCase, cls).setUpClass()
56 cls.vapi.cli("set log class bfd level debug")
58 cls.create_pg_interfaces(range(2))
59 for i in cls.pg_interfaces:
65 super(BFDAPITestCase, cls).tearDownClass()
69 super(BFDAPITestCase, self).setUp()
70 self.factory = AuthKeyFactory()
72 def test_add_bfd(self):
73 """ create a BFD session """
74 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
75 session.add_vpp_config()
76 self.logger.debug("Session state is %s", session.state)
77 session.remove_vpp_config()
78 session.add_vpp_config()
79 self.logger.debug("Session state is %s", session.state)
80 session.remove_vpp_config()
82 def test_double_add(self):
83 """ create the same BFD session twice (negative case) """
84 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
85 session.add_vpp_config()
87 with self.vapi.assert_negative_api_retval():
88 session.add_vpp_config()
90 session.remove_vpp_config()
92 def test_add_bfd6(self):
93 """ create IPv6 BFD session """
94 session = VppBFDUDPSession(
95 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
96 session.add_vpp_config()
97 self.logger.debug("Session state is %s", session.state)
98 session.remove_vpp_config()
99 session.add_vpp_config()
100 self.logger.debug("Session state is %s", session.state)
101 session.remove_vpp_config()
103 def test_mod_bfd(self):
104 """ modify BFD session parameters """
105 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
106 desired_min_tx=50000,
107 required_min_rx=10000,
109 session.add_vpp_config()
110 s = session.get_bfd_udp_session_dump_entry()
111 self.assert_equal(session.desired_min_tx,
113 "desired min transmit interval")
114 self.assert_equal(session.required_min_rx,
116 "required min receive interval")
117 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
118 session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
119 required_min_rx=session.required_min_rx * 2,
120 detect_mult=session.detect_mult * 2)
121 s = session.get_bfd_udp_session_dump_entry()
122 self.assert_equal(session.desired_min_tx,
124 "desired min transmit interval")
125 self.assert_equal(session.required_min_rx,
127 "required min receive interval")
128 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
130 def test_add_sha1_keys(self):
131 """ add SHA1 keys """
133 keys = [self.factory.create_random_key(
134 self) for i in range(0, key_count)]
136 self.assertFalse(key.query_vpp_config())
140 self.assertTrue(key.query_vpp_config())
142 indexes = range(key_count)
147 key.remove_vpp_config()
149 for j in range(key_count):
152 self.assertFalse(key.query_vpp_config())
154 self.assertTrue(key.query_vpp_config())
155 # should be removed now
157 self.assertFalse(key.query_vpp_config())
158 # add back and remove again
162 self.assertTrue(key.query_vpp_config())
164 key.remove_vpp_config()
166 self.assertFalse(key.query_vpp_config())
168 def test_add_bfd_sha1(self):
169 """ create a BFD session (SHA1) """
170 key = self.factory.create_random_key(self)
172 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
174 session.add_vpp_config()
175 self.logger.debug("Session state is %s", session.state)
176 session.remove_vpp_config()
177 session.add_vpp_config()
178 self.logger.debug("Session state is %s", session.state)
179 session.remove_vpp_config()
181 def test_double_add_sha1(self):
182 """ create the same BFD session twice (negative case) (SHA1) """
183 key = self.factory.create_random_key(self)
185 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
187 session.add_vpp_config()
188 with self.assertRaises(Exception):
189 session.add_vpp_config()
191 def test_add_auth_nonexistent_key(self):
192 """ create BFD session using non-existent SHA1 (negative case) """
193 session = VppBFDUDPSession(
194 self, self.pg0, self.pg0.remote_ip4,
195 sha1_key=self.factory.create_random_key(self))
196 with self.assertRaises(Exception):
197 session.add_vpp_config()
199 def test_shared_sha1_key(self):
200 """ share single SHA1 key between multiple BFD sessions """
201 key = self.factory.create_random_key(self)
204 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
206 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
207 sha1_key=key, af=AF_INET6),
208 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
210 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
211 sha1_key=key, af=AF_INET6)]
216 e = key.get_bfd_auth_keys_dump_entry()
217 self.assert_equal(e.use_count, len(sessions) - removed,
218 "Use count for shared key")
219 s.remove_vpp_config()
221 e = key.get_bfd_auth_keys_dump_entry()
222 self.assert_equal(e.use_count, len(sessions) - removed,
223 "Use count for shared key")
225 def test_activate_auth(self):
226 """ activate SHA1 authentication """
227 key = self.factory.create_random_key(self)
229 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
230 session.add_vpp_config()
231 session.activate_auth(key)
233 def test_deactivate_auth(self):
234 """ deactivate SHA1 authentication """
235 key = self.factory.create_random_key(self)
237 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
238 session.add_vpp_config()
239 session.activate_auth(key)
240 session.deactivate_auth()
242 def test_change_key(self):
243 """ change SHA1 key """
244 key1 = self.factory.create_random_key(self)
245 key2 = self.factory.create_random_key(self)
246 while key2.conf_key_id == key1.conf_key_id:
247 key2 = self.factory.create_random_key(self)
248 key1.add_vpp_config()
249 key2.add_vpp_config()
250 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
252 session.add_vpp_config()
253 session.activate_auth(key2)
255 def test_set_del_udp_echo_source(self):
256 """ set/del udp echo source """
257 self.create_loopback_interfaces(1)
258 self.loopback0 = self.lo_interfaces[0]
259 self.loopback0.admin_up()
260 echo_source = self.vapi.bfd_udp_get_echo_source()
261 self.assertFalse(echo_source.is_set)
262 self.assertFalse(echo_source.have_usable_ip4)
263 self.assertFalse(echo_source.have_usable_ip6)
265 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
266 echo_source = self.vapi.bfd_udp_get_echo_source()
267 self.assertTrue(echo_source.is_set)
268 self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
269 self.assertFalse(echo_source.have_usable_ip4)
270 self.assertFalse(echo_source.have_usable_ip6)
272 self.loopback0.config_ip4()
273 unpacked = unpack("!L", self.loopback0.local_ip4n)
274 echo_ip4 = pack("!L", unpacked[0] ^ 1)
275 echo_source = self.vapi.bfd_udp_get_echo_source()
276 self.assertTrue(echo_source.is_set)
277 self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
278 self.assertTrue(echo_source.have_usable_ip4)
279 self.assertEqual(echo_source.ip4_addr, echo_ip4)
280 self.assertFalse(echo_source.have_usable_ip6)
282 self.loopback0.config_ip6()
283 unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
284 echo_ip6 = pack("!LLLL", unpacked[0], unpacked[1], unpacked[2],
286 echo_source = self.vapi.bfd_udp_get_echo_source()
287 self.assertTrue(echo_source.is_set)
288 self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
289 self.assertTrue(echo_source.have_usable_ip4)
290 self.assertEqual(echo_source.ip4_addr, echo_ip4)
291 self.assertTrue(echo_source.have_usable_ip6)
292 self.assertEqual(echo_source.ip6_addr, echo_ip6)
294 self.vapi.bfd_udp_del_echo_source()
295 echo_source = self.vapi.bfd_udp_get_echo_source()
296 self.assertFalse(echo_source.is_set)
297 self.assertFalse(echo_source.have_usable_ip4)
298 self.assertFalse(echo_source.have_usable_ip6)
301 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
302 class BFDTestSession(object):
303 """ BFD session as seen from test framework side """
305 def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
306 bfd_key_id=None, our_seq_number=None):
309 self.sha1_key = sha1_key
310 self.bfd_key_id = bfd_key_id
311 self.interface = interface
312 self.udp_sport = randint(49152, 65535)
313 if our_seq_number is None:
314 self.our_seq_number = randint(0, 40000000)
316 self.our_seq_number = our_seq_number
317 self.vpp_seq_number = None
318 self.my_discriminator = 0
319 self.desired_min_tx = 300000
320 self.required_min_rx = 300000
321 self.required_min_echo_rx = None
322 self.detect_mult = detect_mult
323 self.diag = BFDDiagCode.no_diagnostic
324 self.your_discriminator = None
325 self.state = BFDState.down
326 self.auth_type = BFDAuthType.no_auth
328 def inc_seq_num(self):
329 """ increment sequence number, wrapping if needed """
330 if self.our_seq_number == 0xFFFFFFFF:
331 self.our_seq_number = 0
333 self.our_seq_number += 1
335 def update(self, my_discriminator=None, your_discriminator=None,
336 desired_min_tx=None, required_min_rx=None,
337 required_min_echo_rx=None, detect_mult=None,
338 diag=None, state=None, auth_type=None):
339 """ update BFD parameters associated with session """
340 if my_discriminator is not None:
341 self.my_discriminator = my_discriminator
342 if your_discriminator is not None:
343 self.your_discriminator = your_discriminator
344 if required_min_rx is not None:
345 self.required_min_rx = required_min_rx
346 if required_min_echo_rx is not None:
347 self.required_min_echo_rx = required_min_echo_rx
348 if desired_min_tx is not None:
349 self.desired_min_tx = desired_min_tx
350 if detect_mult is not None:
351 self.detect_mult = detect_mult
354 if state is not None:
356 if auth_type is not None:
357 self.auth_type = auth_type
359 def fill_packet_fields(self, packet):
360 """ set packet fields with known values in packet """
362 if self.my_discriminator:
363 self.test.logger.debug("BFD: setting packet.my_discriminator=%s",
364 self.my_discriminator)
365 bfd.my_discriminator = self.my_discriminator
366 if self.your_discriminator:
367 self.test.logger.debug("BFD: setting packet.your_discriminator=%s",
368 self.your_discriminator)
369 bfd.your_discriminator = self.your_discriminator
370 if self.required_min_rx:
371 self.test.logger.debug(
372 "BFD: setting packet.required_min_rx_interval=%s",
373 self.required_min_rx)
374 bfd.required_min_rx_interval = self.required_min_rx
375 if self.required_min_echo_rx:
376 self.test.logger.debug(
377 "BFD: setting packet.required_min_echo_rx=%s",
378 self.required_min_echo_rx)
379 bfd.required_min_echo_rx_interval = self.required_min_echo_rx
380 if self.desired_min_tx:
381 self.test.logger.debug(
382 "BFD: setting packet.desired_min_tx_interval=%s",
384 bfd.desired_min_tx_interval = self.desired_min_tx
386 self.test.logger.debug(
387 "BFD: setting packet.detect_mult=%s", self.detect_mult)
388 bfd.detect_mult = self.detect_mult
390 self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
393 self.test.logger.debug("BFD: setting packet.state=%s", self.state)
394 bfd.state = self.state
396 # this is used by a negative test-case
397 self.test.logger.debug("BFD: setting packet.auth_type=%s",
399 bfd.auth_type = self.auth_type
401 def create_packet(self):
402 """ create a BFD packet, reflecting the current state of session """
405 bfd.auth_type = self.sha1_key.auth_type
406 bfd.auth_len = BFD.sha1_auth_len
407 bfd.auth_key_id = self.bfd_key_id
408 bfd.auth_seq_num = self.our_seq_number
409 bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
412 if self.af == AF_INET6:
413 packet = (Ether(src=self.interface.remote_mac,
414 dst=self.interface.local_mac) /
415 IPv6(src=self.interface.remote_ip6,
416 dst=self.interface.local_ip6,
418 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
421 packet = (Ether(src=self.interface.remote_mac,
422 dst=self.interface.local_mac) /
423 IP(src=self.interface.remote_ip4,
424 dst=self.interface.local_ip4,
426 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
428 self.test.logger.debug("BFD: Creating packet")
429 self.fill_packet_fields(packet)
431 hash_material = str(packet[BFD])[:32] + self.sha1_key.key + \
432 "\0" * (20 - len(self.sha1_key.key))
433 self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
434 hashlib.sha1(hash_material).hexdigest())
435 packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
438 def send_packet(self, packet=None, interface=None):
439 """ send packet on interface, creating the packet if needed """
441 packet = self.create_packet()
442 if interface is None:
443 interface = self.test.pg0
444 self.test.logger.debug(ppp("Sending packet:", packet))
445 interface.add_stream(packet)
448 def verify_sha1_auth(self, packet):
449 """ Verify correctness of authentication in BFD layer. """
451 self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
452 self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
454 self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
455 self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
456 if self.vpp_seq_number is None:
457 self.vpp_seq_number = bfd.auth_seq_num
458 self.test.logger.debug("Received initial sequence number: %s" %
461 recvd_seq_num = bfd.auth_seq_num
462 self.test.logger.debug("Received followup sequence number: %s" %
464 if self.vpp_seq_number < 0xffffffff:
465 if self.sha1_key.auth_type == \
466 BFDAuthType.meticulous_keyed_sha1:
467 self.test.assert_equal(recvd_seq_num,
468 self.vpp_seq_number + 1,
469 "BFD sequence number")
471 self.test.assert_in_range(recvd_seq_num,
473 self.vpp_seq_number + 1,
474 "BFD sequence number")
476 if self.sha1_key.auth_type == \
477 BFDAuthType.meticulous_keyed_sha1:
478 self.test.assert_equal(recvd_seq_num, 0,
479 "BFD sequence number")
481 self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
482 "BFD sequence number not one of "
483 "(%s, 0)" % self.vpp_seq_number)
484 self.vpp_seq_number = recvd_seq_num
485 # last 20 bytes represent the hash - so replace them with the key,
486 # pad the result with zeros and hash the result
487 hash_material = bfd.original[:-20] + self.sha1_key.key + \
488 "\0" * (20 - len(self.sha1_key.key))
489 expected_hash = hashlib.sha1(hash_material).hexdigest()
490 self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
491 expected_hash, "Auth key hash")
493 def verify_bfd(self, packet):
494 """ Verify correctness of BFD layer. """
496 self.test.assert_equal(bfd.version, 1, "BFD version")
497 self.test.assert_equal(bfd.your_discriminator,
498 self.my_discriminator,
499 "BFD - your discriminator")
501 self.verify_sha1_auth(packet)
504 def bfd_session_up(test):
505 """ Bring BFD session up """
506 test.logger.info("BFD: Waiting for slow hello")
507 p = wait_for_bfd_packet(test, 2)
509 if hasattr(test, 'vpp_clock_offset'):
510 old_offset = test.vpp_clock_offset
511 test.vpp_clock_offset = time.time() - p.time
512 test.logger.debug("BFD: Calculated vpp clock offset: %s",
513 test.vpp_clock_offset)
515 test.assertAlmostEqual(
516 old_offset, test.vpp_clock_offset, delta=0.5,
517 msg="vpp clock offset not stable (new: %s, old: %s)" %
518 (test.vpp_clock_offset, old_offset))
519 test.logger.info("BFD: Sending Init")
520 test.test_session.update(my_discriminator=randint(0, 40000000),
521 your_discriminator=p[BFD].my_discriminator,
523 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
524 BFDAuthType.meticulous_keyed_sha1:
525 test.test_session.inc_seq_num()
526 test.test_session.send_packet()
527 test.logger.info("BFD: Waiting for event")
528 e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
529 verify_event(test, e, expected_state=BFDState.up)
530 test.logger.info("BFD: Session is Up")
531 test.test_session.update(state=BFDState.up)
532 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
533 BFDAuthType.meticulous_keyed_sha1:
534 test.test_session.inc_seq_num()
535 test.test_session.send_packet()
536 test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
539 def bfd_session_down(test):
540 """ Bring BFD session down """
541 test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
542 test.test_session.update(state=BFDState.down)
543 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
544 BFDAuthType.meticulous_keyed_sha1:
545 test.test_session.inc_seq_num()
546 test.test_session.send_packet()
547 test.logger.info("BFD: Waiting for event")
548 e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
549 verify_event(test, e, expected_state=BFDState.down)
550 test.logger.info("BFD: Session is Down")
551 test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
554 def verify_bfd_session_config(test, session, state=None):
555 dump = session.get_bfd_udp_session_dump_entry()
556 test.assertIsNotNone(dump)
557 # since dump is not none, we have verified that sw_if_index and addresses
558 # are valid (in get_bfd_udp_session_dump_entry)
560 test.assert_equal(dump.state, state, "session state")
561 test.assert_equal(dump.required_min_rx, session.required_min_rx,
562 "required min rx interval")
563 test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
564 "desired min tx interval")
565 test.assert_equal(dump.detect_mult, session.detect_mult,
567 if session.sha1_key is None:
568 test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
570 test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
571 test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
573 test.assert_equal(dump.conf_key_id,
574 session.sha1_key.conf_key_id,
578 def verify_ip(test, packet):
579 """ Verify correctness of IP layer. """
580 if test.vpp_session.af == AF_INET6:
582 local_ip = test.pg0.local_ip6
583 remote_ip = test.pg0.remote_ip6
584 test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
587 local_ip = test.pg0.local_ip4
588 remote_ip = test.pg0.remote_ip4
589 test.assert_equal(ip.ttl, 255, "IPv4 TTL")
590 test.assert_equal(ip.src, local_ip, "IP source address")
591 test.assert_equal(ip.dst, remote_ip, "IP destination address")
594 def verify_udp(test, packet):
595 """ Verify correctness of UDP layer. """
597 test.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
598 test.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
602 def verify_event(test, event, expected_state):
603 """ Verify correctness of event values. """
605 test.logger.debug("BFD: Event: %s" % repr(e))
606 test.assert_equal(e.sw_if_index,
607 test.vpp_session.interface.sw_if_index,
608 "BFD interface index")
610 if test.vpp_session.af == AF_INET6:
612 test.assert_equal(e.is_ipv6, is_ipv6, "is_ipv6")
613 if test.vpp_session.af == AF_INET:
614 test.assert_equal(e.local_addr[:4], test.vpp_session.local_addr_n,
615 "Local IPv4 address")
616 test.assert_equal(e.peer_addr[:4], test.vpp_session.peer_addr_n,
619 test.assert_equal(e.local_addr, test.vpp_session.local_addr_n,
620 "Local IPv6 address")
621 test.assert_equal(e.peer_addr, test.vpp_session.peer_addr_n,
623 test.assert_equal(e.state, expected_state, BFDState)
626 def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None):
627 """ wait for BFD packet and verify its correctness
629 :param timeout: how long to wait
630 :param pcap_time_min: ignore packets with pcap timestamp lower than this
632 :returns: tuple (packet, time spent waiting for packet)
634 test.logger.info("BFD: Waiting for BFD packet")
635 deadline = time.time() + timeout
640 test.assert_in_range(counter, 0, 100, "number of packets ignored")
641 time_left = deadline - time.time()
643 raise CaptureTimeoutError("Packet did not arrive within timeout")
644 p = test.pg0.wait_for_packet(timeout=time_left)
645 test.logger.debug(ppp("BFD: Got packet:", p))
646 if pcap_time_min is not None and p.time < pcap_time_min:
647 test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
648 "pcap time min %s):" %
649 (p.time, pcap_time_min), p))
654 raise Exception(ppp("Unexpected or invalid BFD packet:", p))
656 raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
659 test.test_session.verify_bfd(p)
663 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
664 class BFD4TestCase(VppTestCase):
665 """Bidirectional Forwarding Detection (BFD)"""
668 vpp_clock_offset = None
674 super(BFD4TestCase, cls).setUpClass()
675 cls.vapi.cli("set log class bfd level debug")
677 cls.create_pg_interfaces([0])
678 cls.create_loopback_interfaces(1)
679 cls.loopback0 = cls.lo_interfaces[0]
680 cls.loopback0.config_ip4()
681 cls.loopback0.admin_up()
683 cls.pg0.configure_ipv4_neighbors()
685 cls.pg0.resolve_arp()
688 super(BFD4TestCase, cls).tearDownClass()
692 super(BFD4TestCase, self).setUp()
693 self.factory = AuthKeyFactory()
694 self.vapi.want_bfd_events()
695 self.pg0.enable_capture()
697 self.vpp_session = VppBFDUDPSession(self, self.pg0,
699 self.vpp_session.add_vpp_config()
700 self.vpp_session.admin_up()
701 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
703 self.vapi.want_bfd_events(enable_disable=0)
707 if not self.vpp_dead:
708 self.vapi.want_bfd_events(enable_disable=0)
709 self.vapi.collect_events() # clear the event queue
710 super(BFD4TestCase, self).tearDown()
712 def test_session_up(self):
713 """ bring BFD session up """
716 def test_session_up_by_ip(self):
717 """ bring BFD session up - first frame looked up by address pair """
718 self.logger.info("BFD: Sending Slow control frame")
719 self.test_session.update(my_discriminator=randint(0, 40000000))
720 self.test_session.send_packet()
721 self.pg0.enable_capture()
722 p = self.pg0.wait_for_packet(1)
723 self.assert_equal(p[BFD].your_discriminator,
724 self.test_session.my_discriminator,
725 "BFD - your discriminator")
726 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
727 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
729 self.logger.info("BFD: Waiting for event")
730 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
731 verify_event(self, e, expected_state=BFDState.init)
732 self.logger.info("BFD: Sending Up")
733 self.test_session.send_packet()
734 self.logger.info("BFD: Waiting for event")
735 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
736 verify_event(self, e, expected_state=BFDState.up)
737 self.logger.info("BFD: Session is Up")
738 self.test_session.update(state=BFDState.up)
739 self.test_session.send_packet()
740 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
742 def test_session_down(self):
743 """ bring BFD session down """
745 bfd_session_down(self)
747 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
748 def test_hold_up(self):
749 """ hold BFD session up """
751 for dummy in range(self.test_session.detect_mult * 2):
752 wait_for_bfd_packet(self)
753 self.test_session.send_packet()
754 self.assert_equal(len(self.vapi.collect_events()), 0,
755 "number of bfd events")
757 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
758 def test_slow_timer(self):
759 """ verify slow periodic control frames while session down """
761 self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
762 prev_packet = wait_for_bfd_packet(self, 2)
763 for dummy in range(packet_count):
764 next_packet = wait_for_bfd_packet(self, 2)
765 time_diff = next_packet.time - prev_packet.time
766 # spec says the range should be <0.75, 1>, allow extra 0.05 margin
767 # to work around timing issues
768 self.assert_in_range(
769 time_diff, 0.70, 1.05, "time between slow packets")
770 prev_packet = next_packet
772 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
773 def test_zero_remote_min_rx(self):
774 """ no packets when zero remote required min rx interval """
776 self.test_session.update(required_min_rx=0)
777 self.test_session.send_packet()
778 for dummy in range(self.test_session.detect_mult):
779 self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
780 "sleep before transmitting bfd packet")
781 self.test_session.send_packet()
783 p = wait_for_bfd_packet(self, timeout=0)
784 self.logger.error(ppp("Received unexpected packet:", p))
785 except CaptureTimeoutError:
788 len(self.vapi.collect_events()), 0, "number of bfd events")
789 self.test_session.update(required_min_rx=300000)
790 for dummy in range(3):
791 self.test_session.send_packet()
793 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
795 len(self.vapi.collect_events()), 0, "number of bfd events")
797 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
798 def test_conn_down(self):
799 """ verify session goes down after inactivity """
801 detection_time = self.test_session.detect_mult *\
802 self.vpp_session.required_min_rx / USEC_IN_SEC
803 self.sleep(detection_time, "waiting for BFD session time-out")
804 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
805 verify_event(self, e, expected_state=BFDState.down)
807 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
808 def test_large_required_min_rx(self):
809 """ large remote required min rx interval """
811 p = wait_for_bfd_packet(self)
813 self.test_session.update(required_min_rx=interval)
814 self.test_session.send_packet()
815 time_mark = time.time()
817 # busy wait here, trying to collect a packet or event, vpp is not
818 # allowed to send packets and the session will timeout first - so the
819 # Up->Down event must arrive before any packets do
820 while time.time() < time_mark + interval / USEC_IN_SEC:
822 p = wait_for_bfd_packet(self, timeout=0)
823 # if vpp managed to send a packet before we did the session
824 # session update, then that's fine, ignore it
825 if p.time < time_mark - self.vpp_clock_offset:
827 self.logger.error(ppp("Received unexpected packet:", p))
829 except CaptureTimeoutError:
831 events = self.vapi.collect_events()
833 verify_event(self, events[0], BFDState.down)
835 self.assert_equal(count, 0, "number of packets received")
837 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
838 def test_immediate_remote_min_rx_reduction(self):
839 """ immediately honor remote required min rx reduction """
840 self.vpp_session.remove_vpp_config()
841 self.vpp_session = VppBFDUDPSession(
842 self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
843 self.pg0.enable_capture()
844 self.vpp_session.add_vpp_config()
845 self.test_session.update(desired_min_tx=1000000,
846 required_min_rx=1000000)
848 reference_packet = wait_for_bfd_packet(self)
849 time_mark = time.time()
851 self.test_session.update(required_min_rx=interval)
852 self.test_session.send_packet()
853 extra_time = time.time() - time_mark
854 p = wait_for_bfd_packet(self)
855 # first packet is allowed to be late by time we spent doing the update
856 # calculated in extra_time
857 self.assert_in_range(p.time - reference_packet.time,
858 .95 * 0.75 * interval / USEC_IN_SEC,
859 1.05 * interval / USEC_IN_SEC + extra_time,
860 "time between BFD packets")
862 for dummy in range(3):
863 p = wait_for_bfd_packet(self)
864 diff = p.time - reference_packet.time
865 self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
866 1.05 * interval / USEC_IN_SEC,
867 "time between BFD packets")
870 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
871 def test_modify_req_min_rx_double(self):
872 """ modify session - double required min rx """
874 p = wait_for_bfd_packet(self)
875 self.test_session.update(desired_min_tx=10000,
876 required_min_rx=10000)
877 self.test_session.send_packet()
878 # double required min rx
879 self.vpp_session.modify_parameters(
880 required_min_rx=2 * self.vpp_session.required_min_rx)
881 p = wait_for_bfd_packet(
882 self, pcap_time_min=time.time() - self.vpp_clock_offset)
883 # poll bit needs to be set
884 self.assertIn("P", p.sprintf("%BFD.flags%"),
885 "Poll bit not set in BFD packet")
886 # finish poll sequence with final packet
887 final = self.test_session.create_packet()
888 final[BFD].flags = "F"
889 timeout = self.test_session.detect_mult * \
890 max(self.test_session.desired_min_tx,
891 self.vpp_session.required_min_rx) / USEC_IN_SEC
892 self.test_session.send_packet(final)
893 time_mark = time.time()
894 e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
895 verify_event(self, e, expected_state=BFDState.down)
896 time_to_event = time.time() - time_mark
897 self.assert_in_range(time_to_event, .9 * timeout,
898 1.1 * timeout, "session timeout")
900 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
901 def test_modify_req_min_rx_halve(self):
902 """ modify session - halve required min rx """
903 self.vpp_session.modify_parameters(
904 required_min_rx=2 * self.vpp_session.required_min_rx)
906 p = wait_for_bfd_packet(self)
907 self.test_session.update(desired_min_tx=10000,
908 required_min_rx=10000)
909 self.test_session.send_packet()
910 p = wait_for_bfd_packet(
911 self, pcap_time_min=time.time() - self.vpp_clock_offset)
912 # halve required min rx
913 old_required_min_rx = self.vpp_session.required_min_rx
914 self.vpp_session.modify_parameters(
915 required_min_rx=0.5 * self.vpp_session.required_min_rx)
916 # now we wait 0.8*3*old-req-min-rx and the session should still be up
917 self.sleep(0.8 * self.vpp_session.detect_mult *
918 old_required_min_rx / USEC_IN_SEC,
919 "wait before finishing poll sequence")
920 self.assert_equal(len(self.vapi.collect_events()), 0,
921 "number of bfd events")
922 p = wait_for_bfd_packet(self)
923 # poll bit needs to be set
924 self.assertIn("P", p.sprintf("%BFD.flags%"),
925 "Poll bit not set in BFD packet")
926 # finish poll sequence with final packet
927 final = self.test_session.create_packet()
928 final[BFD].flags = "F"
929 self.test_session.send_packet(final)
930 # now the session should time out under new conditions
931 detection_time = self.test_session.detect_mult *\
932 self.vpp_session.required_min_rx / USEC_IN_SEC
934 e = self.vapi.wait_for_event(
935 2 * detection_time, "bfd_udp_session_details")
937 self.assert_in_range(after - before,
938 0.9 * detection_time,
939 1.1 * detection_time,
940 "time before bfd session goes down")
941 verify_event(self, e, expected_state=BFDState.down)
943 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
944 def test_modify_detect_mult(self):
945 """ modify detect multiplier """
947 p = wait_for_bfd_packet(self)
948 self.vpp_session.modify_parameters(detect_mult=1)
949 p = wait_for_bfd_packet(
950 self, pcap_time_min=time.time() - self.vpp_clock_offset)
951 self.assert_equal(self.vpp_session.detect_mult,
954 # poll bit must not be set
955 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
956 "Poll bit not set in BFD packet")
957 self.vpp_session.modify_parameters(detect_mult=10)
958 p = wait_for_bfd_packet(
959 self, pcap_time_min=time.time() - self.vpp_clock_offset)
960 self.assert_equal(self.vpp_session.detect_mult,
963 # poll bit must not be set
964 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
965 "Poll bit not set in BFD packet")
967 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
968 def test_queued_poll(self):
969 """ test poll sequence queueing """
971 p = wait_for_bfd_packet(self)
972 self.vpp_session.modify_parameters(
973 required_min_rx=2 * self.vpp_session.required_min_rx)
974 p = wait_for_bfd_packet(self)
975 poll_sequence_start = time.time()
976 poll_sequence_length_min = 0.5
977 send_final_after = time.time() + poll_sequence_length_min
978 # poll bit needs to be set
979 self.assertIn("P", p.sprintf("%BFD.flags%"),
980 "Poll bit not set in BFD packet")
981 self.assert_equal(p[BFD].required_min_rx_interval,
982 self.vpp_session.required_min_rx,
983 "BFD required min rx interval")
984 self.vpp_session.modify_parameters(
985 required_min_rx=2 * self.vpp_session.required_min_rx)
986 # 2nd poll sequence should be queued now
987 # don't send the reply back yet, wait for some time to emulate
988 # longer round-trip time
990 while time.time() < send_final_after:
991 self.test_session.send_packet()
992 p = wait_for_bfd_packet(self)
993 self.assert_equal(len(self.vapi.collect_events()), 0,
994 "number of bfd events")
995 self.assert_equal(p[BFD].required_min_rx_interval,
996 self.vpp_session.required_min_rx,
997 "BFD required min rx interval")
999 # poll bit must be set
1000 self.assertIn("P", p.sprintf("%BFD.flags%"),
1001 "Poll bit not set in BFD packet")
1002 final = self.test_session.create_packet()
1003 final[BFD].flags = "F"
1004 self.test_session.send_packet(final)
1005 # finish 1st with final
1006 poll_sequence_length = time.time() - poll_sequence_start
1007 # vpp must wait for some time before starting new poll sequence
1008 poll_no_2_started = False
1009 for dummy in range(2 * packet_count):
1010 p = wait_for_bfd_packet(self)
1011 self.assert_equal(len(self.vapi.collect_events()), 0,
1012 "number of bfd events")
1013 if "P" in p.sprintf("%BFD.flags%"):
1014 poll_no_2_started = True
1015 if time.time() < poll_sequence_start + poll_sequence_length:
1016 raise Exception("VPP started 2nd poll sequence too soon")
1017 final = self.test_session.create_packet()
1018 final[BFD].flags = "F"
1019 self.test_session.send_packet(final)
1022 self.test_session.send_packet()
1023 self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
1024 # finish 2nd with final
1025 final = self.test_session.create_packet()
1026 final[BFD].flags = "F"
1027 self.test_session.send_packet(final)
1028 p = wait_for_bfd_packet(self)
1029 # poll bit must not be set
1030 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
1031 "Poll bit set in BFD packet")
1033 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1034 def test_poll_response(self):
1035 """ test correct response to control frame with poll bit set """
1036 bfd_session_up(self)
1037 poll = self.test_session.create_packet()
1038 poll[BFD].flags = "P"
1039 self.test_session.send_packet(poll)
1040 final = wait_for_bfd_packet(
1041 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1042 self.assertIn("F", final.sprintf("%BFD.flags%"))
1044 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1045 def test_no_periodic_if_remote_demand(self):
1046 """ no periodic frames outside poll sequence if remote demand set """
1047 bfd_session_up(self)
1048 demand = self.test_session.create_packet()
1049 demand[BFD].flags = "D"
1050 self.test_session.send_packet(demand)
1051 transmit_time = 0.9 \
1052 * max(self.vpp_session.required_min_rx,
1053 self.test_session.desired_min_tx) \
1056 for dummy in range(self.test_session.detect_mult * 2):
1057 time.sleep(transmit_time)
1058 self.test_session.send_packet(demand)
1060 p = wait_for_bfd_packet(self, timeout=0)
1061 self.logger.error(ppp("Received unexpected packet:", p))
1063 except CaptureTimeoutError:
1065 events = self.vapi.collect_events()
1067 self.logger.error("Received unexpected event: %s", e)
1068 self.assert_equal(count, 0, "number of packets received")
1069 self.assert_equal(len(events), 0, "number of events received")
1071 def test_echo_looped_back(self):
1072 """ echo packets looped back """
1073 # don't need a session in this case..
1074 self.vpp_session.remove_vpp_config()
1075 self.pg0.enable_capture()
1076 echo_packet_count = 10
1077 # random source port low enough to increment a few times..
1078 udp_sport_tx = randint(1, 50000)
1079 udp_sport_rx = udp_sport_tx
1080 echo_packet = (Ether(src=self.pg0.remote_mac,
1081 dst=self.pg0.local_mac) /
1082 IP(src=self.pg0.remote_ip4,
1083 dst=self.pg0.remote_ip4) /
1084 UDP(dport=BFD.udp_dport_echo) /
1085 Raw("this should be looped back"))
1086 for dummy in range(echo_packet_count):
1087 self.sleep(.01, "delay between echo packets")
1088 echo_packet[UDP].sport = udp_sport_tx
1090 self.logger.debug(ppp("Sending packet:", echo_packet))
1091 self.pg0.add_stream(echo_packet)
1093 for dummy in range(echo_packet_count):
1094 p = self.pg0.wait_for_packet(1)
1095 self.logger.debug(ppp("Got packet:", p))
1097 self.assert_equal(self.pg0.remote_mac,
1098 ether.dst, "Destination MAC")
1099 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1101 self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1102 self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1104 self.assert_equal(udp.dport, BFD.udp_dport_echo,
1105 "UDP destination port")
1106 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1108 # need to compare the hex payload here, otherwise BFD_vpp_echo
1110 self.assertEqual(str(p[UDP].payload),
1111 str(echo_packet[UDP].payload),
1112 "Received packet is not the echo packet sent")
1113 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1114 "ECHO packet identifier for test purposes)")
1116 def test_echo(self):
1117 """ echo function """
1118 bfd_session_up(self)
1119 self.test_session.update(required_min_echo_rx=150000)
1120 self.test_session.send_packet()
1121 detection_time = self.test_session.detect_mult *\
1122 self.vpp_session.required_min_rx / USEC_IN_SEC
1123 # echo shouldn't work without echo source set
1124 for dummy in range(10):
1125 sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1126 self.sleep(sleep, "delay before sending bfd packet")
1127 self.test_session.send_packet()
1128 p = wait_for_bfd_packet(
1129 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1130 self.assert_equal(p[BFD].required_min_rx_interval,
1131 self.vpp_session.required_min_rx,
1132 "BFD required min rx interval")
1133 self.test_session.send_packet()
1134 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1136 # should be turned on - loopback echo packets
1137 for dummy in range(3):
1138 loop_until = time.time() + 0.75 * detection_time
1139 while time.time() < loop_until:
1140 p = self.pg0.wait_for_packet(1)
1141 self.logger.debug(ppp("Got packet:", p))
1142 if p[UDP].dport == BFD.udp_dport_echo:
1144 p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
1145 self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
1146 "BFD ECHO src IP equal to loopback IP")
1147 self.logger.debug(ppp("Looping back packet:", p))
1148 self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1149 "ECHO packet destination MAC address")
1150 p[Ether].dst = self.pg0.local_mac
1151 self.pg0.add_stream(p)
1154 elif p.haslayer(BFD):
1156 self.assertGreaterEqual(
1157 p[BFD].required_min_rx_interval,
1159 if "P" in p.sprintf("%BFD.flags%"):
1160 final = self.test_session.create_packet()
1161 final[BFD].flags = "F"
1162 self.test_session.send_packet(final)
1164 raise Exception(ppp("Received unknown packet:", p))
1166 self.assert_equal(len(self.vapi.collect_events()), 0,
1167 "number of bfd events")
1168 self.test_session.send_packet()
1169 self.assertTrue(echo_seen, "No echo packets received")
1171 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1172 def test_echo_fail(self):
1173 """ session goes down if echo function fails """
1174 bfd_session_up(self)
1175 self.test_session.update(required_min_echo_rx=150000)
1176 self.test_session.send_packet()
1177 detection_time = self.test_session.detect_mult *\
1178 self.vpp_session.required_min_rx / USEC_IN_SEC
1179 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1180 # echo function should be used now, but we will drop the echo packets
1181 verified_diag = False
1182 for dummy in range(3):
1183 loop_until = time.time() + 0.75 * detection_time
1184 while time.time() < loop_until:
1185 p = self.pg0.wait_for_packet(1)
1186 self.logger.debug(ppp("Got packet:", p))
1187 if p[UDP].dport == BFD.udp_dport_echo:
1190 elif p.haslayer(BFD):
1191 if "P" in p.sprintf("%BFD.flags%"):
1192 self.assertGreaterEqual(
1193 p[BFD].required_min_rx_interval,
1195 final = self.test_session.create_packet()
1196 final[BFD].flags = "F"
1197 self.test_session.send_packet(final)
1198 if p[BFD].state == BFDState.down:
1199 self.assert_equal(p[BFD].diag,
1200 BFDDiagCode.echo_function_failed,
1202 verified_diag = True
1204 raise Exception(ppp("Received unknown packet:", p))
1205 self.test_session.send_packet()
1206 events = self.vapi.collect_events()
1207 self.assert_equal(len(events), 1, "number of bfd events")
1208 self.assert_equal(events[0].state, BFDState.down, BFDState)
1209 self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1211 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1212 def test_echo_stop(self):
1213 """ echo function stops if peer sets required min echo rx zero """
1214 bfd_session_up(self)
1215 self.test_session.update(required_min_echo_rx=150000)
1216 self.test_session.send_packet()
1217 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1218 # wait for first echo packet
1220 p = self.pg0.wait_for_packet(1)
1221 self.logger.debug(ppp("Got packet:", p))
1222 if p[UDP].dport == BFD.udp_dport_echo:
1223 self.logger.debug(ppp("Looping back packet:", p))
1224 p[Ether].dst = self.pg0.local_mac
1225 self.pg0.add_stream(p)
1228 elif p.haslayer(BFD):
1232 raise Exception(ppp("Received unknown packet:", p))
1233 self.test_session.update(required_min_echo_rx=0)
1234 self.test_session.send_packet()
1235 # echo packets shouldn't arrive anymore
1236 for dummy in range(5):
1237 wait_for_bfd_packet(
1238 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1239 self.test_session.send_packet()
1240 events = self.vapi.collect_events()
1241 self.assert_equal(len(events), 0, "number of bfd events")
1243 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1244 def test_echo_source_removed(self):
1245 """ echo function stops if echo source is removed """
1246 bfd_session_up(self)
1247 self.test_session.update(required_min_echo_rx=150000)
1248 self.test_session.send_packet()
1249 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1250 # wait for first echo packet
1252 p = self.pg0.wait_for_packet(1)
1253 self.logger.debug(ppp("Got packet:", p))
1254 if p[UDP].dport == BFD.udp_dport_echo:
1255 self.logger.debug(ppp("Looping back packet:", p))
1256 p[Ether].dst = self.pg0.local_mac
1257 self.pg0.add_stream(p)
1260 elif p.haslayer(BFD):
1264 raise Exception(ppp("Received unknown packet:", p))
1265 self.vapi.bfd_udp_del_echo_source()
1266 self.test_session.send_packet()
1267 # echo packets shouldn't arrive anymore
1268 for dummy in range(5):
1269 wait_for_bfd_packet(
1270 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1271 self.test_session.send_packet()
1272 events = self.vapi.collect_events()
1273 self.assert_equal(len(events), 0, "number of bfd events")
1275 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1276 def test_stale_echo(self):
1277 """ stale echo packets don't keep a session up """
1278 bfd_session_up(self)
1279 self.test_session.update(required_min_echo_rx=150000)
1280 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1281 self.test_session.send_packet()
1282 # should be turned on - loopback echo packets
1286 for dummy in range(10 * self.vpp_session.detect_mult):
1287 p = self.pg0.wait_for_packet(1)
1288 if p[UDP].dport == BFD.udp_dport_echo:
1289 if echo_packet is None:
1290 self.logger.debug(ppp("Got first echo packet:", p))
1292 timeout_at = time.time() + self.vpp_session.detect_mult * \
1293 self.test_session.required_min_echo_rx / USEC_IN_SEC
1295 self.logger.debug(ppp("Got followup echo packet:", p))
1296 self.logger.debug(ppp("Looping back first echo packet:", p))
1297 echo_packet[Ether].dst = self.pg0.local_mac
1298 self.pg0.add_stream(echo_packet)
1300 elif p.haslayer(BFD):
1301 self.logger.debug(ppp("Got packet:", p))
1302 if "P" in p.sprintf("%BFD.flags%"):
1303 final = self.test_session.create_packet()
1304 final[BFD].flags = "F"
1305 self.test_session.send_packet(final)
1306 if p[BFD].state == BFDState.down:
1307 self.assertIsNotNone(
1309 "Session went down before first echo packet received")
1311 self.assertGreaterEqual(
1313 "Session timeout at %s, but is expected at %s" %
1315 self.assert_equal(p[BFD].diag,
1316 BFDDiagCode.echo_function_failed,
1318 events = self.vapi.collect_events()
1319 self.assert_equal(len(events), 1, "number of bfd events")
1320 self.assert_equal(events[0].state, BFDState.down, BFDState)
1324 raise Exception(ppp("Received unknown packet:", p))
1325 self.test_session.send_packet()
1326 self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1328 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1329 def test_invalid_echo_checksum(self):
1330 """ echo packets with invalid checksum don't keep a session up """
1331 bfd_session_up(self)
1332 self.test_session.update(required_min_echo_rx=150000)
1333 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1334 self.test_session.send_packet()
1335 # should be turned on - loopback echo packets
1338 for dummy in range(10 * self.vpp_session.detect_mult):
1339 p = self.pg0.wait_for_packet(1)
1340 if p[UDP].dport == BFD.udp_dport_echo:
1341 self.logger.debug(ppp("Got echo packet:", p))
1342 if timeout_at is None:
1343 timeout_at = time.time() + self.vpp_session.detect_mult * \
1344 self.test_session.required_min_echo_rx / USEC_IN_SEC
1345 p[BFD_vpp_echo].checksum = getrandbits(64)
1346 p[Ether].dst = self.pg0.local_mac
1347 self.logger.debug(ppp("Looping back modified echo packet:", p))
1348 self.pg0.add_stream(p)
1350 elif p.haslayer(BFD):
1351 self.logger.debug(ppp("Got packet:", p))
1352 if "P" in p.sprintf("%BFD.flags%"):
1353 final = self.test_session.create_packet()
1354 final[BFD].flags = "F"
1355 self.test_session.send_packet(final)
1356 if p[BFD].state == BFDState.down:
1357 self.assertIsNotNone(
1359 "Session went down before first echo packet received")
1361 self.assertGreaterEqual(
1363 "Session timeout at %s, but is expected at %s" %
1365 self.assert_equal(p[BFD].diag,
1366 BFDDiagCode.echo_function_failed,
1368 events = self.vapi.collect_events()
1369 self.assert_equal(len(events), 1, "number of bfd events")
1370 self.assert_equal(events[0].state, BFDState.down, BFDState)
1374 raise Exception(ppp("Received unknown packet:", p))
1375 self.test_session.send_packet()
1376 self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1378 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1379 def test_admin_up_down(self):
1380 """ put session admin-up and admin-down """
1381 bfd_session_up(self)
1382 self.vpp_session.admin_down()
1383 self.pg0.enable_capture()
1384 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1385 verify_event(self, e, expected_state=BFDState.admin_down)
1386 for dummy in range(2):
1387 p = wait_for_bfd_packet(self)
1388 self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1389 # try to bring session up - shouldn't be possible
1390 self.test_session.update(state=BFDState.init)
1391 self.test_session.send_packet()
1392 for dummy in range(2):
1393 p = wait_for_bfd_packet(self)
1394 self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1395 self.vpp_session.admin_up()
1396 self.test_session.update(state=BFDState.down)
1397 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1398 verify_event(self, e, expected_state=BFDState.down)
1399 p = wait_for_bfd_packet(
1400 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1401 self.assert_equal(p[BFD].state, BFDState.down, BFDState)
1402 self.test_session.send_packet()
1403 p = wait_for_bfd_packet(
1404 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1405 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1406 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1407 verify_event(self, e, expected_state=BFDState.init)
1408 self.test_session.update(state=BFDState.up)
1409 self.test_session.send_packet()
1410 p = wait_for_bfd_packet(
1411 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1412 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1413 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1414 verify_event(self, e, expected_state=BFDState.up)
1416 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1417 def test_config_change_remote_demand(self):
1418 """ configuration change while peer in demand mode """
1419 bfd_session_up(self)
1420 demand = self.test_session.create_packet()
1421 demand[BFD].flags = "D"
1422 self.test_session.send_packet(demand)
1423 self.vpp_session.modify_parameters(
1424 required_min_rx=2 * self.vpp_session.required_min_rx)
1425 p = wait_for_bfd_packet(
1426 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1427 # poll bit must be set
1428 self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1429 # terminate poll sequence
1430 final = self.test_session.create_packet()
1431 final[BFD].flags = "D+F"
1432 self.test_session.send_packet(final)
1433 # vpp should be quiet now again
1434 transmit_time = 0.9 \
1435 * max(self.vpp_session.required_min_rx,
1436 self.test_session.desired_min_tx) \
1439 for dummy in range(self.test_session.detect_mult * 2):
1440 time.sleep(transmit_time)
1441 self.test_session.send_packet(demand)
1443 p = wait_for_bfd_packet(self, timeout=0)
1444 self.logger.error(ppp("Received unexpected packet:", p))
1446 except CaptureTimeoutError:
1448 events = self.vapi.collect_events()
1450 self.logger.error("Received unexpected event: %s", e)
1451 self.assert_equal(count, 0, "number of packets received")
1452 self.assert_equal(len(events), 0, "number of events received")
1454 def test_intf_deleted(self):
1455 """ interface with bfd session deleted """
1456 intf = VppLoInterface(self)
1459 sw_if_index = intf.sw_if_index
1460 vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip4)
1461 vpp_session.add_vpp_config()
1462 vpp_session.admin_up()
1463 intf.remove_vpp_config()
1464 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1465 self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1466 self.assertFalse(vpp_session.query_vpp_config())
1469 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1470 class BFD6TestCase(VppTestCase):
1471 """Bidirectional Forwarding Detection (BFD) (IPv6) """
1474 vpp_clock_offset = None
1479 def setUpClass(cls):
1480 super(BFD6TestCase, cls).setUpClass()
1481 cls.vapi.cli("set log class bfd level debug")
1483 cls.create_pg_interfaces([0])
1484 cls.pg0.config_ip6()
1485 cls.pg0.configure_ipv6_neighbors()
1487 cls.pg0.resolve_ndp()
1488 cls.create_loopback_interfaces(1)
1489 cls.loopback0 = cls.lo_interfaces[0]
1490 cls.loopback0.config_ip6()
1491 cls.loopback0.admin_up()
1494 super(BFD6TestCase, cls).tearDownClass()
1498 super(BFD6TestCase, self).setUp()
1499 self.factory = AuthKeyFactory()
1500 self.vapi.want_bfd_events()
1501 self.pg0.enable_capture()
1503 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1504 self.pg0.remote_ip6,
1506 self.vpp_session.add_vpp_config()
1507 self.vpp_session.admin_up()
1508 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1509 self.logger.debug(self.vapi.cli("show adj nbr"))
1511 self.vapi.want_bfd_events(enable_disable=0)
1515 if not self.vpp_dead:
1516 self.vapi.want_bfd_events(enable_disable=0)
1517 self.vapi.collect_events() # clear the event queue
1518 super(BFD6TestCase, self).tearDown()
1520 def test_session_up(self):
1521 """ bring BFD session up """
1522 bfd_session_up(self)
1524 def test_session_up_by_ip(self):
1525 """ bring BFD session up - first frame looked up by address pair """
1526 self.logger.info("BFD: Sending Slow control frame")
1527 self.test_session.update(my_discriminator=randint(0, 40000000))
1528 self.test_session.send_packet()
1529 self.pg0.enable_capture()
1530 p = self.pg0.wait_for_packet(1)
1531 self.assert_equal(p[BFD].your_discriminator,
1532 self.test_session.my_discriminator,
1533 "BFD - your discriminator")
1534 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1535 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
1537 self.logger.info("BFD: Waiting for event")
1538 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1539 verify_event(self, e, expected_state=BFDState.init)
1540 self.logger.info("BFD: Sending Up")
1541 self.test_session.send_packet()
1542 self.logger.info("BFD: Waiting for event")
1543 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1544 verify_event(self, e, expected_state=BFDState.up)
1545 self.logger.info("BFD: Session is Up")
1546 self.test_session.update(state=BFDState.up)
1547 self.test_session.send_packet()
1548 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1550 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1551 def test_hold_up(self):
1552 """ hold BFD session up """
1553 bfd_session_up(self)
1554 for dummy in range(self.test_session.detect_mult * 2):
1555 wait_for_bfd_packet(self)
1556 self.test_session.send_packet()
1557 self.assert_equal(len(self.vapi.collect_events()), 0,
1558 "number of bfd events")
1559 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1561 def test_echo_looped_back(self):
1562 """ echo packets looped back """
1563 # don't need a session in this case..
1564 self.vpp_session.remove_vpp_config()
1565 self.pg0.enable_capture()
1566 echo_packet_count = 10
1567 # random source port low enough to increment a few times..
1568 udp_sport_tx = randint(1, 50000)
1569 udp_sport_rx = udp_sport_tx
1570 echo_packet = (Ether(src=self.pg0.remote_mac,
1571 dst=self.pg0.local_mac) /
1572 IPv6(src=self.pg0.remote_ip6,
1573 dst=self.pg0.remote_ip6) /
1574 UDP(dport=BFD.udp_dport_echo) /
1575 Raw("this should be looped back"))
1576 for dummy in range(echo_packet_count):
1577 self.sleep(.01, "delay between echo packets")
1578 echo_packet[UDP].sport = udp_sport_tx
1580 self.logger.debug(ppp("Sending packet:", echo_packet))
1581 self.pg0.add_stream(echo_packet)
1583 for dummy in range(echo_packet_count):
1584 p = self.pg0.wait_for_packet(1)
1585 self.logger.debug(ppp("Got packet:", p))
1587 self.assert_equal(self.pg0.remote_mac,
1588 ether.dst, "Destination MAC")
1589 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1591 self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
1592 self.assert_equal(self.pg0.remote_ip6, ip.src, "Destination IP")
1594 self.assert_equal(udp.dport, BFD.udp_dport_echo,
1595 "UDP destination port")
1596 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1598 # need to compare the hex payload here, otherwise BFD_vpp_echo
1600 self.assertEqual(str(p[UDP].payload),
1601 str(echo_packet[UDP].payload),
1602 "Received packet is not the echo packet sent")
1603 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1604 "ECHO packet identifier for test purposes)")
1605 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1606 "ECHO packet identifier for test purposes)")
1608 def test_echo(self):
1609 """ echo function """
1610 bfd_session_up(self)
1611 self.test_session.update(required_min_echo_rx=150000)
1612 self.test_session.send_packet()
1613 detection_time = self.test_session.detect_mult *\
1614 self.vpp_session.required_min_rx / USEC_IN_SEC
1615 # echo shouldn't work without echo source set
1616 for dummy in range(10):
1617 sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1618 self.sleep(sleep, "delay before sending bfd packet")
1619 self.test_session.send_packet()
1620 p = wait_for_bfd_packet(
1621 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1622 self.assert_equal(p[BFD].required_min_rx_interval,
1623 self.vpp_session.required_min_rx,
1624 "BFD required min rx interval")
1625 self.test_session.send_packet()
1626 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1628 # should be turned on - loopback echo packets
1629 for dummy in range(3):
1630 loop_until = time.time() + 0.75 * detection_time
1631 while time.time() < loop_until:
1632 p = self.pg0.wait_for_packet(1)
1633 self.logger.debug(ppp("Got packet:", p))
1634 if p[UDP].dport == BFD.udp_dport_echo:
1636 p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
1637 self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
1638 "BFD ECHO src IP equal to loopback IP")
1639 self.logger.debug(ppp("Looping back packet:", p))
1640 self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1641 "ECHO packet destination MAC address")
1642 p[Ether].dst = self.pg0.local_mac
1643 self.pg0.add_stream(p)
1646 elif p.haslayer(BFD):
1648 self.assertGreaterEqual(
1649 p[BFD].required_min_rx_interval,
1651 if "P" in p.sprintf("%BFD.flags%"):
1652 final = self.test_session.create_packet()
1653 final[BFD].flags = "F"
1654 self.test_session.send_packet(final)
1656 raise Exception(ppp("Received unknown packet:", p))
1658 self.assert_equal(len(self.vapi.collect_events()), 0,
1659 "number of bfd events")
1660 self.test_session.send_packet()
1661 self.assertTrue(echo_seen, "No echo packets received")
1663 def test_intf_deleted(self):
1664 """ interface with bfd session deleted """
1665 intf = VppLoInterface(self)
1668 sw_if_index = intf.sw_if_index
1669 vpp_session = VppBFDUDPSession(
1670 self, intf, intf.remote_ip6, af=AF_INET6)
1671 vpp_session.add_vpp_config()
1672 vpp_session.admin_up()
1673 intf.remove_vpp_config()
1674 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1675 self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1676 self.assertFalse(vpp_session.query_vpp_config())
1679 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1680 class BFDFIBTestCase(VppTestCase):
1681 """ BFD-FIB interactions (IPv6) """
1687 super(BFDFIBTestCase, self).setUp()
1688 self.create_pg_interfaces(range(1))
1690 self.vapi.want_bfd_events()
1691 self.pg0.enable_capture()
1693 for i in self.pg_interfaces:
1696 i.configure_ipv6_neighbors()
1699 if not self.vpp_dead:
1700 self.vapi.want_bfd_events(enable_disable=0)
1702 super(BFDFIBTestCase, self).tearDown()
1705 def pkt_is_not_data_traffic(p):
1706 """ not data traffic implies BFD or the usual IPv6 ND/RA"""
1707 if p.haslayer(BFD) or is_ipv6_misc(p):
1711 def test_session_with_fib(self):
1712 """ BFD-FIB interactions """
1714 # packets to match against both of the routes
1715 p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1716 IPv6(src="3001::1", dst="2001::1") /
1717 UDP(sport=1234, dport=1234) /
1719 (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1720 IPv6(src="3001::1", dst="2002::1") /
1721 UDP(sport=1234, dport=1234) /
1724 # A recursive and a non-recursive route via a next-hop that
1725 # will have a BFD session
1726 ip_2001_s_64 = VppIpRoute(self, "2001::", 64,
1727 [VppRoutePath(self.pg0.remote_ip6,
1728 self.pg0.sw_if_index,
1729 proto=DpoProto.DPO_PROTO_IP6)],
1731 ip_2002_s_64 = VppIpRoute(self, "2002::", 64,
1732 [VppRoutePath(self.pg0.remote_ip6,
1734 proto=DpoProto.DPO_PROTO_IP6)],
1736 ip_2001_s_64.add_vpp_config()
1737 ip_2002_s_64.add_vpp_config()
1739 # bring the session up now the routes are present
1740 self.vpp_session = VppBFDUDPSession(self,
1742 self.pg0.remote_ip6,
1744 self.vpp_session.add_vpp_config()
1745 self.vpp_session.admin_up()
1746 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1748 # session is up - traffic passes
1749 bfd_session_up(self)
1751 self.pg0.add_stream(p)
1754 captured = self.pg0.wait_for_packet(
1756 filter_out_fn=self.pkt_is_not_data_traffic)
1757 self.assertEqual(captured[IPv6].dst,
1760 # session is up - traffic is dropped
1761 bfd_session_down(self)
1763 self.pg0.add_stream(p)
1765 with self.assertRaises(CaptureTimeoutError):
1766 self.pg0.wait_for_packet(1, self.pkt_is_not_data_traffic)
1768 # session is up - traffic passes
1769 bfd_session_up(self)
1771 self.pg0.add_stream(p)
1774 captured = self.pg0.wait_for_packet(
1776 filter_out_fn=self.pkt_is_not_data_traffic)
1777 self.assertEqual(captured[IPv6].dst,
1781 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1782 class BFDSHA1TestCase(VppTestCase):
1783 """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1786 vpp_clock_offset = None
1791 def setUpClass(cls):
1792 super(BFDSHA1TestCase, cls).setUpClass()
1793 cls.vapi.cli("set log class bfd level debug")
1795 cls.create_pg_interfaces([0])
1796 cls.pg0.config_ip4()
1798 cls.pg0.resolve_arp()
1801 super(BFDSHA1TestCase, cls).tearDownClass()
1805 super(BFDSHA1TestCase, self).setUp()
1806 self.factory = AuthKeyFactory()
1807 self.vapi.want_bfd_events()
1808 self.pg0.enable_capture()
1811 if not self.vpp_dead:
1812 self.vapi.want_bfd_events(enable_disable=0)
1813 self.vapi.collect_events() # clear the event queue
1814 super(BFDSHA1TestCase, self).tearDown()
1816 def test_session_up(self):
1817 """ bring BFD session up """
1818 key = self.factory.create_random_key(self)
1819 key.add_vpp_config()
1820 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1821 self.pg0.remote_ip4,
1823 self.vpp_session.add_vpp_config()
1824 self.vpp_session.admin_up()
1825 self.test_session = BFDTestSession(
1826 self, self.pg0, AF_INET, sha1_key=key,
1827 bfd_key_id=self.vpp_session.bfd_key_id)
1828 bfd_session_up(self)
1830 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1831 def test_hold_up(self):
1832 """ hold BFD session up """
1833 key = self.factory.create_random_key(self)
1834 key.add_vpp_config()
1835 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1836 self.pg0.remote_ip4,
1838 self.vpp_session.add_vpp_config()
1839 self.vpp_session.admin_up()
1840 self.test_session = BFDTestSession(
1841 self, self.pg0, AF_INET, sha1_key=key,
1842 bfd_key_id=self.vpp_session.bfd_key_id)
1843 bfd_session_up(self)
1844 for dummy in range(self.test_session.detect_mult * 2):
1845 wait_for_bfd_packet(self)
1846 self.test_session.send_packet()
1847 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1849 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1850 def test_hold_up_meticulous(self):
1851 """ hold BFD session up - meticulous auth """
1852 key = self.factory.create_random_key(
1853 self, BFDAuthType.meticulous_keyed_sha1)
1854 key.add_vpp_config()
1855 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1856 self.pg0.remote_ip4, sha1_key=key)
1857 self.vpp_session.add_vpp_config()
1858 self.vpp_session.admin_up()
1859 # specify sequence number so that it wraps
1860 self.test_session = BFDTestSession(
1861 self, self.pg0, AF_INET, sha1_key=key,
1862 bfd_key_id=self.vpp_session.bfd_key_id,
1863 our_seq_number=0xFFFFFFFF - 4)
1864 bfd_session_up(self)
1865 for dummy in range(30):
1866 wait_for_bfd_packet(self)
1867 self.test_session.inc_seq_num()
1868 self.test_session.send_packet()
1869 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1871 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1872 def test_send_bad_seq_number(self):
1873 """ session is not kept alive by msgs with bad sequence numbers"""
1874 key = self.factory.create_random_key(
1875 self, BFDAuthType.meticulous_keyed_sha1)
1876 key.add_vpp_config()
1877 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1878 self.pg0.remote_ip4, sha1_key=key)
1879 self.vpp_session.add_vpp_config()
1880 self.test_session = BFDTestSession(
1881 self, self.pg0, AF_INET, sha1_key=key,
1882 bfd_key_id=self.vpp_session.bfd_key_id)
1883 bfd_session_up(self)
1884 detection_time = self.test_session.detect_mult *\
1885 self.vpp_session.required_min_rx / USEC_IN_SEC
1886 send_until = time.time() + 2 * detection_time
1887 while time.time() < send_until:
1888 self.test_session.send_packet()
1889 self.sleep(0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
1890 "time between bfd packets")
1891 e = self.vapi.collect_events()
1892 # session should be down now, because the sequence numbers weren't
1894 self.assert_equal(len(e), 1, "number of bfd events")
1895 verify_event(self, e[0], expected_state=BFDState.down)
1897 def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
1898 legitimate_test_session,
1900 rogue_bfd_values=None):
1901 """ execute a rogue session interaction scenario
1903 1. create vpp session, add config
1904 2. bring the legitimate session up
1905 3. copy the bfd values from legitimate session to rogue session
1906 4. apply rogue_bfd_values to rogue session
1907 5. set rogue session state to down
1908 6. send message to take the session down from the rogue session
1909 7. assert that the legitimate session is unaffected
1912 self.vpp_session = vpp_bfd_udp_session
1913 self.vpp_session.add_vpp_config()
1914 self.test_session = legitimate_test_session
1915 # bring vpp session up
1916 bfd_session_up(self)
1917 # send packet from rogue session
1918 rogue_test_session.update(
1919 my_discriminator=self.test_session.my_discriminator,
1920 your_discriminator=self.test_session.your_discriminator,
1921 desired_min_tx=self.test_session.desired_min_tx,
1922 required_min_rx=self.test_session.required_min_rx,
1923 detect_mult=self.test_session.detect_mult,
1924 diag=self.test_session.diag,
1925 state=self.test_session.state,
1926 auth_type=self.test_session.auth_type)
1927 if rogue_bfd_values:
1928 rogue_test_session.update(**rogue_bfd_values)
1929 rogue_test_session.update(state=BFDState.down)
1930 rogue_test_session.send_packet()
1931 wait_for_bfd_packet(self)
1932 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1934 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1935 def test_mismatch_auth(self):
1936 """ session is not brought down by unauthenticated msg """
1937 key = self.factory.create_random_key(self)
1938 key.add_vpp_config()
1939 vpp_session = VppBFDUDPSession(
1940 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1941 legitimate_test_session = BFDTestSession(
1942 self, self.pg0, AF_INET, sha1_key=key,
1943 bfd_key_id=vpp_session.bfd_key_id)
1944 rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
1945 self.execute_rogue_session_scenario(vpp_session,
1946 legitimate_test_session,
1949 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1950 def test_mismatch_bfd_key_id(self):
1951 """ session is not brought down by msg with non-existent key-id """
1952 key = self.factory.create_random_key(self)
1953 key.add_vpp_config()
1954 vpp_session = VppBFDUDPSession(
1955 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1956 # pick a different random bfd key id
1958 while x == vpp_session.bfd_key_id:
1960 legitimate_test_session = BFDTestSession(
1961 self, self.pg0, AF_INET, sha1_key=key,
1962 bfd_key_id=vpp_session.bfd_key_id)
1963 rogue_test_session = BFDTestSession(
1964 self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
1965 self.execute_rogue_session_scenario(vpp_session,
1966 legitimate_test_session,
1969 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1970 def test_mismatched_auth_type(self):
1971 """ session is not brought down by msg with wrong auth type """
1972 key = self.factory.create_random_key(self)
1973 key.add_vpp_config()
1974 vpp_session = VppBFDUDPSession(
1975 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1976 legitimate_test_session = BFDTestSession(
1977 self, self.pg0, AF_INET, sha1_key=key,
1978 bfd_key_id=vpp_session.bfd_key_id)
1979 rogue_test_session = BFDTestSession(
1980 self, self.pg0, AF_INET, sha1_key=key,
1981 bfd_key_id=vpp_session.bfd_key_id)
1982 self.execute_rogue_session_scenario(
1983 vpp_session, legitimate_test_session, rogue_test_session,
1984 {'auth_type': BFDAuthType.keyed_md5})
1986 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1987 def test_restart(self):
1988 """ simulate remote peer restart and resynchronization """
1989 key = self.factory.create_random_key(
1990 self, BFDAuthType.meticulous_keyed_sha1)
1991 key.add_vpp_config()
1992 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1993 self.pg0.remote_ip4, sha1_key=key)
1994 self.vpp_session.add_vpp_config()
1995 self.test_session = BFDTestSession(
1996 self, self.pg0, AF_INET, sha1_key=key,
1997 bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
1998 bfd_session_up(self)
1999 # don't send any packets for 2*detection_time
2000 detection_time = self.test_session.detect_mult *\
2001 self.vpp_session.required_min_rx / USEC_IN_SEC
2002 self.sleep(2 * detection_time, "simulating peer restart")
2003 events = self.vapi.collect_events()
2004 self.assert_equal(len(events), 1, "number of bfd events")
2005 verify_event(self, events[0], expected_state=BFDState.down)
2006 self.test_session.update(state=BFDState.down)
2007 # reset sequence number
2008 self.test_session.our_seq_number = 0
2009 self.test_session.vpp_seq_number = None
2010 # now throw away any pending packets
2011 self.pg0.enable_capture()
2012 bfd_session_up(self)
2015 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2016 class BFDAuthOnOffTestCase(VppTestCase):
2017 """Bidirectional Forwarding Detection (BFD) (changing auth) """
2024 def setUpClass(cls):
2025 super(BFDAuthOnOffTestCase, cls).setUpClass()
2026 cls.vapi.cli("set log class bfd level debug")
2028 cls.create_pg_interfaces([0])
2029 cls.pg0.config_ip4()
2031 cls.pg0.resolve_arp()
2034 super(BFDAuthOnOffTestCase, cls).tearDownClass()
2038 super(BFDAuthOnOffTestCase, self).setUp()
2039 self.factory = AuthKeyFactory()
2040 self.vapi.want_bfd_events()
2041 self.pg0.enable_capture()
2044 if not self.vpp_dead:
2045 self.vapi.want_bfd_events(enable_disable=0)
2046 self.vapi.collect_events() # clear the event queue
2047 super(BFDAuthOnOffTestCase, self).tearDown()
2049 def test_auth_on_immediate(self):
2050 """ turn auth on without disturbing session state (immediate) """
2051 key = self.factory.create_random_key(self)
2052 key.add_vpp_config()
2053 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2054 self.pg0.remote_ip4)
2055 self.vpp_session.add_vpp_config()
2056 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
2057 bfd_session_up(self)
2058 for dummy in range(self.test_session.detect_mult * 2):
2059 p = wait_for_bfd_packet(self)
2060 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2061 self.test_session.send_packet()
2062 self.vpp_session.activate_auth(key)
2063 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2064 self.test_session.sha1_key = key
2065 for dummy in range(self.test_session.detect_mult * 2):
2066 p = wait_for_bfd_packet(self)
2067 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2068 self.test_session.send_packet()
2069 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2070 self.assert_equal(len(self.vapi.collect_events()), 0,
2071 "number of bfd events")
2073 def test_auth_off_immediate(self):
2074 """ turn auth off without disturbing session state (immediate) """
2075 key = self.factory.create_random_key(self)
2076 key.add_vpp_config()
2077 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2078 self.pg0.remote_ip4, sha1_key=key)
2079 self.vpp_session.add_vpp_config()
2080 self.test_session = BFDTestSession(
2081 self, self.pg0, AF_INET, sha1_key=key,
2082 bfd_key_id=self.vpp_session.bfd_key_id)
2083 bfd_session_up(self)
2084 # self.vapi.want_bfd_events(enable_disable=0)
2085 for dummy in range(self.test_session.detect_mult * 2):
2086 p = wait_for_bfd_packet(self)
2087 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2088 self.test_session.inc_seq_num()
2089 self.test_session.send_packet()
2090 self.vpp_session.deactivate_auth()
2091 self.test_session.bfd_key_id = None
2092 self.test_session.sha1_key = None
2093 for dummy in range(self.test_session.detect_mult * 2):
2094 p = wait_for_bfd_packet(self)
2095 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2096 self.test_session.inc_seq_num()
2097 self.test_session.send_packet()
2098 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2099 self.assert_equal(len(self.vapi.collect_events()), 0,
2100 "number of bfd events")
2102 def test_auth_change_key_immediate(self):
2103 """ change auth key without disturbing session state (immediate) """
2104 key1 = self.factory.create_random_key(self)
2105 key1.add_vpp_config()
2106 key2 = self.factory.create_random_key(self)
2107 key2.add_vpp_config()
2108 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2109 self.pg0.remote_ip4, sha1_key=key1)
2110 self.vpp_session.add_vpp_config()
2111 self.test_session = BFDTestSession(
2112 self, self.pg0, AF_INET, sha1_key=key1,
2113 bfd_key_id=self.vpp_session.bfd_key_id)
2114 bfd_session_up(self)
2115 for dummy in range(self.test_session.detect_mult * 2):
2116 p = wait_for_bfd_packet(self)
2117 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2118 self.test_session.send_packet()
2119 self.vpp_session.activate_auth(key2)
2120 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2121 self.test_session.sha1_key = key2
2122 for dummy in range(self.test_session.detect_mult * 2):
2123 p = wait_for_bfd_packet(self)
2124 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2125 self.test_session.send_packet()
2126 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2127 self.assert_equal(len(self.vapi.collect_events()), 0,
2128 "number of bfd events")
2130 def test_auth_on_delayed(self):
2131 """ turn auth on without disturbing session state (delayed) """
2132 key = self.factory.create_random_key(self)
2133 key.add_vpp_config()
2134 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2135 self.pg0.remote_ip4)
2136 self.vpp_session.add_vpp_config()
2137 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
2138 bfd_session_up(self)
2139 for dummy in range(self.test_session.detect_mult * 2):
2140 wait_for_bfd_packet(self)
2141 self.test_session.send_packet()
2142 self.vpp_session.activate_auth(key, delayed=True)
2143 for dummy in range(self.test_session.detect_mult * 2):
2144 p = wait_for_bfd_packet(self)
2145 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2146 self.test_session.send_packet()
2147 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2148 self.test_session.sha1_key = key
2149 self.test_session.send_packet()
2150 for dummy in range(self.test_session.detect_mult * 2):
2151 p = wait_for_bfd_packet(self)
2152 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2153 self.test_session.send_packet()
2154 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2155 self.assert_equal(len(self.vapi.collect_events()), 0,
2156 "number of bfd events")
2158 def test_auth_off_delayed(self):
2159 """ turn auth off without disturbing session state (delayed) """
2160 key = self.factory.create_random_key(self)
2161 key.add_vpp_config()
2162 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2163 self.pg0.remote_ip4, sha1_key=key)
2164 self.vpp_session.add_vpp_config()
2165 self.test_session = BFDTestSession(
2166 self, self.pg0, AF_INET, sha1_key=key,
2167 bfd_key_id=self.vpp_session.bfd_key_id)
2168 bfd_session_up(self)
2169 for dummy in range(self.test_session.detect_mult * 2):
2170 p = wait_for_bfd_packet(self)
2171 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2172 self.test_session.send_packet()
2173 self.vpp_session.deactivate_auth(delayed=True)
2174 for dummy in range(self.test_session.detect_mult * 2):
2175 p = wait_for_bfd_packet(self)
2176 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2177 self.test_session.send_packet()
2178 self.test_session.bfd_key_id = None
2179 self.test_session.sha1_key = None
2180 self.test_session.send_packet()
2181 for dummy in range(self.test_session.detect_mult * 2):
2182 p = wait_for_bfd_packet(self)
2183 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2184 self.test_session.send_packet()
2185 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2186 self.assert_equal(len(self.vapi.collect_events()), 0,
2187 "number of bfd events")
2189 def test_auth_change_key_delayed(self):
2190 """ change auth key without disturbing session state (delayed) """
2191 key1 = self.factory.create_random_key(self)
2192 key1.add_vpp_config()
2193 key2 = self.factory.create_random_key(self)
2194 key2.add_vpp_config()
2195 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2196 self.pg0.remote_ip4, sha1_key=key1)
2197 self.vpp_session.add_vpp_config()
2198 self.vpp_session.admin_up()
2199 self.test_session = BFDTestSession(
2200 self, self.pg0, AF_INET, sha1_key=key1,
2201 bfd_key_id=self.vpp_session.bfd_key_id)
2202 bfd_session_up(self)
2203 for dummy in range(self.test_session.detect_mult * 2):
2204 p = wait_for_bfd_packet(self)
2205 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2206 self.test_session.send_packet()
2207 self.vpp_session.activate_auth(key2, delayed=True)
2208 for dummy in range(self.test_session.detect_mult * 2):
2209 p = wait_for_bfd_packet(self)
2210 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2211 self.test_session.send_packet()
2212 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2213 self.test_session.sha1_key = key2
2214 self.test_session.send_packet()
2215 for dummy in range(self.test_session.detect_mult * 2):
2216 p = wait_for_bfd_packet(self)
2217 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2218 self.test_session.send_packet()
2219 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2220 self.assert_equal(len(self.vapi.collect_events()), 0,
2221 "number of bfd events")
2224 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2225 class BFDCLITestCase(VppTestCase):
2226 """Bidirectional Forwarding Detection (BFD) (CLI) """
2230 def setUpClass(cls):
2231 super(BFDCLITestCase, cls).setUpClass()
2232 cls.vapi.cli("set log class bfd level debug")
2234 cls.create_pg_interfaces((0,))
2235 cls.pg0.config_ip4()
2236 cls.pg0.config_ip6()
2237 cls.pg0.resolve_arp()
2238 cls.pg0.resolve_ndp()
2241 super(BFDCLITestCase, cls).tearDownClass()
2245 super(BFDCLITestCase, self).setUp()
2246 self.factory = AuthKeyFactory()
2247 self.pg0.enable_capture()
2251 self.vapi.want_bfd_events(enable_disable=0)
2252 except UnexpectedApiReturnValueError:
2253 # some tests aren't subscribed, so this is not an issue
2255 self.vapi.collect_events() # clear the event queue
2256 super(BFDCLITestCase, self).tearDown()
2258 def cli_verify_no_response(self, cli):
2259 """ execute a CLI, asserting that the response is empty """
2260 self.assert_equal(self.vapi.cli(cli),
2262 "CLI command response")
2264 def cli_verify_response(self, cli, expected):
2265 """ execute a CLI, asserting that the response matches expectation """
2266 self.assert_equal(self.vapi.cli(cli).strip(),
2268 "CLI command response")
2270 def test_show(self):
2271 """ show commands """
2272 k1 = self.factory.create_random_key(self)
2274 k2 = self.factory.create_random_key(
2275 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2277 s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2279 s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2282 self.logger.info(self.vapi.ppcli("show bfd keys"))
2283 self.logger.info(self.vapi.ppcli("show bfd sessions"))
2284 self.logger.info(self.vapi.ppcli("show bfd"))
2286 def test_set_del_sha1_key(self):
2287 """ set/delete SHA1 auth key """
2288 k = self.factory.create_random_key(self)
2289 self.registry.register(k, self.logger)
2290 self.cli_verify_no_response(
2291 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2293 "".join("{:02x}".format(ord(c)) for c in k.key)))
2294 self.assertTrue(k.query_vpp_config())
2295 self.vpp_session = VppBFDUDPSession(
2296 self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
2297 self.vpp_session.add_vpp_config()
2298 self.test_session = \
2299 BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
2300 bfd_key_id=self.vpp_session.bfd_key_id)
2301 self.vapi.want_bfd_events()
2302 bfd_session_up(self)
2303 bfd_session_down(self)
2304 # try to replace the secret for the key - should fail because the key
2306 k2 = self.factory.create_random_key(self)
2307 self.cli_verify_response(
2308 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2310 "".join("{:02x}".format(ord(c)) for c in k2.key)),
2311 "bfd key set: `bfd_auth_set_key' API call failed, "
2312 "rv=-103:BFD object in use")
2313 # manipulating the session using old secret should still work
2314 bfd_session_up(self)
2315 bfd_session_down(self)
2316 self.vpp_session.remove_vpp_config()
2317 self.cli_verify_no_response(
2318 "bfd key del conf-key-id %s" % k.conf_key_id)
2319 self.assertFalse(k.query_vpp_config())
2321 def test_set_del_meticulous_sha1_key(self):
2322 """ set/delete meticulous SHA1 auth key """
2323 k = self.factory.create_random_key(
2324 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2325 self.registry.register(k, self.logger)
2326 self.cli_verify_no_response(
2327 "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
2329 "".join("{:02x}".format(ord(c)) for c in k.key)))
2330 self.assertTrue(k.query_vpp_config())
2331 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2332 self.pg0.remote_ip6, af=AF_INET6,
2334 self.vpp_session.add_vpp_config()
2335 self.vpp_session.admin_up()
2336 self.test_session = \
2337 BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
2338 bfd_key_id=self.vpp_session.bfd_key_id)
2339 self.vapi.want_bfd_events()
2340 bfd_session_up(self)
2341 bfd_session_down(self)
2342 # try to replace the secret for the key - should fail because the key
2344 k2 = self.factory.create_random_key(self)
2345 self.cli_verify_response(
2346 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2348 "".join("{:02x}".format(ord(c)) for c in k2.key)),
2349 "bfd key set: `bfd_auth_set_key' API call failed, "
2350 "rv=-103:BFD object in use")
2351 # manipulating the session using old secret should still work
2352 bfd_session_up(self)
2353 bfd_session_down(self)
2354 self.vpp_session.remove_vpp_config()
2355 self.cli_verify_no_response(
2356 "bfd key del conf-key-id %s" % k.conf_key_id)
2357 self.assertFalse(k.query_vpp_config())
2359 def test_add_mod_del_bfd_udp(self):
2360 """ create/modify/delete IPv4 BFD UDP session """
2361 vpp_session = VppBFDUDPSession(
2362 self, self.pg0, self.pg0.remote_ip4)
2363 self.registry.register(vpp_session, self.logger)
2364 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2365 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2366 "detect-mult %s" % (self.pg0.name, self.pg0.local_ip4,
2367 self.pg0.remote_ip4,
2368 vpp_session.desired_min_tx,
2369 vpp_session.required_min_rx,
2370 vpp_session.detect_mult)
2371 self.cli_verify_no_response(cli_add_cmd)
2372 # 2nd add should fail
2373 self.cli_verify_response(
2375 "bfd udp session add: `bfd_add_add_session' API call"
2376 " failed, rv=-101:Duplicate BFD object")
2377 verify_bfd_session_config(self, vpp_session)
2378 mod_session = VppBFDUDPSession(
2379 self, self.pg0, self.pg0.remote_ip4,
2380 required_min_rx=2 * vpp_session.required_min_rx,
2381 desired_min_tx=3 * vpp_session.desired_min_tx,
2382 detect_mult=4 * vpp_session.detect_mult)
2383 self.cli_verify_no_response(
2384 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2385 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2386 (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2387 mod_session.desired_min_tx, mod_session.required_min_rx,
2388 mod_session.detect_mult))
2389 verify_bfd_session_config(self, mod_session)
2390 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2391 "peer-addr %s" % (self.pg0.name,
2392 self.pg0.local_ip4, self.pg0.remote_ip4)
2393 self.cli_verify_no_response(cli_del_cmd)
2394 # 2nd del is expected to fail
2395 self.cli_verify_response(
2396 cli_del_cmd, "bfd udp session del: `bfd_udp_del_session' API call"
2397 " failed, rv=-102:No such BFD object")
2398 self.assertFalse(vpp_session.query_vpp_config())
2400 def test_add_mod_del_bfd_udp6(self):
2401 """ create/modify/delete IPv6 BFD UDP session """
2402 vpp_session = VppBFDUDPSession(
2403 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
2404 self.registry.register(vpp_session, self.logger)
2405 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2406 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2407 "detect-mult %s" % (self.pg0.name, self.pg0.local_ip6,
2408 self.pg0.remote_ip6,
2409 vpp_session.desired_min_tx,
2410 vpp_session.required_min_rx,
2411 vpp_session.detect_mult)
2412 self.cli_verify_no_response(cli_add_cmd)
2413 # 2nd add should fail
2414 self.cli_verify_response(
2416 "bfd udp session add: `bfd_add_add_session' API call"
2417 " failed, rv=-101:Duplicate BFD object")
2418 verify_bfd_session_config(self, vpp_session)
2419 mod_session = VppBFDUDPSession(
2420 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2421 required_min_rx=2 * vpp_session.required_min_rx,
2422 desired_min_tx=3 * vpp_session.desired_min_tx,
2423 detect_mult=4 * vpp_session.detect_mult)
2424 self.cli_verify_no_response(
2425 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2426 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2427 (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2428 mod_session.desired_min_tx,
2429 mod_session.required_min_rx, mod_session.detect_mult))
2430 verify_bfd_session_config(self, mod_session)
2431 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2432 "peer-addr %s" % (self.pg0.name,
2433 self.pg0.local_ip6, self.pg0.remote_ip6)
2434 self.cli_verify_no_response(cli_del_cmd)
2435 # 2nd del is expected to fail
2436 self.cli_verify_response(
2438 "bfd udp session del: `bfd_udp_del_session' API call"
2439 " failed, rv=-102:No such BFD object")
2440 self.assertFalse(vpp_session.query_vpp_config())
2442 def test_add_mod_del_bfd_udp_auth(self):
2443 """ create/modify/delete IPv4 BFD UDP session (authenticated) """
2444 key = self.factory.create_random_key(self)
2445 key.add_vpp_config()
2446 vpp_session = VppBFDUDPSession(
2447 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2448 self.registry.register(vpp_session, self.logger)
2449 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2450 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2451 "detect-mult %s conf-key-id %s bfd-key-id %s"\
2452 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2453 vpp_session.desired_min_tx, vpp_session.required_min_rx,
2454 vpp_session.detect_mult, key.conf_key_id,
2455 vpp_session.bfd_key_id)
2456 self.cli_verify_no_response(cli_add_cmd)
2457 # 2nd add should fail
2458 self.cli_verify_response(
2460 "bfd udp session add: `bfd_add_add_session' API call"
2461 " failed, rv=-101:Duplicate BFD object")
2462 verify_bfd_session_config(self, vpp_session)
2463 mod_session = VppBFDUDPSession(
2464 self, self.pg0, self.pg0.remote_ip4, sha1_key=key,
2465 bfd_key_id=vpp_session.bfd_key_id,
2466 required_min_rx=2 * vpp_session.required_min_rx,
2467 desired_min_tx=3 * vpp_session.desired_min_tx,
2468 detect_mult=4 * vpp_session.detect_mult)
2469 self.cli_verify_no_response(
2470 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2471 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2472 (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2473 mod_session.desired_min_tx,
2474 mod_session.required_min_rx, mod_session.detect_mult))
2475 verify_bfd_session_config(self, mod_session)
2476 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2477 "peer-addr %s" % (self.pg0.name,
2478 self.pg0.local_ip4, self.pg0.remote_ip4)
2479 self.cli_verify_no_response(cli_del_cmd)
2480 # 2nd del is expected to fail
2481 self.cli_verify_response(
2483 "bfd udp session del: `bfd_udp_del_session' API call"
2484 " failed, rv=-102:No such BFD object")
2485 self.assertFalse(vpp_session.query_vpp_config())
2487 def test_add_mod_del_bfd_udp6_auth(self):
2488 """ create/modify/delete IPv6 BFD UDP session (authenticated) """
2489 key = self.factory.create_random_key(
2490 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2491 key.add_vpp_config()
2492 vpp_session = VppBFDUDPSession(
2493 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key)
2494 self.registry.register(vpp_session, self.logger)
2495 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2496 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2497 "detect-mult %s conf-key-id %s bfd-key-id %s" \
2498 % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2499 vpp_session.desired_min_tx, vpp_session.required_min_rx,
2500 vpp_session.detect_mult, key.conf_key_id,
2501 vpp_session.bfd_key_id)
2502 self.cli_verify_no_response(cli_add_cmd)
2503 # 2nd add should fail
2504 self.cli_verify_response(
2506 "bfd udp session add: `bfd_add_add_session' API call"
2507 " failed, rv=-101:Duplicate BFD object")
2508 verify_bfd_session_config(self, vpp_session)
2509 mod_session = VppBFDUDPSession(
2510 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key,
2511 bfd_key_id=vpp_session.bfd_key_id,
2512 required_min_rx=2 * vpp_session.required_min_rx,
2513 desired_min_tx=3 * vpp_session.desired_min_tx,
2514 detect_mult=4 * vpp_session.detect_mult)
2515 self.cli_verify_no_response(
2516 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2517 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2518 (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2519 mod_session.desired_min_tx,
2520 mod_session.required_min_rx, mod_session.detect_mult))
2521 verify_bfd_session_config(self, mod_session)
2522 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2523 "peer-addr %s" % (self.pg0.name,
2524 self.pg0.local_ip6, self.pg0.remote_ip6)
2525 self.cli_verify_no_response(cli_del_cmd)
2526 # 2nd del is expected to fail
2527 self.cli_verify_response(
2529 "bfd udp session del: `bfd_udp_del_session' API call"
2530 " failed, rv=-102:No such BFD object")
2531 self.assertFalse(vpp_session.query_vpp_config())
2533 def test_auth_on_off(self):
2534 """ turn authentication on and off """
2535 key = self.factory.create_random_key(
2536 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2537 key.add_vpp_config()
2538 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2539 auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2541 session.add_vpp_config()
2543 "bfd udp session auth activate interface %s local-addr %s "\
2544 "peer-addr %s conf-key-id %s bfd-key-id %s"\
2545 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2546 key.conf_key_id, auth_session.bfd_key_id)
2547 self.cli_verify_no_response(cli_activate)
2548 verify_bfd_session_config(self, auth_session)
2549 self.cli_verify_no_response(cli_activate)
2550 verify_bfd_session_config(self, auth_session)
2552 "bfd udp session auth deactivate interface %s local-addr %s "\
2554 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2555 self.cli_verify_no_response(cli_deactivate)
2556 verify_bfd_session_config(self, session)
2557 self.cli_verify_no_response(cli_deactivate)
2558 verify_bfd_session_config(self, session)
2560 def test_auth_on_off_delayed(self):
2561 """ turn authentication on and off (delayed) """
2562 key = self.factory.create_random_key(
2563 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2564 key.add_vpp_config()
2565 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2566 auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2568 session.add_vpp_config()
2570 "bfd udp session auth activate interface %s local-addr %s "\
2571 "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"\
2572 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2573 key.conf_key_id, auth_session.bfd_key_id)
2574 self.cli_verify_no_response(cli_activate)
2575 verify_bfd_session_config(self, auth_session)
2576 self.cli_verify_no_response(cli_activate)
2577 verify_bfd_session_config(self, auth_session)
2579 "bfd udp session auth deactivate interface %s local-addr %s "\
2580 "peer-addr %s delayed yes"\
2581 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2582 self.cli_verify_no_response(cli_deactivate)
2583 verify_bfd_session_config(self, session)
2584 self.cli_verify_no_response(cli_deactivate)
2585 verify_bfd_session_config(self, session)
2587 def test_admin_up_down(self):
2588 """ put session admin-up and admin-down """
2589 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2590 session.add_vpp_config()
2592 "bfd udp session set-flags admin down interface %s local-addr %s "\
2594 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2596 "bfd udp session set-flags admin up interface %s local-addr %s "\
2598 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2599 self.cli_verify_no_response(cli_down)
2600 verify_bfd_session_config(self, session, state=BFDState.admin_down)
2601 self.cli_verify_no_response(cli_up)
2602 verify_bfd_session_config(self, session, state=BFDState.down)
2604 def test_set_del_udp_echo_source(self):
2605 """ set/del udp echo source """
2606 self.create_loopback_interfaces(1)
2607 self.loopback0 = self.lo_interfaces[0]
2608 self.loopback0.admin_up()
2609 self.cli_verify_response("show bfd echo-source",
2610 "UDP echo source is not set.")
2611 cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
2612 self.cli_verify_no_response(cli_set)
2613 self.cli_verify_response("show bfd echo-source",
2614 "UDP echo source is: %s\n"
2615 "IPv4 address usable as echo source: none\n"
2616 "IPv6 address usable as echo source: none" %
2617 self.loopback0.name)
2618 self.loopback0.config_ip4()
2619 unpacked = unpack("!L", self.loopback0.local_ip4n)
2620 echo_ip4 = inet_ntop(AF_INET, pack("!L", unpacked[0] ^ 1))
2621 self.cli_verify_response("show bfd echo-source",
2622 "UDP echo source is: %s\n"
2623 "IPv4 address usable as echo source: %s\n"
2624 "IPv6 address usable as echo source: none" %
2625 (self.loopback0.name, echo_ip4))
2626 unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
2627 echo_ip6 = inet_ntop(AF_INET6, pack("!LLLL", unpacked[0], unpacked[1],
2628 unpacked[2], unpacked[3] ^ 1))
2629 self.loopback0.config_ip6()
2630 self.cli_verify_response("show bfd echo-source",
2631 "UDP echo source is: %s\n"
2632 "IPv4 address usable as echo source: %s\n"
2633 "IPv6 address usable as echo source: %s" %
2634 (self.loopback0.name, echo_ip4, echo_ip6))
2635 cli_del = "bfd udp echo-source del"
2636 self.cli_verify_no_response(cli_del)
2637 self.cli_verify_response("show bfd echo-source",
2638 "UDP echo source is not set.")
2640 if __name__ == '__main__':
2641 unittest.main(testRunner=VppTestRunner)