4 from __future__ import division
9 from struct import pack, unpack
10 from random import randint, shuffle, getrandbits
11 from socket import AF_INET, AF_INET6, inet_ntop
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether
14 from scapy.layers.inet import UDP, IP
15 from scapy.layers.inet6 import IPv6
16 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
17 BFDDiagCode, BFDState, BFD_vpp_echo
18 from framework import VppTestCase, VppTestRunner, running_extended_tests
19 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
21 from vpp_papi_provider import UnexpectedApiReturnValueError
22 from vpp_ip_route import VppIpRoute, VppRoutePath
27 class AuthKeyFactory(object):
28 """Factory class for creating auth keys with unique conf key ID"""
31 self._conf_key_ids = {}
33 def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
34 """ create a random key with unique conf key id """
35 conf_key_id = randint(0, 0xFFFFFFFF)
36 while conf_key_id in self._conf_key_ids:
37 conf_key_id = randint(0, 0xFFFFFFFF)
38 self._conf_key_ids[conf_key_id] = 1
39 key = str(bytearray([randint(0, 255) for _ in range(randint(1, 20))]))
40 return VppBFDAuthKey(test=test, auth_type=auth_type,
41 conf_key_id=conf_key_id, key=key)
44 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
45 class BFDAPITestCase(VppTestCase):
46 """Bidirectional Forwarding Detection (BFD) - API"""
53 super(BFDAPITestCase, cls).setUpClass()
56 cls.create_pg_interfaces(range(2))
57 for i in cls.pg_interfaces:
63 super(BFDAPITestCase, cls).tearDownClass()
67 super(BFDAPITestCase, self).setUp()
68 self.factory = AuthKeyFactory()
70 def test_add_bfd(self):
71 """ create a BFD session """
72 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
73 session.add_vpp_config()
74 self.logger.debug("Session state is %s", session.state)
75 session.remove_vpp_config()
76 session.add_vpp_config()
77 self.logger.debug("Session state is %s", session.state)
78 session.remove_vpp_config()
80 def test_double_add(self):
81 """ create the same BFD session twice (negative case) """
82 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
83 session.add_vpp_config()
85 with self.vapi.expect_negative_api_retval():
86 session.add_vpp_config()
88 session.remove_vpp_config()
90 def test_add_bfd6(self):
91 """ create IPv6 BFD session """
92 session = VppBFDUDPSession(
93 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
94 session.add_vpp_config()
95 self.logger.debug("Session state is %s", session.state)
96 session.remove_vpp_config()
97 session.add_vpp_config()
98 self.logger.debug("Session state is %s", session.state)
99 session.remove_vpp_config()
101 def test_mod_bfd(self):
102 """ modify BFD session parameters """
103 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
104 desired_min_tx=50000,
105 required_min_rx=10000,
107 session.add_vpp_config()
108 s = session.get_bfd_udp_session_dump_entry()
109 self.assert_equal(session.desired_min_tx,
111 "desired min transmit interval")
112 self.assert_equal(session.required_min_rx,
114 "required min receive interval")
115 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
116 session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
117 required_min_rx=session.required_min_rx * 2,
118 detect_mult=session.detect_mult * 2)
119 s = session.get_bfd_udp_session_dump_entry()
120 self.assert_equal(session.desired_min_tx,
122 "desired min transmit interval")
123 self.assert_equal(session.required_min_rx,
125 "required min receive interval")
126 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
128 def test_add_sha1_keys(self):
129 """ add SHA1 keys """
131 keys = [self.factory.create_random_key(
132 self) for i in range(0, key_count)]
134 self.assertFalse(key.query_vpp_config())
138 self.assertTrue(key.query_vpp_config())
140 indexes = range(key_count)
145 key.remove_vpp_config()
147 for j in range(key_count):
150 self.assertFalse(key.query_vpp_config())
152 self.assertTrue(key.query_vpp_config())
153 # should be removed now
155 self.assertFalse(key.query_vpp_config())
156 # add back and remove again
160 self.assertTrue(key.query_vpp_config())
162 key.remove_vpp_config()
164 self.assertFalse(key.query_vpp_config())
166 def test_add_bfd_sha1(self):
167 """ create a BFD session (SHA1) """
168 key = self.factory.create_random_key(self)
170 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
172 session.add_vpp_config()
173 self.logger.debug("Session state is %s", session.state)
174 session.remove_vpp_config()
175 session.add_vpp_config()
176 self.logger.debug("Session state is %s", session.state)
177 session.remove_vpp_config()
179 def test_double_add_sha1(self):
180 """ create the same BFD session twice (negative case) (SHA1) """
181 key = self.factory.create_random_key(self)
183 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
185 session.add_vpp_config()
186 with self.assertRaises(Exception):
187 session.add_vpp_config()
189 def test_add_auth_nonexistent_key(self):
190 """ create BFD session using non-existent SHA1 (negative case) """
191 session = VppBFDUDPSession(
192 self, self.pg0, self.pg0.remote_ip4,
193 sha1_key=self.factory.create_random_key(self))
194 with self.assertRaises(Exception):
195 session.add_vpp_config()
197 def test_shared_sha1_key(self):
198 """ share single SHA1 key between multiple BFD sessions """
199 key = self.factory.create_random_key(self)
202 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
204 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
205 sha1_key=key, af=AF_INET6),
206 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
208 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
209 sha1_key=key, af=AF_INET6)]
214 e = key.get_bfd_auth_keys_dump_entry()
215 self.assert_equal(e.use_count, len(sessions) - removed,
216 "Use count for shared key")
217 s.remove_vpp_config()
219 e = key.get_bfd_auth_keys_dump_entry()
220 self.assert_equal(e.use_count, len(sessions) - removed,
221 "Use count for shared key")
223 def test_activate_auth(self):
224 """ activate SHA1 authentication """
225 key = self.factory.create_random_key(self)
227 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
228 session.add_vpp_config()
229 session.activate_auth(key)
231 def test_deactivate_auth(self):
232 """ deactivate SHA1 authentication """
233 key = self.factory.create_random_key(self)
235 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
236 session.add_vpp_config()
237 session.activate_auth(key)
238 session.deactivate_auth()
240 def test_change_key(self):
241 """ change SHA1 key """
242 key1 = self.factory.create_random_key(self)
243 key2 = self.factory.create_random_key(self)
244 while key2.conf_key_id == key1.conf_key_id:
245 key2 = self.factory.create_random_key(self)
246 key1.add_vpp_config()
247 key2.add_vpp_config()
248 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
250 session.add_vpp_config()
251 session.activate_auth(key2)
254 class BFDTestSession(object):
255 """ BFD session as seen from test framework side """
257 def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
258 bfd_key_id=None, our_seq_number=None):
261 self.sha1_key = sha1_key
262 self.bfd_key_id = bfd_key_id
263 self.interface = interface
264 self.udp_sport = randint(49152, 65535)
265 if our_seq_number is None:
266 self.our_seq_number = randint(0, 40000000)
268 self.our_seq_number = our_seq_number
269 self.vpp_seq_number = None
270 self.my_discriminator = 0
271 self.desired_min_tx = 300000
272 self.required_min_rx = 300000
273 self.required_min_echo_rx = None
274 self.detect_mult = detect_mult
275 self.diag = BFDDiagCode.no_diagnostic
276 self.your_discriminator = None
277 self.state = BFDState.down
278 self.auth_type = BFDAuthType.no_auth
280 def inc_seq_num(self):
281 """ increment sequence number, wrapping if needed """
282 if self.our_seq_number == 0xFFFFFFFF:
283 self.our_seq_number = 0
285 self.our_seq_number += 1
287 def update(self, my_discriminator=None, your_discriminator=None,
288 desired_min_tx=None, required_min_rx=None,
289 required_min_echo_rx=None, detect_mult=None,
290 diag=None, state=None, auth_type=None):
291 """ update BFD parameters associated with session """
292 if my_discriminator is not None:
293 self.my_discriminator = my_discriminator
294 if your_discriminator is not None:
295 self.your_discriminator = your_discriminator
296 if required_min_rx is not None:
297 self.required_min_rx = required_min_rx
298 if required_min_echo_rx is not None:
299 self.required_min_echo_rx = required_min_echo_rx
300 if desired_min_tx is not None:
301 self.desired_min_tx = desired_min_tx
302 if detect_mult is not None:
303 self.detect_mult = detect_mult
306 if state is not None:
308 if auth_type is not None:
309 self.auth_type = auth_type
311 def fill_packet_fields(self, packet):
312 """ set packet fields with known values in packet """
314 if self.my_discriminator:
315 self.test.logger.debug("BFD: setting packet.my_discriminator=%s",
316 self.my_discriminator)
317 bfd.my_discriminator = self.my_discriminator
318 if self.your_discriminator:
319 self.test.logger.debug("BFD: setting packet.your_discriminator=%s",
320 self.your_discriminator)
321 bfd.your_discriminator = self.your_discriminator
322 if self.required_min_rx:
323 self.test.logger.debug(
324 "BFD: setting packet.required_min_rx_interval=%s",
325 self.required_min_rx)
326 bfd.required_min_rx_interval = self.required_min_rx
327 if self.required_min_echo_rx:
328 self.test.logger.debug(
329 "BFD: setting packet.required_min_echo_rx=%s",
330 self.required_min_echo_rx)
331 bfd.required_min_echo_rx_interval = self.required_min_echo_rx
332 if self.desired_min_tx:
333 self.test.logger.debug(
334 "BFD: setting packet.desired_min_tx_interval=%s",
336 bfd.desired_min_tx_interval = self.desired_min_tx
338 self.test.logger.debug(
339 "BFD: setting packet.detect_mult=%s", self.detect_mult)
340 bfd.detect_mult = self.detect_mult
342 self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
345 self.test.logger.debug("BFD: setting packet.state=%s", self.state)
346 bfd.state = self.state
348 # this is used by a negative test-case
349 self.test.logger.debug("BFD: setting packet.auth_type=%s",
351 bfd.auth_type = self.auth_type
353 def create_packet(self):
354 """ create a BFD packet, reflecting the current state of session """
357 bfd.auth_type = self.sha1_key.auth_type
358 bfd.auth_len = BFD.sha1_auth_len
359 bfd.auth_key_id = self.bfd_key_id
360 bfd.auth_seq_num = self.our_seq_number
361 bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
364 if self.af == AF_INET6:
365 packet = (Ether(src=self.interface.remote_mac,
366 dst=self.interface.local_mac) /
367 IPv6(src=self.interface.remote_ip6,
368 dst=self.interface.local_ip6,
370 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
373 packet = (Ether(src=self.interface.remote_mac,
374 dst=self.interface.local_mac) /
375 IP(src=self.interface.remote_ip4,
376 dst=self.interface.local_ip4,
378 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
380 self.test.logger.debug("BFD: Creating packet")
381 self.fill_packet_fields(packet)
383 hash_material = str(packet[BFD])[:32] + self.sha1_key.key + \
384 "\0" * (20 - len(self.sha1_key.key))
385 self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
386 hashlib.sha1(hash_material).hexdigest())
387 packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
390 def send_packet(self, packet=None, interface=None):
391 """ send packet on interface, creating the packet if needed """
393 packet = self.create_packet()
394 if interface is None:
395 interface = self.test.pg0
396 self.test.logger.debug(ppp("Sending packet:", packet))
397 interface.add_stream(packet)
400 def verify_sha1_auth(self, packet):
401 """ Verify correctness of authentication in BFD layer. """
403 self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
404 self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
406 self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
407 self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
408 if self.vpp_seq_number is None:
409 self.vpp_seq_number = bfd.auth_seq_num
410 self.test.logger.debug("Received initial sequence number: %s" %
413 recvd_seq_num = bfd.auth_seq_num
414 self.test.logger.debug("Received followup sequence number: %s" %
416 if self.vpp_seq_number < 0xffffffff:
417 if self.sha1_key.auth_type == \
418 BFDAuthType.meticulous_keyed_sha1:
419 self.test.assert_equal(recvd_seq_num,
420 self.vpp_seq_number + 1,
421 "BFD sequence number")
423 self.test.assert_in_range(recvd_seq_num,
425 self.vpp_seq_number + 1,
426 "BFD sequence number")
428 if self.sha1_key.auth_type == \
429 BFDAuthType.meticulous_keyed_sha1:
430 self.test.assert_equal(recvd_seq_num, 0,
431 "BFD sequence number")
433 self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
434 "BFD sequence number not one of "
435 "(%s, 0)" % self.vpp_seq_number)
436 self.vpp_seq_number = recvd_seq_num
437 # last 20 bytes represent the hash - so replace them with the key,
438 # pad the result with zeros and hash the result
439 hash_material = bfd.original[:-20] + self.sha1_key.key + \
440 "\0" * (20 - len(self.sha1_key.key))
441 expected_hash = hashlib.sha1(hash_material).hexdigest()
442 self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
443 expected_hash, "Auth key hash")
445 def verify_bfd(self, packet):
446 """ Verify correctness of BFD layer. """
448 self.test.assert_equal(bfd.version, 1, "BFD version")
449 self.test.assert_equal(bfd.your_discriminator,
450 self.my_discriminator,
451 "BFD - your discriminator")
453 self.verify_sha1_auth(packet)
456 def bfd_session_up(test):
457 """ Bring BFD session up """
458 test.logger.info("BFD: Waiting for slow hello")
459 p = wait_for_bfd_packet(test, 2)
461 if hasattr(test, 'vpp_clock_offset'):
462 old_offset = test.vpp_clock_offset
463 test.vpp_clock_offset = time.time() - p.time
464 test.logger.debug("BFD: Calculated vpp clock offset: %s",
465 test.vpp_clock_offset)
467 test.assertAlmostEqual(
468 old_offset, test.vpp_clock_offset, delta=0.5,
469 msg="vpp clock offset not stable (new: %s, old: %s)" %
470 (test.vpp_clock_offset, old_offset))
471 test.logger.info("BFD: Sending Init")
472 test.test_session.update(my_discriminator=randint(0, 40000000),
473 your_discriminator=p[BFD].my_discriminator,
475 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
476 BFDAuthType.meticulous_keyed_sha1:
477 test.test_session.inc_seq_num()
478 test.test_session.send_packet()
479 test.logger.info("BFD: Waiting for event")
480 e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
481 verify_event(test, e, expected_state=BFDState.up)
482 test.logger.info("BFD: Session is Up")
483 test.test_session.update(state=BFDState.up)
484 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
485 BFDAuthType.meticulous_keyed_sha1:
486 test.test_session.inc_seq_num()
487 test.test_session.send_packet()
488 test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
491 def bfd_session_down(test):
492 """ Bring BFD session down """
493 test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
494 test.test_session.update(state=BFDState.down)
495 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
496 BFDAuthType.meticulous_keyed_sha1:
497 test.test_session.inc_seq_num()
498 test.test_session.send_packet()
499 test.logger.info("BFD: Waiting for event")
500 e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
501 verify_event(test, e, expected_state=BFDState.down)
502 test.logger.info("BFD: Session is Down")
503 test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
506 def verify_bfd_session_config(test, session, state=None):
507 dump = session.get_bfd_udp_session_dump_entry()
508 test.assertIsNotNone(dump)
509 # since dump is not none, we have verified that sw_if_index and addresses
510 # are valid (in get_bfd_udp_session_dump_entry)
512 test.assert_equal(dump.state, state, "session state")
513 test.assert_equal(dump.required_min_rx, session.required_min_rx,
514 "required min rx interval")
515 test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
516 "desired min tx interval")
517 test.assert_equal(dump.detect_mult, session.detect_mult,
519 if session.sha1_key is None:
520 test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
522 test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
523 test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
525 test.assert_equal(dump.conf_key_id,
526 session.sha1_key.conf_key_id,
530 def verify_ip(test, packet):
531 """ Verify correctness of IP layer. """
532 if test.vpp_session.af == AF_INET6:
534 local_ip = test.pg0.local_ip6
535 remote_ip = test.pg0.remote_ip6
536 test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
539 local_ip = test.pg0.local_ip4
540 remote_ip = test.pg0.remote_ip4
541 test.assert_equal(ip.ttl, 255, "IPv4 TTL")
542 test.assert_equal(ip.src, local_ip, "IP source address")
543 test.assert_equal(ip.dst, remote_ip, "IP destination address")
546 def verify_udp(test, packet):
547 """ Verify correctness of UDP layer. """
549 test.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
550 test.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
554 def verify_event(test, event, expected_state):
555 """ Verify correctness of event values. """
557 test.logger.debug("BFD: Event: %s" % repr(e))
558 test.assert_equal(e.sw_if_index,
559 test.vpp_session.interface.sw_if_index,
560 "BFD interface index")
562 if test.vpp_session.af == AF_INET6:
564 test.assert_equal(e.is_ipv6, is_ipv6, "is_ipv6")
565 if test.vpp_session.af == AF_INET:
566 test.assert_equal(e.local_addr[:4], test.vpp_session.local_addr_n,
567 "Local IPv4 address")
568 test.assert_equal(e.peer_addr[:4], test.vpp_session.peer_addr_n,
571 test.assert_equal(e.local_addr, test.vpp_session.local_addr_n,
572 "Local IPv6 address")
573 test.assert_equal(e.peer_addr, test.vpp_session.peer_addr_n,
575 test.assert_equal(e.state, expected_state, BFDState)
578 def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None):
579 """ wait for BFD packet and verify its correctness
581 :param timeout: how long to wait
582 :param pcap_time_min: ignore packets with pcap timestamp lower than this
584 :returns: tuple (packet, time spent waiting for packet)
586 test.logger.info("BFD: Waiting for BFD packet")
587 deadline = time.time() + timeout
592 test.assert_in_range(counter, 0, 100, "number of packets ignored")
593 time_left = deadline - time.time()
595 raise CaptureTimeoutError("Packet did not arrive within timeout")
596 p = test.pg0.wait_for_packet(timeout=time_left)
597 test.logger.debug(ppp("BFD: Got packet:", p))
598 if pcap_time_min is not None and p.time < pcap_time_min:
599 test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
600 "pcap time min %s):" %
601 (p.time, pcap_time_min), p))
606 raise Exception(ppp("Unexpected or invalid BFD packet:", p))
608 raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
611 test.test_session.verify_bfd(p)
615 class BFD4TestCase(VppTestCase):
616 """Bidirectional Forwarding Detection (BFD)"""
619 vpp_clock_offset = None
625 super(BFD4TestCase, cls).setUpClass()
627 cls.create_pg_interfaces([0])
628 cls.create_loopback_interfaces([0])
629 cls.loopback0 = cls.lo_interfaces[0]
630 cls.loopback0.config_ip4()
631 cls.loopback0.admin_up()
633 cls.pg0.configure_ipv4_neighbors()
635 cls.pg0.resolve_arp()
638 super(BFD4TestCase, cls).tearDownClass()
642 super(BFD4TestCase, self).setUp()
643 self.factory = AuthKeyFactory()
644 self.vapi.want_bfd_events()
645 self.pg0.enable_capture()
647 self.vpp_session = VppBFDUDPSession(self, self.pg0,
649 self.vpp_session.add_vpp_config()
650 self.vpp_session.admin_up()
651 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
653 self.vapi.want_bfd_events(enable_disable=0)
657 if not self.vpp_dead:
658 self.vapi.want_bfd_events(enable_disable=0)
659 self.vapi.collect_events() # clear the event queue
660 super(BFD4TestCase, self).tearDown()
662 def test_session_up(self):
663 """ bring BFD session up """
666 def test_session_up_by_ip(self):
667 """ bring BFD session up - first frame looked up by address pair """
668 self.logger.info("BFD: Sending Slow control frame")
669 self.test_session.update(my_discriminator=randint(0, 40000000))
670 self.test_session.send_packet()
671 self.pg0.enable_capture()
672 p = self.pg0.wait_for_packet(1)
673 self.assert_equal(p[BFD].your_discriminator,
674 self.test_session.my_discriminator,
675 "BFD - your discriminator")
676 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
677 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
679 self.logger.info("BFD: Waiting for event")
680 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
681 verify_event(self, e, expected_state=BFDState.init)
682 self.logger.info("BFD: Sending Up")
683 self.test_session.send_packet()
684 self.logger.info("BFD: Waiting for event")
685 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
686 verify_event(self, e, expected_state=BFDState.up)
687 self.logger.info("BFD: Session is Up")
688 self.test_session.update(state=BFDState.up)
689 self.test_session.send_packet()
690 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
692 def test_session_down(self):
693 """ bring BFD session down """
695 bfd_session_down(self)
697 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
698 def test_hold_up(self):
699 """ hold BFD session up """
701 for dummy in range(self.test_session.detect_mult * 2):
702 wait_for_bfd_packet(self)
703 self.test_session.send_packet()
704 self.assert_equal(len(self.vapi.collect_events()), 0,
705 "number of bfd events")
707 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
708 def test_slow_timer(self):
709 """ verify slow periodic control frames while session down """
711 self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
712 prev_packet = wait_for_bfd_packet(self, 2)
713 for dummy in range(packet_count):
714 next_packet = wait_for_bfd_packet(self, 2)
715 time_diff = next_packet.time - prev_packet.time
716 # spec says the range should be <0.75, 1>, allow extra 0.05 margin
717 # to work around timing issues
718 self.assert_in_range(
719 time_diff, 0.70, 1.05, "time between slow packets")
720 prev_packet = next_packet
722 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
723 def test_zero_remote_min_rx(self):
724 """ no packets when zero remote required min rx interval """
726 self.test_session.update(required_min_rx=0)
727 self.test_session.send_packet()
728 for dummy in range(self.test_session.detect_mult):
729 self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
730 "sleep before transmitting bfd packet")
731 self.test_session.send_packet()
733 p = wait_for_bfd_packet(self, timeout=0)
734 self.logger.error(ppp("Received unexpected packet:", p))
735 except CaptureTimeoutError:
738 len(self.vapi.collect_events()), 0, "number of bfd events")
739 self.test_session.update(required_min_rx=300000)
740 for dummy in range(3):
741 self.test_session.send_packet()
743 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
745 len(self.vapi.collect_events()), 0, "number of bfd events")
747 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
748 def test_conn_down(self):
749 """ verify session goes down after inactivity """
751 detection_time = self.test_session.detect_mult *\
752 self.vpp_session.required_min_rx / USEC_IN_SEC
753 self.sleep(detection_time, "waiting for BFD session time-out")
754 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
755 verify_event(self, e, expected_state=BFDState.down)
757 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
758 def test_large_required_min_rx(self):
759 """ large remote required min rx interval """
761 p = wait_for_bfd_packet(self)
763 self.test_session.update(required_min_rx=interval)
764 self.test_session.send_packet()
765 time_mark = time.time()
767 # busy wait here, trying to collect a packet or event, vpp is not
768 # allowed to send packets and the session will timeout first - so the
769 # Up->Down event must arrive before any packets do
770 while time.time() < time_mark + interval / USEC_IN_SEC:
772 p = wait_for_bfd_packet(self, timeout=0)
773 # if vpp managed to send a packet before we did the session
774 # session update, then that's fine, ignore it
775 if p.time < time_mark - self.vpp_clock_offset:
777 self.logger.error(ppp("Received unexpected packet:", p))
779 except CaptureTimeoutError:
781 events = self.vapi.collect_events()
783 verify_event(self, events[0], BFDState.down)
785 self.assert_equal(count, 0, "number of packets received")
787 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
788 def test_immediate_remote_min_rx_reduction(self):
789 """ immediately honor remote required min rx reduction """
790 self.vpp_session.remove_vpp_config()
791 self.vpp_session = VppBFDUDPSession(
792 self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
793 self.pg0.enable_capture()
794 self.vpp_session.add_vpp_config()
795 self.test_session.update(desired_min_tx=1000000,
796 required_min_rx=1000000)
798 reference_packet = wait_for_bfd_packet(self)
799 time_mark = time.time()
801 self.test_session.update(required_min_rx=interval)
802 self.test_session.send_packet()
803 extra_time = time.time() - time_mark
804 p = wait_for_bfd_packet(self)
805 # first packet is allowed to be late by time we spent doing the update
806 # calculated in extra_time
807 self.assert_in_range(p.time - reference_packet.time,
808 .95 * 0.75 * interval / USEC_IN_SEC,
809 1.05 * interval / USEC_IN_SEC + extra_time,
810 "time between BFD packets")
812 for dummy in range(3):
813 p = wait_for_bfd_packet(self)
814 diff = p.time - reference_packet.time
815 self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
816 1.05 * interval / USEC_IN_SEC,
817 "time between BFD packets")
820 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
821 def test_modify_req_min_rx_double(self):
822 """ modify session - double required min rx """
824 p = wait_for_bfd_packet(self)
825 self.test_session.update(desired_min_tx=10000,
826 required_min_rx=10000)
827 self.test_session.send_packet()
828 # double required min rx
829 self.vpp_session.modify_parameters(
830 required_min_rx=2 * self.vpp_session.required_min_rx)
831 p = wait_for_bfd_packet(
832 self, pcap_time_min=time.time() - self.vpp_clock_offset)
833 # poll bit needs to be set
834 self.assertIn("P", p.sprintf("%BFD.flags%"),
835 "Poll bit not set in BFD packet")
836 # finish poll sequence with final packet
837 final = self.test_session.create_packet()
838 final[BFD].flags = "F"
839 timeout = self.test_session.detect_mult * \
840 max(self.test_session.desired_min_tx,
841 self.vpp_session.required_min_rx) / USEC_IN_SEC
842 self.test_session.send_packet(final)
843 time_mark = time.time()
844 e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
845 verify_event(self, e, expected_state=BFDState.down)
846 time_to_event = time.time() - time_mark
847 self.assert_in_range(time_to_event, .9 * timeout,
848 1.1 * timeout, "session timeout")
850 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
851 def test_modify_req_min_rx_halve(self):
852 """ modify session - halve required min rx """
853 self.vpp_session.modify_parameters(
854 required_min_rx=2 * self.vpp_session.required_min_rx)
856 p = wait_for_bfd_packet(self)
857 self.test_session.update(desired_min_tx=10000,
858 required_min_rx=10000)
859 self.test_session.send_packet()
860 p = wait_for_bfd_packet(
861 self, pcap_time_min=time.time() - self.vpp_clock_offset)
862 # halve required min rx
863 old_required_min_rx = self.vpp_session.required_min_rx
864 self.vpp_session.modify_parameters(
865 required_min_rx=0.5 * self.vpp_session.required_min_rx)
866 # now we wait 0.8*3*old-req-min-rx and the session should still be up
867 self.sleep(0.8 * self.vpp_session.detect_mult *
868 old_required_min_rx / USEC_IN_SEC,
869 "wait before finishing poll sequence")
870 self.assert_equal(len(self.vapi.collect_events()), 0,
871 "number of bfd events")
872 p = wait_for_bfd_packet(self)
873 # poll bit needs to be set
874 self.assertIn("P", p.sprintf("%BFD.flags%"),
875 "Poll bit not set in BFD packet")
876 # finish poll sequence with final packet
877 final = self.test_session.create_packet()
878 final[BFD].flags = "F"
879 self.test_session.send_packet(final)
880 # now the session should time out under new conditions
881 detection_time = self.test_session.detect_mult *\
882 self.vpp_session.required_min_rx / USEC_IN_SEC
884 e = self.vapi.wait_for_event(
885 2 * detection_time, "bfd_udp_session_details")
887 self.assert_in_range(after - before,
888 0.9 * detection_time,
889 1.1 * detection_time,
890 "time before bfd session goes down")
891 verify_event(self, e, expected_state=BFDState.down)
893 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
894 def test_modify_detect_mult(self):
895 """ modify detect multiplier """
897 p = wait_for_bfd_packet(self)
898 self.vpp_session.modify_parameters(detect_mult=1)
899 p = wait_for_bfd_packet(
900 self, pcap_time_min=time.time() - self.vpp_clock_offset)
901 self.assert_equal(self.vpp_session.detect_mult,
904 # poll bit must not be set
905 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
906 "Poll bit not set in BFD packet")
907 self.vpp_session.modify_parameters(detect_mult=10)
908 p = wait_for_bfd_packet(
909 self, pcap_time_min=time.time() - self.vpp_clock_offset)
910 self.assert_equal(self.vpp_session.detect_mult,
913 # poll bit must not be set
914 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
915 "Poll bit not set in BFD packet")
917 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
918 def test_queued_poll(self):
919 """ test poll sequence queueing """
921 p = wait_for_bfd_packet(self)
922 self.vpp_session.modify_parameters(
923 required_min_rx=2 * self.vpp_session.required_min_rx)
924 p = wait_for_bfd_packet(self)
925 poll_sequence_start = time.time()
926 poll_sequence_length_min = 0.5
927 send_final_after = time.time() + poll_sequence_length_min
928 # poll bit needs to be set
929 self.assertIn("P", p.sprintf("%BFD.flags%"),
930 "Poll bit not set in BFD packet")
931 self.assert_equal(p[BFD].required_min_rx_interval,
932 self.vpp_session.required_min_rx,
933 "BFD required min rx interval")
934 self.vpp_session.modify_parameters(
935 required_min_rx=2 * self.vpp_session.required_min_rx)
936 # 2nd poll sequence should be queued now
937 # don't send the reply back yet, wait for some time to emulate
938 # longer round-trip time
940 while time.time() < send_final_after:
941 self.test_session.send_packet()
942 p = wait_for_bfd_packet(self)
943 self.assert_equal(len(self.vapi.collect_events()), 0,
944 "number of bfd events")
945 self.assert_equal(p[BFD].required_min_rx_interval,
946 self.vpp_session.required_min_rx,
947 "BFD required min rx interval")
949 # poll bit must be set
950 self.assertIn("P", p.sprintf("%BFD.flags%"),
951 "Poll bit not set in BFD packet")
952 final = self.test_session.create_packet()
953 final[BFD].flags = "F"
954 self.test_session.send_packet(final)
955 # finish 1st with final
956 poll_sequence_length = time.time() - poll_sequence_start
957 # vpp must wait for some time before starting new poll sequence
958 poll_no_2_started = False
959 for dummy in range(2 * packet_count):
960 p = wait_for_bfd_packet(self)
961 self.assert_equal(len(self.vapi.collect_events()), 0,
962 "number of bfd events")
963 if "P" in p.sprintf("%BFD.flags%"):
964 poll_no_2_started = True
965 if time.time() < poll_sequence_start + poll_sequence_length:
966 raise Exception("VPP started 2nd poll sequence too soon")
967 final = self.test_session.create_packet()
968 final[BFD].flags = "F"
969 self.test_session.send_packet(final)
972 self.test_session.send_packet()
973 self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
974 # finish 2nd with final
975 final = self.test_session.create_packet()
976 final[BFD].flags = "F"
977 self.test_session.send_packet(final)
978 p = wait_for_bfd_packet(self)
979 # poll bit must not be set
980 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
981 "Poll bit set in BFD packet")
983 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
984 def test_poll_response(self):
985 """ test correct response to control frame with poll bit set """
987 poll = self.test_session.create_packet()
988 poll[BFD].flags = "P"
989 self.test_session.send_packet(poll)
990 final = wait_for_bfd_packet(
991 self, pcap_time_min=time.time() - self.vpp_clock_offset)
992 self.assertIn("F", final.sprintf("%BFD.flags%"))
994 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
995 def test_no_periodic_if_remote_demand(self):
996 """ no periodic frames outside poll sequence if remote demand set """
998 demand = self.test_session.create_packet()
999 demand[BFD].flags = "D"
1000 self.test_session.send_packet(demand)
1001 transmit_time = 0.9 \
1002 * max(self.vpp_session.required_min_rx,
1003 self.test_session.desired_min_tx) \
1006 for dummy in range(self.test_session.detect_mult * 2):
1007 time.sleep(transmit_time)
1008 self.test_session.send_packet(demand)
1010 p = wait_for_bfd_packet(self, timeout=0)
1011 self.logger.error(ppp("Received unexpected packet:", p))
1013 except CaptureTimeoutError:
1015 events = self.vapi.collect_events()
1017 self.logger.error("Received unexpected event: %s", e)
1018 self.assert_equal(count, 0, "number of packets received")
1019 self.assert_equal(len(events), 0, "number of events received")
1021 def test_echo_looped_back(self):
1022 """ echo packets looped back """
1023 # don't need a session in this case..
1024 self.vpp_session.remove_vpp_config()
1025 self.pg0.enable_capture()
1026 echo_packet_count = 10
1027 # random source port low enough to increment a few times..
1028 udp_sport_tx = randint(1, 50000)
1029 udp_sport_rx = udp_sport_tx
1030 echo_packet = (Ether(src=self.pg0.remote_mac,
1031 dst=self.pg0.local_mac) /
1032 IP(src=self.pg0.remote_ip4,
1033 dst=self.pg0.remote_ip4) /
1034 UDP(dport=BFD.udp_dport_echo) /
1035 Raw("this should be looped back"))
1036 for dummy in range(echo_packet_count):
1037 self.sleep(.01, "delay between echo packets")
1038 echo_packet[UDP].sport = udp_sport_tx
1040 self.logger.debug(ppp("Sending packet:", echo_packet))
1041 self.pg0.add_stream(echo_packet)
1043 for dummy in range(echo_packet_count):
1044 p = self.pg0.wait_for_packet(1)
1045 self.logger.debug(ppp("Got packet:", p))
1047 self.assert_equal(self.pg0.remote_mac,
1048 ether.dst, "Destination MAC")
1049 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1051 self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1052 self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1054 self.assert_equal(udp.dport, BFD.udp_dport_echo,
1055 "UDP destination port")
1056 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1058 # need to compare the hex payload here, otherwise BFD_vpp_echo
1060 self.assertEqual(str(p[UDP].payload),
1061 str(echo_packet[UDP].payload),
1062 "Received packet is not the echo packet sent")
1063 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1064 "ECHO packet identifier for test purposes)")
1066 def test_echo(self):
1067 """ echo function """
1068 bfd_session_up(self)
1069 self.test_session.update(required_min_echo_rx=150000)
1070 self.test_session.send_packet()
1071 detection_time = self.test_session.detect_mult *\
1072 self.vpp_session.required_min_rx / USEC_IN_SEC
1073 # echo shouldn't work without echo source set
1074 for dummy in range(10):
1075 sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1076 self.sleep(sleep, "delay before sending bfd packet")
1077 self.test_session.send_packet()
1078 p = wait_for_bfd_packet(
1079 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1080 self.assert_equal(p[BFD].required_min_rx_interval,
1081 self.vpp_session.required_min_rx,
1082 "BFD required min rx interval")
1083 self.test_session.send_packet()
1084 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1086 # should be turned on - loopback echo packets
1087 for dummy in range(3):
1088 loop_until = time.time() + 0.75 * detection_time
1089 while time.time() < loop_until:
1090 p = self.pg0.wait_for_packet(1)
1091 self.logger.debug(ppp("Got packet:", p))
1092 if p[UDP].dport == BFD.udp_dport_echo:
1094 p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
1095 self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
1096 "BFD ECHO src IP equal to loopback IP")
1097 self.logger.debug(ppp("Looping back packet:", p))
1098 self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1099 "ECHO packet destination MAC address")
1100 p[Ether].dst = self.pg0.local_mac
1101 self.pg0.add_stream(p)
1104 elif p.haslayer(BFD):
1106 self.assertGreaterEqual(
1107 p[BFD].required_min_rx_interval,
1109 if "P" in p.sprintf("%BFD.flags%"):
1110 final = self.test_session.create_packet()
1111 final[BFD].flags = "F"
1112 self.test_session.send_packet(final)
1114 raise Exception(ppp("Received unknown packet:", p))
1116 self.assert_equal(len(self.vapi.collect_events()), 0,
1117 "number of bfd events")
1118 self.test_session.send_packet()
1119 self.assertTrue(echo_seen, "No echo packets received")
1121 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1122 def test_echo_fail(self):
1123 """ session goes down if echo function fails """
1124 bfd_session_up(self)
1125 self.test_session.update(required_min_echo_rx=150000)
1126 self.test_session.send_packet()
1127 detection_time = self.test_session.detect_mult *\
1128 self.vpp_session.required_min_rx / USEC_IN_SEC
1129 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1130 # echo function should be used now, but we will drop the echo packets
1131 verified_diag = False
1132 for dummy in range(3):
1133 loop_until = time.time() + 0.75 * detection_time
1134 while time.time() < loop_until:
1135 p = self.pg0.wait_for_packet(1)
1136 self.logger.debug(ppp("Got packet:", p))
1137 if p[UDP].dport == BFD.udp_dport_echo:
1140 elif p.haslayer(BFD):
1141 if "P" in p.sprintf("%BFD.flags%"):
1142 self.assertGreaterEqual(
1143 p[BFD].required_min_rx_interval,
1145 final = self.test_session.create_packet()
1146 final[BFD].flags = "F"
1147 self.test_session.send_packet(final)
1148 if p[BFD].state == BFDState.down:
1149 self.assert_equal(p[BFD].diag,
1150 BFDDiagCode.echo_function_failed,
1152 verified_diag = True
1154 raise Exception(ppp("Received unknown packet:", p))
1155 self.test_session.send_packet()
1156 events = self.vapi.collect_events()
1157 self.assert_equal(len(events), 1, "number of bfd events")
1158 self.assert_equal(events[0].state, BFDState.down, BFDState)
1159 self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1161 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1162 def test_echo_stop(self):
1163 """ echo function stops if peer sets required min echo rx zero """
1164 bfd_session_up(self)
1165 self.test_session.update(required_min_echo_rx=150000)
1166 self.test_session.send_packet()
1167 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1168 # wait for first echo packet
1170 p = self.pg0.wait_for_packet(1)
1171 self.logger.debug(ppp("Got packet:", p))
1172 if p[UDP].dport == BFD.udp_dport_echo:
1173 self.logger.debug(ppp("Looping back packet:", p))
1174 p[Ether].dst = self.pg0.local_mac
1175 self.pg0.add_stream(p)
1178 elif p.haslayer(BFD):
1182 raise Exception(ppp("Received unknown packet:", p))
1183 self.test_session.update(required_min_echo_rx=0)
1184 self.test_session.send_packet()
1185 # echo packets shouldn't arrive anymore
1186 for dummy in range(5):
1187 wait_for_bfd_packet(
1188 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1189 self.test_session.send_packet()
1190 events = self.vapi.collect_events()
1191 self.assert_equal(len(events), 0, "number of bfd events")
1193 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1194 def test_echo_source_removed(self):
1195 """ echo function stops if echo source is removed """
1196 bfd_session_up(self)
1197 self.test_session.update(required_min_echo_rx=150000)
1198 self.test_session.send_packet()
1199 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1200 # wait for first echo packet
1202 p = self.pg0.wait_for_packet(1)
1203 self.logger.debug(ppp("Got packet:", p))
1204 if p[UDP].dport == BFD.udp_dport_echo:
1205 self.logger.debug(ppp("Looping back packet:", p))
1206 p[Ether].dst = self.pg0.local_mac
1207 self.pg0.add_stream(p)
1210 elif p.haslayer(BFD):
1214 raise Exception(ppp("Received unknown packet:", p))
1215 self.vapi.bfd_udp_del_echo_source()
1216 self.test_session.send_packet()
1217 # echo packets shouldn't arrive anymore
1218 for dummy in range(5):
1219 wait_for_bfd_packet(
1220 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1221 self.test_session.send_packet()
1222 events = self.vapi.collect_events()
1223 self.assert_equal(len(events), 0, "number of bfd events")
1225 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1226 def test_stale_echo(self):
1227 """ stale echo packets don't keep a session up """
1228 bfd_session_up(self)
1229 self.test_session.update(required_min_echo_rx=150000)
1230 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1231 self.test_session.send_packet()
1232 # should be turned on - loopback echo packets
1236 for dummy in range(10 * self.vpp_session.detect_mult):
1237 p = self.pg0.wait_for_packet(1)
1238 if p[UDP].dport == BFD.udp_dport_echo:
1239 if echo_packet is None:
1240 self.logger.debug(ppp("Got first echo packet:", p))
1242 timeout_at = time.time() + self.vpp_session.detect_mult * \
1243 self.test_session.required_min_echo_rx / USEC_IN_SEC
1245 self.logger.debug(ppp("Got followup echo packet:", p))
1246 self.logger.debug(ppp("Looping back first echo packet:", p))
1247 echo_packet[Ether].dst = self.pg0.local_mac
1248 self.pg0.add_stream(echo_packet)
1250 elif p.haslayer(BFD):
1251 self.logger.debug(ppp("Got packet:", p))
1252 if "P" in p.sprintf("%BFD.flags%"):
1253 final = self.test_session.create_packet()
1254 final[BFD].flags = "F"
1255 self.test_session.send_packet(final)
1256 if p[BFD].state == BFDState.down:
1257 self.assertIsNotNone(
1259 "Session went down before first echo packet received")
1261 self.assertGreaterEqual(
1263 "Session timeout at %s, but is expected at %s" %
1265 self.assert_equal(p[BFD].diag,
1266 BFDDiagCode.echo_function_failed,
1268 events = self.vapi.collect_events()
1269 self.assert_equal(len(events), 1, "number of bfd events")
1270 self.assert_equal(events[0].state, BFDState.down, BFDState)
1274 raise Exception(ppp("Received unknown packet:", p))
1275 self.test_session.send_packet()
1276 self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1278 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1279 def test_invalid_echo_checksum(self):
1280 """ echo packets with invalid checksum don't keep a session up """
1281 bfd_session_up(self)
1282 self.test_session.update(required_min_echo_rx=150000)
1283 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1284 self.test_session.send_packet()
1285 # should be turned on - loopback echo packets
1288 for dummy in range(10 * self.vpp_session.detect_mult):
1289 p = self.pg0.wait_for_packet(1)
1290 if p[UDP].dport == BFD.udp_dport_echo:
1291 self.logger.debug(ppp("Got echo packet:", p))
1292 if timeout_at is None:
1293 timeout_at = time.time() + self.vpp_session.detect_mult * \
1294 self.test_session.required_min_echo_rx / USEC_IN_SEC
1295 p[BFD_vpp_echo].checksum = getrandbits(64)
1296 p[Ether].dst = self.pg0.local_mac
1297 self.logger.debug(ppp("Looping back modified echo packet:", p))
1298 self.pg0.add_stream(p)
1300 elif p.haslayer(BFD):
1301 self.logger.debug(ppp("Got packet:", p))
1302 if "P" in p.sprintf("%BFD.flags%"):
1303 final = self.test_session.create_packet()
1304 final[BFD].flags = "F"
1305 self.test_session.send_packet(final)
1306 if p[BFD].state == BFDState.down:
1307 self.assertIsNotNone(
1309 "Session went down before first echo packet received")
1311 self.assertGreaterEqual(
1313 "Session timeout at %s, but is expected at %s" %
1315 self.assert_equal(p[BFD].diag,
1316 BFDDiagCode.echo_function_failed,
1318 events = self.vapi.collect_events()
1319 self.assert_equal(len(events), 1, "number of bfd events")
1320 self.assert_equal(events[0].state, BFDState.down, BFDState)
1324 raise Exception(ppp("Received unknown packet:", p))
1325 self.test_session.send_packet()
1326 self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1328 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1329 def test_admin_up_down(self):
1330 """ put session admin-up and admin-down """
1331 bfd_session_up(self)
1332 self.vpp_session.admin_down()
1333 self.pg0.enable_capture()
1334 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1335 verify_event(self, e, expected_state=BFDState.admin_down)
1336 for dummy in range(2):
1337 p = wait_for_bfd_packet(self)
1338 self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1339 # try to bring session up - shouldn't be possible
1340 self.test_session.update(state=BFDState.init)
1341 self.test_session.send_packet()
1342 for dummy in range(2):
1343 p = wait_for_bfd_packet(self)
1344 self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1345 self.vpp_session.admin_up()
1346 self.test_session.update(state=BFDState.down)
1347 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1348 verify_event(self, e, expected_state=BFDState.down)
1349 p = wait_for_bfd_packet(
1350 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1351 self.assert_equal(p[BFD].state, BFDState.down, BFDState)
1352 self.test_session.send_packet()
1353 p = wait_for_bfd_packet(
1354 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1355 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1356 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1357 verify_event(self, e, expected_state=BFDState.init)
1358 self.test_session.update(state=BFDState.up)
1359 self.test_session.send_packet()
1360 p = wait_for_bfd_packet(
1361 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1362 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1363 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1364 verify_event(self, e, expected_state=BFDState.up)
1366 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1367 def test_config_change_remote_demand(self):
1368 """ configuration change while peer in demand mode """
1369 bfd_session_up(self)
1370 demand = self.test_session.create_packet()
1371 demand[BFD].flags = "D"
1372 self.test_session.send_packet(demand)
1373 self.vpp_session.modify_parameters(
1374 required_min_rx=2 * self.vpp_session.required_min_rx)
1375 p = wait_for_bfd_packet(
1376 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1377 # poll bit must be set
1378 self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1379 # terminate poll sequence
1380 final = self.test_session.create_packet()
1381 final[BFD].flags = "D+F"
1382 self.test_session.send_packet(final)
1383 # vpp should be quiet now again
1384 transmit_time = 0.9 \
1385 * max(self.vpp_session.required_min_rx,
1386 self.test_session.desired_min_tx) \
1389 for dummy in range(self.test_session.detect_mult * 2):
1390 time.sleep(transmit_time)
1391 self.test_session.send_packet(demand)
1393 p = wait_for_bfd_packet(self, timeout=0)
1394 self.logger.error(ppp("Received unexpected packet:", p))
1396 except CaptureTimeoutError:
1398 events = self.vapi.collect_events()
1400 self.logger.error("Received unexpected event: %s", e)
1401 self.assert_equal(count, 0, "number of packets received")
1402 self.assert_equal(len(events), 0, "number of events received")
1405 class BFD6TestCase(VppTestCase):
1406 """Bidirectional Forwarding Detection (BFD) (IPv6) """
1409 vpp_clock_offset = None
1414 def setUpClass(cls):
1415 super(BFD6TestCase, cls).setUpClass()
1417 cls.create_pg_interfaces([0])
1418 cls.pg0.config_ip6()
1419 cls.pg0.configure_ipv6_neighbors()
1421 cls.pg0.resolve_ndp()
1422 cls.create_loopback_interfaces([0])
1423 cls.loopback0 = cls.lo_interfaces[0]
1424 cls.loopback0.config_ip6()
1425 cls.loopback0.admin_up()
1428 super(BFD6TestCase, cls).tearDownClass()
1432 super(BFD6TestCase, self).setUp()
1433 self.factory = AuthKeyFactory()
1434 self.vapi.want_bfd_events()
1435 self.pg0.enable_capture()
1437 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1438 self.pg0.remote_ip6,
1440 self.vpp_session.add_vpp_config()
1441 self.vpp_session.admin_up()
1442 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1443 self.logger.debug(self.vapi.cli("show adj nbr"))
1445 self.vapi.want_bfd_events(enable_disable=0)
1449 if not self.vpp_dead:
1450 self.vapi.want_bfd_events(enable_disable=0)
1451 self.vapi.collect_events() # clear the event queue
1452 super(BFD6TestCase, self).tearDown()
1454 def test_session_up(self):
1455 """ bring BFD session up """
1456 bfd_session_up(self)
1458 def test_session_up_by_ip(self):
1459 """ bring BFD session up - first frame looked up by address pair """
1460 self.logger.info("BFD: Sending Slow control frame")
1461 self.test_session.update(my_discriminator=randint(0, 40000000))
1462 self.test_session.send_packet()
1463 self.pg0.enable_capture()
1464 p = self.pg0.wait_for_packet(1)
1465 self.assert_equal(p[BFD].your_discriminator,
1466 self.test_session.my_discriminator,
1467 "BFD - your discriminator")
1468 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1469 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
1471 self.logger.info("BFD: Waiting for event")
1472 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1473 verify_event(self, e, expected_state=BFDState.init)
1474 self.logger.info("BFD: Sending Up")
1475 self.test_session.send_packet()
1476 self.logger.info("BFD: Waiting for event")
1477 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1478 verify_event(self, e, expected_state=BFDState.up)
1479 self.logger.info("BFD: Session is Up")
1480 self.test_session.update(state=BFDState.up)
1481 self.test_session.send_packet()
1482 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1484 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1485 def test_hold_up(self):
1486 """ hold BFD session up """
1487 bfd_session_up(self)
1488 for dummy in range(self.test_session.detect_mult * 2):
1489 wait_for_bfd_packet(self)
1490 self.test_session.send_packet()
1491 self.assert_equal(len(self.vapi.collect_events()), 0,
1492 "number of bfd events")
1493 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1495 def test_echo_looped_back(self):
1496 """ echo packets looped back """
1497 # don't need a session in this case..
1498 self.vpp_session.remove_vpp_config()
1499 self.pg0.enable_capture()
1500 echo_packet_count = 10
1501 # random source port low enough to increment a few times..
1502 udp_sport_tx = randint(1, 50000)
1503 udp_sport_rx = udp_sport_tx
1504 echo_packet = (Ether(src=self.pg0.remote_mac,
1505 dst=self.pg0.local_mac) /
1506 IPv6(src=self.pg0.remote_ip6,
1507 dst=self.pg0.remote_ip6) /
1508 UDP(dport=BFD.udp_dport_echo) /
1509 Raw("this should be looped back"))
1510 for dummy in range(echo_packet_count):
1511 self.sleep(.01, "delay between echo packets")
1512 echo_packet[UDP].sport = udp_sport_tx
1514 self.logger.debug(ppp("Sending packet:", echo_packet))
1515 self.pg0.add_stream(echo_packet)
1517 for dummy in range(echo_packet_count):
1518 p = self.pg0.wait_for_packet(1)
1519 self.logger.debug(ppp("Got packet:", p))
1521 self.assert_equal(self.pg0.remote_mac,
1522 ether.dst, "Destination MAC")
1523 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1525 self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
1526 self.assert_equal(self.pg0.remote_ip6, ip.src, "Destination IP")
1528 self.assert_equal(udp.dport, BFD.udp_dport_echo,
1529 "UDP destination port")
1530 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1532 # need to compare the hex payload here, otherwise BFD_vpp_echo
1534 self.assertEqual(str(p[UDP].payload),
1535 str(echo_packet[UDP].payload),
1536 "Received packet is not the echo packet sent")
1537 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1538 "ECHO packet identifier for test purposes)")
1539 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1540 "ECHO packet identifier for test purposes)")
1542 def test_echo(self):
1543 """ echo function """
1544 bfd_session_up(self)
1545 self.test_session.update(required_min_echo_rx=150000)
1546 self.test_session.send_packet()
1547 detection_time = self.test_session.detect_mult *\
1548 self.vpp_session.required_min_rx / USEC_IN_SEC
1549 # echo shouldn't work without echo source set
1550 for dummy in range(10):
1551 sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1552 self.sleep(sleep, "delay before sending bfd packet")
1553 self.test_session.send_packet()
1554 p = wait_for_bfd_packet(
1555 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1556 self.assert_equal(p[BFD].required_min_rx_interval,
1557 self.vpp_session.required_min_rx,
1558 "BFD required min rx interval")
1559 self.test_session.send_packet()
1560 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1562 # should be turned on - loopback echo packets
1563 for dummy in range(3):
1564 loop_until = time.time() + 0.75 * detection_time
1565 while time.time() < loop_until:
1566 p = self.pg0.wait_for_packet(1)
1567 self.logger.debug(ppp("Got packet:", p))
1568 if p[UDP].dport == BFD.udp_dport_echo:
1570 p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
1571 self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
1572 "BFD ECHO src IP equal to loopback IP")
1573 self.logger.debug(ppp("Looping back packet:", p))
1574 self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1575 "ECHO packet destination MAC address")
1576 p[Ether].dst = self.pg0.local_mac
1577 self.pg0.add_stream(p)
1580 elif p.haslayer(BFD):
1582 self.assertGreaterEqual(
1583 p[BFD].required_min_rx_interval,
1585 if "P" in p.sprintf("%BFD.flags%"):
1586 final = self.test_session.create_packet()
1587 final[BFD].flags = "F"
1588 self.test_session.send_packet(final)
1590 raise Exception(ppp("Received unknown packet:", p))
1592 self.assert_equal(len(self.vapi.collect_events()), 0,
1593 "number of bfd events")
1594 self.test_session.send_packet()
1595 self.assertTrue(echo_seen, "No echo packets received")
1598 class BFDFIBTestCase(VppTestCase):
1599 """ BFD-FIB interactions (IPv6) """
1605 super(BFDFIBTestCase, self).setUp()
1606 self.create_pg_interfaces(range(1))
1608 self.vapi.want_bfd_events()
1609 self.pg0.enable_capture()
1611 for i in self.pg_interfaces:
1614 i.configure_ipv6_neighbors()
1617 if not self.vpp_dead:
1618 self.vapi.want_bfd_events(enable_disable=0)
1620 super(BFDFIBTestCase, self).tearDown()
1623 def pkt_is_not_data_traffic(p):
1624 """ not data traffic implies BFD or the usual IPv6 ND/RA"""
1625 if p.haslayer(BFD) or is_ipv6_misc(p):
1629 def test_session_with_fib(self):
1630 """ BFD-FIB interactions """
1632 # packets to match against both of the routes
1633 p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1634 IPv6(src="3001::1", dst="2001::1") /
1635 UDP(sport=1234, dport=1234) /
1637 (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1638 IPv6(src="3001::1", dst="2002::1") /
1639 UDP(sport=1234, dport=1234) /
1642 # A recursive and a non-recursive route via a next-hop that
1643 # will have a BFD session
1644 ip_2001_s_64 = VppIpRoute(self, "2001::", 64,
1645 [VppRoutePath(self.pg0.remote_ip6,
1646 self.pg0.sw_if_index,
1649 ip_2002_s_64 = VppIpRoute(self, "2002::", 64,
1650 [VppRoutePath(self.pg0.remote_ip6,
1654 ip_2001_s_64.add_vpp_config()
1655 ip_2002_s_64.add_vpp_config()
1657 # bring the session up now the routes are present
1658 self.vpp_session = VppBFDUDPSession(self,
1660 self.pg0.remote_ip6,
1662 self.vpp_session.add_vpp_config()
1663 self.vpp_session.admin_up()
1664 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1666 # session is up - traffic passes
1667 bfd_session_up(self)
1669 self.pg0.add_stream(p)
1672 captured = self.pg0.wait_for_packet(
1674 filter_out_fn=self.pkt_is_not_data_traffic)
1675 self.assertEqual(captured[IPv6].dst,
1678 # session is up - traffic is dropped
1679 bfd_session_down(self)
1681 self.pg0.add_stream(p)
1683 with self.assertRaises(CaptureTimeoutError):
1684 self.pg0.wait_for_packet(1, self.pkt_is_not_data_traffic)
1686 # session is up - traffic passes
1687 bfd_session_up(self)
1689 self.pg0.add_stream(p)
1692 captured = self.pg0.wait_for_packet(
1694 filter_out_fn=self.pkt_is_not_data_traffic)
1695 self.assertEqual(captured[IPv6].dst,
1699 class BFDSHA1TestCase(VppTestCase):
1700 """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1703 vpp_clock_offset = None
1708 def setUpClass(cls):
1709 super(BFDSHA1TestCase, cls).setUpClass()
1711 cls.create_pg_interfaces([0])
1712 cls.pg0.config_ip4()
1714 cls.pg0.resolve_arp()
1717 super(BFDSHA1TestCase, cls).tearDownClass()
1721 super(BFDSHA1TestCase, self).setUp()
1722 self.factory = AuthKeyFactory()
1723 self.vapi.want_bfd_events()
1724 self.pg0.enable_capture()
1727 if not self.vpp_dead:
1728 self.vapi.want_bfd_events(enable_disable=0)
1729 self.vapi.collect_events() # clear the event queue
1730 super(BFDSHA1TestCase, self).tearDown()
1732 def test_session_up(self):
1733 """ bring BFD session up """
1734 key = self.factory.create_random_key(self)
1735 key.add_vpp_config()
1736 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1737 self.pg0.remote_ip4,
1739 self.vpp_session.add_vpp_config()
1740 self.vpp_session.admin_up()
1741 self.test_session = BFDTestSession(
1742 self, self.pg0, AF_INET, sha1_key=key,
1743 bfd_key_id=self.vpp_session.bfd_key_id)
1744 bfd_session_up(self)
1746 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1747 def test_hold_up(self):
1748 """ hold BFD session up """
1749 key = self.factory.create_random_key(self)
1750 key.add_vpp_config()
1751 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1752 self.pg0.remote_ip4,
1754 self.vpp_session.add_vpp_config()
1755 self.vpp_session.admin_up()
1756 self.test_session = BFDTestSession(
1757 self, self.pg0, AF_INET, sha1_key=key,
1758 bfd_key_id=self.vpp_session.bfd_key_id)
1759 bfd_session_up(self)
1760 for dummy in range(self.test_session.detect_mult * 2):
1761 wait_for_bfd_packet(self)
1762 self.test_session.send_packet()
1763 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1765 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1766 def test_hold_up_meticulous(self):
1767 """ hold BFD session up - meticulous auth """
1768 key = self.factory.create_random_key(
1769 self, BFDAuthType.meticulous_keyed_sha1)
1770 key.add_vpp_config()
1771 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1772 self.pg0.remote_ip4, sha1_key=key)
1773 self.vpp_session.add_vpp_config()
1774 self.vpp_session.admin_up()
1775 # specify sequence number so that it wraps
1776 self.test_session = BFDTestSession(
1777 self, self.pg0, AF_INET, sha1_key=key,
1778 bfd_key_id=self.vpp_session.bfd_key_id,
1779 our_seq_number=0xFFFFFFFF - 4)
1780 bfd_session_up(self)
1781 for dummy in range(30):
1782 wait_for_bfd_packet(self)
1783 self.test_session.inc_seq_num()
1784 self.test_session.send_packet()
1785 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1787 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1788 def test_send_bad_seq_number(self):
1789 """ session is not kept alive by msgs with bad sequence numbers"""
1790 key = self.factory.create_random_key(
1791 self, BFDAuthType.meticulous_keyed_sha1)
1792 key.add_vpp_config()
1793 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1794 self.pg0.remote_ip4, sha1_key=key)
1795 self.vpp_session.add_vpp_config()
1796 self.test_session = BFDTestSession(
1797 self, self.pg0, AF_INET, sha1_key=key,
1798 bfd_key_id=self.vpp_session.bfd_key_id)
1799 bfd_session_up(self)
1800 detection_time = self.test_session.detect_mult *\
1801 self.vpp_session.required_min_rx / USEC_IN_SEC
1802 send_until = time.time() + 2 * detection_time
1803 while time.time() < send_until:
1804 self.test_session.send_packet()
1805 self.sleep(0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
1806 "time between bfd packets")
1807 e = self.vapi.collect_events()
1808 # session should be down now, because the sequence numbers weren't
1810 self.assert_equal(len(e), 1, "number of bfd events")
1811 verify_event(self, e[0], expected_state=BFDState.down)
1813 def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
1814 legitimate_test_session,
1816 rogue_bfd_values=None):
1817 """ execute a rogue session interaction scenario
1819 1. create vpp session, add config
1820 2. bring the legitimate session up
1821 3. copy the bfd values from legitimate session to rogue session
1822 4. apply rogue_bfd_values to rogue session
1823 5. set rogue session state to down
1824 6. send message to take the session down from the rogue session
1825 7. assert that the legitimate session is unaffected
1828 self.vpp_session = vpp_bfd_udp_session
1829 self.vpp_session.add_vpp_config()
1830 self.test_session = legitimate_test_session
1831 # bring vpp session up
1832 bfd_session_up(self)
1833 # send packet from rogue session
1834 rogue_test_session.update(
1835 my_discriminator=self.test_session.my_discriminator,
1836 your_discriminator=self.test_session.your_discriminator,
1837 desired_min_tx=self.test_session.desired_min_tx,
1838 required_min_rx=self.test_session.required_min_rx,
1839 detect_mult=self.test_session.detect_mult,
1840 diag=self.test_session.diag,
1841 state=self.test_session.state,
1842 auth_type=self.test_session.auth_type)
1843 if rogue_bfd_values:
1844 rogue_test_session.update(**rogue_bfd_values)
1845 rogue_test_session.update(state=BFDState.down)
1846 rogue_test_session.send_packet()
1847 wait_for_bfd_packet(self)
1848 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1850 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1851 def test_mismatch_auth(self):
1852 """ session is not brought down by unauthenticated msg """
1853 key = self.factory.create_random_key(self)
1854 key.add_vpp_config()
1855 vpp_session = VppBFDUDPSession(
1856 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1857 legitimate_test_session = BFDTestSession(
1858 self, self.pg0, AF_INET, sha1_key=key,
1859 bfd_key_id=vpp_session.bfd_key_id)
1860 rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
1861 self.execute_rogue_session_scenario(vpp_session,
1862 legitimate_test_session,
1865 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1866 def test_mismatch_bfd_key_id(self):
1867 """ session is not brought down by msg with non-existent key-id """
1868 key = self.factory.create_random_key(self)
1869 key.add_vpp_config()
1870 vpp_session = VppBFDUDPSession(
1871 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1872 # pick a different random bfd key id
1874 while x == vpp_session.bfd_key_id:
1876 legitimate_test_session = BFDTestSession(
1877 self, self.pg0, AF_INET, sha1_key=key,
1878 bfd_key_id=vpp_session.bfd_key_id)
1879 rogue_test_session = BFDTestSession(
1880 self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
1881 self.execute_rogue_session_scenario(vpp_session,
1882 legitimate_test_session,
1885 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1886 def test_mismatched_auth_type(self):
1887 """ session is not brought down by msg with wrong auth type """
1888 key = self.factory.create_random_key(self)
1889 key.add_vpp_config()
1890 vpp_session = VppBFDUDPSession(
1891 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1892 legitimate_test_session = BFDTestSession(
1893 self, self.pg0, AF_INET, sha1_key=key,
1894 bfd_key_id=vpp_session.bfd_key_id)
1895 rogue_test_session = BFDTestSession(
1896 self, self.pg0, AF_INET, sha1_key=key,
1897 bfd_key_id=vpp_session.bfd_key_id)
1898 self.execute_rogue_session_scenario(
1899 vpp_session, legitimate_test_session, rogue_test_session,
1900 {'auth_type': BFDAuthType.keyed_md5})
1902 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1903 def test_restart(self):
1904 """ simulate remote peer restart and resynchronization """
1905 key = self.factory.create_random_key(
1906 self, BFDAuthType.meticulous_keyed_sha1)
1907 key.add_vpp_config()
1908 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1909 self.pg0.remote_ip4, sha1_key=key)
1910 self.vpp_session.add_vpp_config()
1911 self.test_session = BFDTestSession(
1912 self, self.pg0, AF_INET, sha1_key=key,
1913 bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
1914 bfd_session_up(self)
1915 # don't send any packets for 2*detection_time
1916 detection_time = self.test_session.detect_mult *\
1917 self.vpp_session.required_min_rx / USEC_IN_SEC
1918 self.sleep(2 * detection_time, "simulating peer restart")
1919 events = self.vapi.collect_events()
1920 self.assert_equal(len(events), 1, "number of bfd events")
1921 verify_event(self, events[0], expected_state=BFDState.down)
1922 self.test_session.update(state=BFDState.down)
1923 # reset sequence number
1924 self.test_session.our_seq_number = 0
1925 self.test_session.vpp_seq_number = None
1926 # now throw away any pending packets
1927 self.pg0.enable_capture()
1928 bfd_session_up(self)
1931 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1932 class BFDAuthOnOffTestCase(VppTestCase):
1933 """Bidirectional Forwarding Detection (BFD) (changing auth) """
1940 def setUpClass(cls):
1941 super(BFDAuthOnOffTestCase, cls).setUpClass()
1943 cls.create_pg_interfaces([0])
1944 cls.pg0.config_ip4()
1946 cls.pg0.resolve_arp()
1949 super(BFDAuthOnOffTestCase, cls).tearDownClass()
1953 super(BFDAuthOnOffTestCase, self).setUp()
1954 self.factory = AuthKeyFactory()
1955 self.vapi.want_bfd_events()
1956 self.pg0.enable_capture()
1959 if not self.vpp_dead:
1960 self.vapi.want_bfd_events(enable_disable=0)
1961 self.vapi.collect_events() # clear the event queue
1962 super(BFDAuthOnOffTestCase, self).tearDown()
1964 def test_auth_on_immediate(self):
1965 """ turn auth on without disturbing session state (immediate) """
1966 key = self.factory.create_random_key(self)
1967 key.add_vpp_config()
1968 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1969 self.pg0.remote_ip4)
1970 self.vpp_session.add_vpp_config()
1971 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
1972 bfd_session_up(self)
1973 for dummy in range(self.test_session.detect_mult * 2):
1974 p = wait_for_bfd_packet(self)
1975 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1976 self.test_session.send_packet()
1977 self.vpp_session.activate_auth(key)
1978 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
1979 self.test_session.sha1_key = key
1980 for dummy in range(self.test_session.detect_mult * 2):
1981 p = wait_for_bfd_packet(self)
1982 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1983 self.test_session.send_packet()
1984 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1985 self.assert_equal(len(self.vapi.collect_events()), 0,
1986 "number of bfd events")
1988 def test_auth_off_immediate(self):
1989 """ turn auth off without disturbing session state (immediate) """
1990 key = self.factory.create_random_key(self)
1991 key.add_vpp_config()
1992 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1993 self.pg0.remote_ip4, sha1_key=key)
1994 self.vpp_session.add_vpp_config()
1995 self.test_session = BFDTestSession(
1996 self, self.pg0, AF_INET, sha1_key=key,
1997 bfd_key_id=self.vpp_session.bfd_key_id)
1998 bfd_session_up(self)
1999 # self.vapi.want_bfd_events(enable_disable=0)
2000 for dummy in range(self.test_session.detect_mult * 2):
2001 p = wait_for_bfd_packet(self)
2002 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2003 self.test_session.inc_seq_num()
2004 self.test_session.send_packet()
2005 self.vpp_session.deactivate_auth()
2006 self.test_session.bfd_key_id = None
2007 self.test_session.sha1_key = None
2008 for dummy in range(self.test_session.detect_mult * 2):
2009 p = wait_for_bfd_packet(self)
2010 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2011 self.test_session.inc_seq_num()
2012 self.test_session.send_packet()
2013 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2014 self.assert_equal(len(self.vapi.collect_events()), 0,
2015 "number of bfd events")
2017 def test_auth_change_key_immediate(self):
2018 """ change auth key without disturbing session state (immediate) """
2019 key1 = self.factory.create_random_key(self)
2020 key1.add_vpp_config()
2021 key2 = self.factory.create_random_key(self)
2022 key2.add_vpp_config()
2023 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2024 self.pg0.remote_ip4, sha1_key=key1)
2025 self.vpp_session.add_vpp_config()
2026 self.test_session = BFDTestSession(
2027 self, self.pg0, AF_INET, sha1_key=key1,
2028 bfd_key_id=self.vpp_session.bfd_key_id)
2029 bfd_session_up(self)
2030 for dummy in range(self.test_session.detect_mult * 2):
2031 p = wait_for_bfd_packet(self)
2032 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2033 self.test_session.send_packet()
2034 self.vpp_session.activate_auth(key2)
2035 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2036 self.test_session.sha1_key = key2
2037 for dummy in range(self.test_session.detect_mult * 2):
2038 p = wait_for_bfd_packet(self)
2039 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2040 self.test_session.send_packet()
2041 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2042 self.assert_equal(len(self.vapi.collect_events()), 0,
2043 "number of bfd events")
2045 def test_auth_on_delayed(self):
2046 """ turn auth on without disturbing session state (delayed) """
2047 key = self.factory.create_random_key(self)
2048 key.add_vpp_config()
2049 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2050 self.pg0.remote_ip4)
2051 self.vpp_session.add_vpp_config()
2052 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
2053 bfd_session_up(self)
2054 for dummy in range(self.test_session.detect_mult * 2):
2055 wait_for_bfd_packet(self)
2056 self.test_session.send_packet()
2057 self.vpp_session.activate_auth(key, delayed=True)
2058 for dummy in range(self.test_session.detect_mult * 2):
2059 p = wait_for_bfd_packet(self)
2060 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2061 self.test_session.send_packet()
2062 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2063 self.test_session.sha1_key = key
2064 self.test_session.send_packet()
2065 for dummy in range(self.test_session.detect_mult * 2):
2066 p = wait_for_bfd_packet(self)
2067 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2068 self.test_session.send_packet()
2069 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2070 self.assert_equal(len(self.vapi.collect_events()), 0,
2071 "number of bfd events")
2073 def test_auth_off_delayed(self):
2074 """ turn auth off without disturbing session state (delayed) """
2075 key = self.factory.create_random_key(self)
2076 key.add_vpp_config()
2077 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2078 self.pg0.remote_ip4, sha1_key=key)
2079 self.vpp_session.add_vpp_config()
2080 self.test_session = BFDTestSession(
2081 self, self.pg0, AF_INET, sha1_key=key,
2082 bfd_key_id=self.vpp_session.bfd_key_id)
2083 bfd_session_up(self)
2084 for dummy in range(self.test_session.detect_mult * 2):
2085 p = wait_for_bfd_packet(self)
2086 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2087 self.test_session.send_packet()
2088 self.vpp_session.deactivate_auth(delayed=True)
2089 for dummy in range(self.test_session.detect_mult * 2):
2090 p = wait_for_bfd_packet(self)
2091 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2092 self.test_session.send_packet()
2093 self.test_session.bfd_key_id = None
2094 self.test_session.sha1_key = None
2095 self.test_session.send_packet()
2096 for dummy in range(self.test_session.detect_mult * 2):
2097 p = wait_for_bfd_packet(self)
2098 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2099 self.test_session.send_packet()
2100 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2101 self.assert_equal(len(self.vapi.collect_events()), 0,
2102 "number of bfd events")
2104 def test_auth_change_key_delayed(self):
2105 """ change auth key without disturbing session state (delayed) """
2106 key1 = self.factory.create_random_key(self)
2107 key1.add_vpp_config()
2108 key2 = self.factory.create_random_key(self)
2109 key2.add_vpp_config()
2110 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2111 self.pg0.remote_ip4, sha1_key=key1)
2112 self.vpp_session.add_vpp_config()
2113 self.vpp_session.admin_up()
2114 self.test_session = BFDTestSession(
2115 self, self.pg0, AF_INET, sha1_key=key1,
2116 bfd_key_id=self.vpp_session.bfd_key_id)
2117 bfd_session_up(self)
2118 for dummy in range(self.test_session.detect_mult * 2):
2119 p = wait_for_bfd_packet(self)
2120 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2121 self.test_session.send_packet()
2122 self.vpp_session.activate_auth(key2, delayed=True)
2123 for dummy in range(self.test_session.detect_mult * 2):
2124 p = wait_for_bfd_packet(self)
2125 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2126 self.test_session.send_packet()
2127 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2128 self.test_session.sha1_key = key2
2129 self.test_session.send_packet()
2130 for dummy in range(self.test_session.detect_mult * 2):
2131 p = wait_for_bfd_packet(self)
2132 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2133 self.test_session.send_packet()
2134 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2135 self.assert_equal(len(self.vapi.collect_events()), 0,
2136 "number of bfd events")
2139 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2140 class BFDCLITestCase(VppTestCase):
2141 """Bidirectional Forwarding Detection (BFD) (CLI) """
2145 def setUpClass(cls):
2146 super(BFDCLITestCase, cls).setUpClass()
2149 cls.create_pg_interfaces((0,))
2150 cls.pg0.config_ip4()
2151 cls.pg0.config_ip6()
2152 cls.pg0.resolve_arp()
2153 cls.pg0.resolve_ndp()
2156 super(BFDCLITestCase, cls).tearDownClass()
2160 super(BFDCLITestCase, self).setUp()
2161 self.factory = AuthKeyFactory()
2162 self.pg0.enable_capture()
2166 self.vapi.want_bfd_events(enable_disable=0)
2167 except UnexpectedApiReturnValueError:
2168 # some tests aren't subscribed, so this is not an issue
2170 self.vapi.collect_events() # clear the event queue
2171 super(BFDCLITestCase, self).tearDown()
2173 def cli_verify_no_response(self, cli):
2174 """ execute a CLI, asserting that the response is empty """
2175 self.assert_equal(self.vapi.cli(cli),
2177 "CLI command response")
2179 def cli_verify_response(self, cli, expected):
2180 """ execute a CLI, asserting that the response matches expectation """
2181 self.assert_equal(self.vapi.cli(cli).strip(),
2183 "CLI command response")
2185 def test_show(self):
2186 """ show commands """
2187 k1 = self.factory.create_random_key(self)
2189 k2 = self.factory.create_random_key(
2190 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2192 s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2194 s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2197 self.logger.info(self.vapi.ppcli("show bfd keys"))
2198 self.logger.info(self.vapi.ppcli("show bfd sessions"))
2199 self.logger.info(self.vapi.ppcli("show bfd"))
2201 def test_set_del_sha1_key(self):
2202 """ set/delete SHA1 auth key """
2203 k = self.factory.create_random_key(self)
2204 self.registry.register(k, self.logger)
2205 self.cli_verify_no_response(
2206 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2208 "".join("{:02x}".format(ord(c)) for c in k.key)))
2209 self.assertTrue(k.query_vpp_config())
2210 self.vpp_session = VppBFDUDPSession(
2211 self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
2212 self.vpp_session.add_vpp_config()
2213 self.test_session = \
2214 BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
2215 bfd_key_id=self.vpp_session.bfd_key_id)
2216 self.vapi.want_bfd_events()
2217 bfd_session_up(self)
2218 bfd_session_down(self)
2219 # try to replace the secret for the key - should fail because the key
2221 k2 = self.factory.create_random_key(self)
2222 self.cli_verify_response(
2223 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2225 "".join("{:02x}".format(ord(c)) for c in k2.key)),
2226 "bfd key set: `bfd_auth_set_key' API call failed, "
2227 "rv=-103:BFD object in use")
2228 # manipulating the session using old secret should still work
2229 bfd_session_up(self)
2230 bfd_session_down(self)
2231 self.vpp_session.remove_vpp_config()
2232 self.cli_verify_no_response(
2233 "bfd key del conf-key-id %s" % k.conf_key_id)
2234 self.assertFalse(k.query_vpp_config())
2236 def test_set_del_meticulous_sha1_key(self):
2237 """ set/delete meticulous SHA1 auth key """
2238 k = self.factory.create_random_key(
2239 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2240 self.registry.register(k, self.logger)
2241 self.cli_verify_no_response(
2242 "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
2244 "".join("{:02x}".format(ord(c)) for c in k.key)))
2245 self.assertTrue(k.query_vpp_config())
2246 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2247 self.pg0.remote_ip6, af=AF_INET6,
2249 self.vpp_session.add_vpp_config()
2250 self.vpp_session.admin_up()
2251 self.test_session = \
2252 BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
2253 bfd_key_id=self.vpp_session.bfd_key_id)
2254 self.vapi.want_bfd_events()
2255 bfd_session_up(self)
2256 bfd_session_down(self)
2257 # try to replace the secret for the key - should fail because the key
2259 k2 = self.factory.create_random_key(self)
2260 self.cli_verify_response(
2261 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2263 "".join("{:02x}".format(ord(c)) for c in k2.key)),
2264 "bfd key set: `bfd_auth_set_key' API call failed, "
2265 "rv=-103:BFD object in use")
2266 # manipulating the session using old secret should still work
2267 bfd_session_up(self)
2268 bfd_session_down(self)
2269 self.vpp_session.remove_vpp_config()
2270 self.cli_verify_no_response(
2271 "bfd key del conf-key-id %s" % k.conf_key_id)
2272 self.assertFalse(k.query_vpp_config())
2274 def test_add_mod_del_bfd_udp(self):
2275 """ create/modify/delete IPv4 BFD UDP session """
2276 vpp_session = VppBFDUDPSession(
2277 self, self.pg0, self.pg0.remote_ip4)
2278 self.registry.register(vpp_session, self.logger)
2279 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2280 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2281 "detect-mult %s" % (self.pg0.name, self.pg0.local_ip4,
2282 self.pg0.remote_ip4,
2283 vpp_session.desired_min_tx,
2284 vpp_session.required_min_rx,
2285 vpp_session.detect_mult)
2286 self.cli_verify_no_response(cli_add_cmd)
2287 # 2nd add should fail
2288 self.cli_verify_response(
2290 "bfd udp session add: `bfd_add_add_session' API call"
2291 " failed, rv=-101:Duplicate BFD object")
2292 verify_bfd_session_config(self, vpp_session)
2293 mod_session = VppBFDUDPSession(
2294 self, self.pg0, self.pg0.remote_ip4,
2295 required_min_rx=2 * vpp_session.required_min_rx,
2296 desired_min_tx=3 * vpp_session.desired_min_tx,
2297 detect_mult=4 * vpp_session.detect_mult)
2298 self.cli_verify_no_response(
2299 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2300 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2301 (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2302 mod_session.desired_min_tx, mod_session.required_min_rx,
2303 mod_session.detect_mult))
2304 verify_bfd_session_config(self, mod_session)
2305 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2306 "peer-addr %s" % (self.pg0.name,
2307 self.pg0.local_ip4, self.pg0.remote_ip4)
2308 self.cli_verify_no_response(cli_del_cmd)
2309 # 2nd del is expected to fail
2310 self.cli_verify_response(
2311 cli_del_cmd, "bfd udp session del: `bfd_udp_del_session' API call"
2312 " failed, rv=-102:No such BFD object")
2313 self.assertFalse(vpp_session.query_vpp_config())
2315 def test_add_mod_del_bfd_udp6(self):
2316 """ create/modify/delete IPv6 BFD UDP session """
2317 vpp_session = VppBFDUDPSession(
2318 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
2319 self.registry.register(vpp_session, self.logger)
2320 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2321 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2322 "detect-mult %s" % (self.pg0.name, self.pg0.local_ip6,
2323 self.pg0.remote_ip6,
2324 vpp_session.desired_min_tx,
2325 vpp_session.required_min_rx,
2326 vpp_session.detect_mult)
2327 self.cli_verify_no_response(cli_add_cmd)
2328 # 2nd add should fail
2329 self.cli_verify_response(
2331 "bfd udp session add: `bfd_add_add_session' API call"
2332 " failed, rv=-101:Duplicate BFD object")
2333 verify_bfd_session_config(self, vpp_session)
2334 mod_session = VppBFDUDPSession(
2335 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2336 required_min_rx=2 * vpp_session.required_min_rx,
2337 desired_min_tx=3 * vpp_session.desired_min_tx,
2338 detect_mult=4 * vpp_session.detect_mult)
2339 self.cli_verify_no_response(
2340 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2341 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2342 (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2343 mod_session.desired_min_tx,
2344 mod_session.required_min_rx, mod_session.detect_mult))
2345 verify_bfd_session_config(self, mod_session)
2346 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2347 "peer-addr %s" % (self.pg0.name,
2348 self.pg0.local_ip6, self.pg0.remote_ip6)
2349 self.cli_verify_no_response(cli_del_cmd)
2350 # 2nd del is expected to fail
2351 self.cli_verify_response(
2353 "bfd udp session del: `bfd_udp_del_session' API call"
2354 " failed, rv=-102:No such BFD object")
2355 self.assertFalse(vpp_session.query_vpp_config())
2357 def test_add_mod_del_bfd_udp_auth(self):
2358 """ create/modify/delete IPv4 BFD UDP session (authenticated) """
2359 key = self.factory.create_random_key(self)
2360 key.add_vpp_config()
2361 vpp_session = VppBFDUDPSession(
2362 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2363 self.registry.register(vpp_session, self.logger)
2364 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2365 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2366 "detect-mult %s conf-key-id %s bfd-key-id %s"\
2367 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2368 vpp_session.desired_min_tx, vpp_session.required_min_rx,
2369 vpp_session.detect_mult, key.conf_key_id,
2370 vpp_session.bfd_key_id)
2371 self.cli_verify_no_response(cli_add_cmd)
2372 # 2nd add should fail
2373 self.cli_verify_response(
2375 "bfd udp session add: `bfd_add_add_session' API call"
2376 " failed, rv=-101:Duplicate BFD object")
2377 verify_bfd_session_config(self, vpp_session)
2378 mod_session = VppBFDUDPSession(
2379 self, self.pg0, self.pg0.remote_ip4, sha1_key=key,
2380 bfd_key_id=vpp_session.bfd_key_id,
2381 required_min_rx=2 * vpp_session.required_min_rx,
2382 desired_min_tx=3 * vpp_session.desired_min_tx,
2383 detect_mult=4 * vpp_session.detect_mult)
2384 self.cli_verify_no_response(
2385 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2386 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2387 (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2388 mod_session.desired_min_tx,
2389 mod_session.required_min_rx, mod_session.detect_mult))
2390 verify_bfd_session_config(self, mod_session)
2391 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2392 "peer-addr %s" % (self.pg0.name,
2393 self.pg0.local_ip4, self.pg0.remote_ip4)
2394 self.cli_verify_no_response(cli_del_cmd)
2395 # 2nd del is expected to fail
2396 self.cli_verify_response(
2398 "bfd udp session del: `bfd_udp_del_session' API call"
2399 " failed, rv=-102:No such BFD object")
2400 self.assertFalse(vpp_session.query_vpp_config())
2402 def test_add_mod_del_bfd_udp6_auth(self):
2403 """ create/modify/delete IPv6 BFD UDP session (authenticated) """
2404 key = self.factory.create_random_key(
2405 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2406 key.add_vpp_config()
2407 vpp_session = VppBFDUDPSession(
2408 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key)
2409 self.registry.register(vpp_session, self.logger)
2410 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2411 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2412 "detect-mult %s conf-key-id %s bfd-key-id %s" \
2413 % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2414 vpp_session.desired_min_tx, vpp_session.required_min_rx,
2415 vpp_session.detect_mult, key.conf_key_id,
2416 vpp_session.bfd_key_id)
2417 self.cli_verify_no_response(cli_add_cmd)
2418 # 2nd add should fail
2419 self.cli_verify_response(
2421 "bfd udp session add: `bfd_add_add_session' API call"
2422 " failed, rv=-101:Duplicate BFD object")
2423 verify_bfd_session_config(self, vpp_session)
2424 mod_session = VppBFDUDPSession(
2425 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key,
2426 bfd_key_id=vpp_session.bfd_key_id,
2427 required_min_rx=2 * vpp_session.required_min_rx,
2428 desired_min_tx=3 * vpp_session.desired_min_tx,
2429 detect_mult=4 * vpp_session.detect_mult)
2430 self.cli_verify_no_response(
2431 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2432 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2433 (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2434 mod_session.desired_min_tx,
2435 mod_session.required_min_rx, mod_session.detect_mult))
2436 verify_bfd_session_config(self, mod_session)
2437 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2438 "peer-addr %s" % (self.pg0.name,
2439 self.pg0.local_ip6, self.pg0.remote_ip6)
2440 self.cli_verify_no_response(cli_del_cmd)
2441 # 2nd del is expected to fail
2442 self.cli_verify_response(
2444 "bfd udp session del: `bfd_udp_del_session' API call"
2445 " failed, rv=-102:No such BFD object")
2446 self.assertFalse(vpp_session.query_vpp_config())
2448 def test_auth_on_off(self):
2449 """ turn authentication on and off """
2450 key = self.factory.create_random_key(
2451 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2452 key.add_vpp_config()
2453 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2454 auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2456 session.add_vpp_config()
2458 "bfd udp session auth activate interface %s local-addr %s "\
2459 "peer-addr %s conf-key-id %s bfd-key-id %s"\
2460 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2461 key.conf_key_id, auth_session.bfd_key_id)
2462 self.cli_verify_no_response(cli_activate)
2463 verify_bfd_session_config(self, auth_session)
2464 self.cli_verify_no_response(cli_activate)
2465 verify_bfd_session_config(self, auth_session)
2467 "bfd udp session auth deactivate interface %s local-addr %s "\
2469 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2470 self.cli_verify_no_response(cli_deactivate)
2471 verify_bfd_session_config(self, session)
2472 self.cli_verify_no_response(cli_deactivate)
2473 verify_bfd_session_config(self, session)
2475 def test_auth_on_off_delayed(self):
2476 """ turn authentication on and off (delayed) """
2477 key = self.factory.create_random_key(
2478 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2479 key.add_vpp_config()
2480 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2481 auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2483 session.add_vpp_config()
2485 "bfd udp session auth activate interface %s local-addr %s "\
2486 "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"\
2487 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2488 key.conf_key_id, auth_session.bfd_key_id)
2489 self.cli_verify_no_response(cli_activate)
2490 verify_bfd_session_config(self, auth_session)
2491 self.cli_verify_no_response(cli_activate)
2492 verify_bfd_session_config(self, auth_session)
2494 "bfd udp session auth deactivate interface %s local-addr %s "\
2495 "peer-addr %s delayed yes"\
2496 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2497 self.cli_verify_no_response(cli_deactivate)
2498 verify_bfd_session_config(self, session)
2499 self.cli_verify_no_response(cli_deactivate)
2500 verify_bfd_session_config(self, session)
2502 def test_admin_up_down(self):
2503 """ put session admin-up and admin-down """
2504 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2505 session.add_vpp_config()
2507 "bfd udp session set-flags admin down interface %s local-addr %s "\
2509 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2511 "bfd udp session set-flags admin up interface %s local-addr %s "\
2513 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2514 self.cli_verify_no_response(cli_down)
2515 verify_bfd_session_config(self, session, state=BFDState.admin_down)
2516 self.cli_verify_no_response(cli_up)
2517 verify_bfd_session_config(self, session, state=BFDState.down)
2519 def test_set_del_udp_echo_source(self):
2520 """ set/del udp echo source """
2521 self.create_loopback_interfaces([0])
2522 self.loopback0 = self.lo_interfaces[0]
2523 self.loopback0.admin_up()
2524 self.cli_verify_response("show bfd echo-source",
2525 "UDP echo source is not set.")
2526 cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
2527 self.cli_verify_no_response(cli_set)
2528 self.cli_verify_response("show bfd echo-source",
2529 "UDP echo source is: %s\n"
2530 "IPv4 address usable as echo source: none\n"
2531 "IPv6 address usable as echo source: none" %
2532 self.loopback0.name)
2533 self.loopback0.config_ip4()
2534 unpacked = unpack("!L", self.loopback0.local_ip4n)
2535 echo_ip4 = inet_ntop(AF_INET, pack("!L", unpacked[0] ^ 1))
2536 self.cli_verify_response("show bfd echo-source",
2537 "UDP echo source is: %s\n"
2538 "IPv4 address usable as echo source: %s\n"
2539 "IPv6 address usable as echo source: none" %
2540 (self.loopback0.name, echo_ip4))
2541 unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
2542 echo_ip6 = inet_ntop(AF_INET6, pack("!LLLL", unpacked[0], unpacked[1],
2543 unpacked[2], unpacked[3] ^ 1))
2544 self.loopback0.config_ip6()
2545 self.cli_verify_response("show bfd echo-source",
2546 "UDP echo source is: %s\n"
2547 "IPv4 address usable as echo source: %s\n"
2548 "IPv6 address usable as echo source: %s" %
2549 (self.loopback0.name, echo_ip4, echo_ip6))
2550 cli_del = "bfd udp echo-source del"
2551 self.cli_verify_no_response(cli_del)
2552 self.cli_verify_response("show bfd echo-source",
2553 "UDP echo source is not set.")
2555 if __name__ == '__main__':
2556 unittest.main(testRunner=VppTestRunner)