4 from __future__ import division
9 from struct import pack, unpack
10 from random import randint, shuffle, getrandbits
11 from socket import AF_INET, AF_INET6, inet_ntop
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether
14 from scapy.layers.inet import UDP, IP
15 from scapy.layers.inet6 import IPv6
16 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
17 BFDDiagCode, BFDState, BFD_vpp_echo
18 from framework import VppTestCase, VppTestRunner, running_extended_tests
19 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
20 from vpp_lo_interface import VppLoInterface
22 from vpp_papi_provider import UnexpectedApiReturnValueError
23 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
28 class AuthKeyFactory(object):
29 """Factory class for creating auth keys with unique conf key ID"""
32 self._conf_key_ids = {}
34 def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
35 """ create a random key with unique conf key id """
36 conf_key_id = randint(0, 0xFFFFFFFF)
37 while conf_key_id in self._conf_key_ids:
38 conf_key_id = randint(0, 0xFFFFFFFF)
39 self._conf_key_ids[conf_key_id] = 1
40 key = str(bytearray([randint(0, 255) for _ in range(randint(1, 20))]))
41 return VppBFDAuthKey(test=test, auth_type=auth_type,
42 conf_key_id=conf_key_id, key=key)
45 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
46 class BFDAPITestCase(VppTestCase):
47 """Bidirectional Forwarding Detection (BFD) - API"""
54 super(BFDAPITestCase, cls).setUpClass()
55 cls.vapi.cli("set log class bfd level debug")
57 cls.create_pg_interfaces(range(2))
58 for i in cls.pg_interfaces:
64 super(BFDAPITestCase, cls).tearDownClass()
68 super(BFDAPITestCase, self).setUp()
69 self.factory = AuthKeyFactory()
71 def test_add_bfd(self):
72 """ create a BFD session """
73 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
74 session.add_vpp_config()
75 self.logger.debug("Session state is %s", session.state)
76 session.remove_vpp_config()
77 session.add_vpp_config()
78 self.logger.debug("Session state is %s", session.state)
79 session.remove_vpp_config()
81 def test_double_add(self):
82 """ create the same BFD session twice (negative case) """
83 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
84 session.add_vpp_config()
86 with self.vapi.expect_negative_api_retval():
87 session.add_vpp_config()
89 session.remove_vpp_config()
91 def test_add_bfd6(self):
92 """ create IPv6 BFD session """
93 session = VppBFDUDPSession(
94 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
95 session.add_vpp_config()
96 self.logger.debug("Session state is %s", session.state)
97 session.remove_vpp_config()
98 session.add_vpp_config()
99 self.logger.debug("Session state is %s", session.state)
100 session.remove_vpp_config()
102 def test_mod_bfd(self):
103 """ modify BFD session parameters """
104 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
105 desired_min_tx=50000,
106 required_min_rx=10000,
108 session.add_vpp_config()
109 s = session.get_bfd_udp_session_dump_entry()
110 self.assert_equal(session.desired_min_tx,
112 "desired min transmit interval")
113 self.assert_equal(session.required_min_rx,
115 "required min receive interval")
116 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
117 session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
118 required_min_rx=session.required_min_rx * 2,
119 detect_mult=session.detect_mult * 2)
120 s = session.get_bfd_udp_session_dump_entry()
121 self.assert_equal(session.desired_min_tx,
123 "desired min transmit interval")
124 self.assert_equal(session.required_min_rx,
126 "required min receive interval")
127 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
129 def test_add_sha1_keys(self):
130 """ add SHA1 keys """
132 keys = [self.factory.create_random_key(
133 self) for i in range(0, key_count)]
135 self.assertFalse(key.query_vpp_config())
139 self.assertTrue(key.query_vpp_config())
141 indexes = range(key_count)
146 key.remove_vpp_config()
148 for j in range(key_count):
151 self.assertFalse(key.query_vpp_config())
153 self.assertTrue(key.query_vpp_config())
154 # should be removed now
156 self.assertFalse(key.query_vpp_config())
157 # add back and remove again
161 self.assertTrue(key.query_vpp_config())
163 key.remove_vpp_config()
165 self.assertFalse(key.query_vpp_config())
167 def test_add_bfd_sha1(self):
168 """ create a BFD session (SHA1) """
169 key = self.factory.create_random_key(self)
171 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
173 session.add_vpp_config()
174 self.logger.debug("Session state is %s", session.state)
175 session.remove_vpp_config()
176 session.add_vpp_config()
177 self.logger.debug("Session state is %s", session.state)
178 session.remove_vpp_config()
180 def test_double_add_sha1(self):
181 """ create the same BFD session twice (negative case) (SHA1) """
182 key = self.factory.create_random_key(self)
184 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
186 session.add_vpp_config()
187 with self.assertRaises(Exception):
188 session.add_vpp_config()
190 def test_add_auth_nonexistent_key(self):
191 """ create BFD session using non-existent SHA1 (negative case) """
192 session = VppBFDUDPSession(
193 self, self.pg0, self.pg0.remote_ip4,
194 sha1_key=self.factory.create_random_key(self))
195 with self.assertRaises(Exception):
196 session.add_vpp_config()
198 def test_shared_sha1_key(self):
199 """ share single SHA1 key between multiple BFD sessions """
200 key = self.factory.create_random_key(self)
203 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
205 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
206 sha1_key=key, af=AF_INET6),
207 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
209 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
210 sha1_key=key, af=AF_INET6)]
215 e = key.get_bfd_auth_keys_dump_entry()
216 self.assert_equal(e.use_count, len(sessions) - removed,
217 "Use count for shared key")
218 s.remove_vpp_config()
220 e = key.get_bfd_auth_keys_dump_entry()
221 self.assert_equal(e.use_count, len(sessions) - removed,
222 "Use count for shared key")
224 def test_activate_auth(self):
225 """ activate SHA1 authentication """
226 key = self.factory.create_random_key(self)
228 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
229 session.add_vpp_config()
230 session.activate_auth(key)
232 def test_deactivate_auth(self):
233 """ deactivate SHA1 authentication """
234 key = self.factory.create_random_key(self)
236 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
237 session.add_vpp_config()
238 session.activate_auth(key)
239 session.deactivate_auth()
241 def test_change_key(self):
242 """ change SHA1 key """
243 key1 = self.factory.create_random_key(self)
244 key2 = self.factory.create_random_key(self)
245 while key2.conf_key_id == key1.conf_key_id:
246 key2 = self.factory.create_random_key(self)
247 key1.add_vpp_config()
248 key2.add_vpp_config()
249 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
251 session.add_vpp_config()
252 session.activate_auth(key2)
255 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
256 class BFDTestSession(object):
257 """ BFD session as seen from test framework side """
259 def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
260 bfd_key_id=None, our_seq_number=None):
263 self.sha1_key = sha1_key
264 self.bfd_key_id = bfd_key_id
265 self.interface = interface
266 self.udp_sport = randint(49152, 65535)
267 if our_seq_number is None:
268 self.our_seq_number = randint(0, 40000000)
270 self.our_seq_number = our_seq_number
271 self.vpp_seq_number = None
272 self.my_discriminator = 0
273 self.desired_min_tx = 300000
274 self.required_min_rx = 300000
275 self.required_min_echo_rx = None
276 self.detect_mult = detect_mult
277 self.diag = BFDDiagCode.no_diagnostic
278 self.your_discriminator = None
279 self.state = BFDState.down
280 self.auth_type = BFDAuthType.no_auth
282 def inc_seq_num(self):
283 """ increment sequence number, wrapping if needed """
284 if self.our_seq_number == 0xFFFFFFFF:
285 self.our_seq_number = 0
287 self.our_seq_number += 1
289 def update(self, my_discriminator=None, your_discriminator=None,
290 desired_min_tx=None, required_min_rx=None,
291 required_min_echo_rx=None, detect_mult=None,
292 diag=None, state=None, auth_type=None):
293 """ update BFD parameters associated with session """
294 if my_discriminator is not None:
295 self.my_discriminator = my_discriminator
296 if your_discriminator is not None:
297 self.your_discriminator = your_discriminator
298 if required_min_rx is not None:
299 self.required_min_rx = required_min_rx
300 if required_min_echo_rx is not None:
301 self.required_min_echo_rx = required_min_echo_rx
302 if desired_min_tx is not None:
303 self.desired_min_tx = desired_min_tx
304 if detect_mult is not None:
305 self.detect_mult = detect_mult
308 if state is not None:
310 if auth_type is not None:
311 self.auth_type = auth_type
313 def fill_packet_fields(self, packet):
314 """ set packet fields with known values in packet """
316 if self.my_discriminator:
317 self.test.logger.debug("BFD: setting packet.my_discriminator=%s",
318 self.my_discriminator)
319 bfd.my_discriminator = self.my_discriminator
320 if self.your_discriminator:
321 self.test.logger.debug("BFD: setting packet.your_discriminator=%s",
322 self.your_discriminator)
323 bfd.your_discriminator = self.your_discriminator
324 if self.required_min_rx:
325 self.test.logger.debug(
326 "BFD: setting packet.required_min_rx_interval=%s",
327 self.required_min_rx)
328 bfd.required_min_rx_interval = self.required_min_rx
329 if self.required_min_echo_rx:
330 self.test.logger.debug(
331 "BFD: setting packet.required_min_echo_rx=%s",
332 self.required_min_echo_rx)
333 bfd.required_min_echo_rx_interval = self.required_min_echo_rx
334 if self.desired_min_tx:
335 self.test.logger.debug(
336 "BFD: setting packet.desired_min_tx_interval=%s",
338 bfd.desired_min_tx_interval = self.desired_min_tx
340 self.test.logger.debug(
341 "BFD: setting packet.detect_mult=%s", self.detect_mult)
342 bfd.detect_mult = self.detect_mult
344 self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
347 self.test.logger.debug("BFD: setting packet.state=%s", self.state)
348 bfd.state = self.state
350 # this is used by a negative test-case
351 self.test.logger.debug("BFD: setting packet.auth_type=%s",
353 bfd.auth_type = self.auth_type
355 def create_packet(self):
356 """ create a BFD packet, reflecting the current state of session """
359 bfd.auth_type = self.sha1_key.auth_type
360 bfd.auth_len = BFD.sha1_auth_len
361 bfd.auth_key_id = self.bfd_key_id
362 bfd.auth_seq_num = self.our_seq_number
363 bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
366 if self.af == AF_INET6:
367 packet = (Ether(src=self.interface.remote_mac,
368 dst=self.interface.local_mac) /
369 IPv6(src=self.interface.remote_ip6,
370 dst=self.interface.local_ip6,
372 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
375 packet = (Ether(src=self.interface.remote_mac,
376 dst=self.interface.local_mac) /
377 IP(src=self.interface.remote_ip4,
378 dst=self.interface.local_ip4,
380 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
382 self.test.logger.debug("BFD: Creating packet")
383 self.fill_packet_fields(packet)
385 hash_material = str(packet[BFD])[:32] + self.sha1_key.key + \
386 "\0" * (20 - len(self.sha1_key.key))
387 self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
388 hashlib.sha1(hash_material).hexdigest())
389 packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
392 def send_packet(self, packet=None, interface=None):
393 """ send packet on interface, creating the packet if needed """
395 packet = self.create_packet()
396 if interface is None:
397 interface = self.test.pg0
398 self.test.logger.debug(ppp("Sending packet:", packet))
399 interface.add_stream(packet)
402 def verify_sha1_auth(self, packet):
403 """ Verify correctness of authentication in BFD layer. """
405 self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
406 self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
408 self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
409 self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
410 if self.vpp_seq_number is None:
411 self.vpp_seq_number = bfd.auth_seq_num
412 self.test.logger.debug("Received initial sequence number: %s" %
415 recvd_seq_num = bfd.auth_seq_num
416 self.test.logger.debug("Received followup sequence number: %s" %
418 if self.vpp_seq_number < 0xffffffff:
419 if self.sha1_key.auth_type == \
420 BFDAuthType.meticulous_keyed_sha1:
421 self.test.assert_equal(recvd_seq_num,
422 self.vpp_seq_number + 1,
423 "BFD sequence number")
425 self.test.assert_in_range(recvd_seq_num,
427 self.vpp_seq_number + 1,
428 "BFD sequence number")
430 if self.sha1_key.auth_type == \
431 BFDAuthType.meticulous_keyed_sha1:
432 self.test.assert_equal(recvd_seq_num, 0,
433 "BFD sequence number")
435 self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
436 "BFD sequence number not one of "
437 "(%s, 0)" % self.vpp_seq_number)
438 self.vpp_seq_number = recvd_seq_num
439 # last 20 bytes represent the hash - so replace them with the key,
440 # pad the result with zeros and hash the result
441 hash_material = bfd.original[:-20] + self.sha1_key.key + \
442 "\0" * (20 - len(self.sha1_key.key))
443 expected_hash = hashlib.sha1(hash_material).hexdigest()
444 self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
445 expected_hash, "Auth key hash")
447 def verify_bfd(self, packet):
448 """ Verify correctness of BFD layer. """
450 self.test.assert_equal(bfd.version, 1, "BFD version")
451 self.test.assert_equal(bfd.your_discriminator,
452 self.my_discriminator,
453 "BFD - your discriminator")
455 self.verify_sha1_auth(packet)
458 def bfd_session_up(test):
459 """ Bring BFD session up """
460 test.logger.info("BFD: Waiting for slow hello")
461 p = wait_for_bfd_packet(test, 2)
463 if hasattr(test, 'vpp_clock_offset'):
464 old_offset = test.vpp_clock_offset
465 test.vpp_clock_offset = time.time() - p.time
466 test.logger.debug("BFD: Calculated vpp clock offset: %s",
467 test.vpp_clock_offset)
469 test.assertAlmostEqual(
470 old_offset, test.vpp_clock_offset, delta=0.5,
471 msg="vpp clock offset not stable (new: %s, old: %s)" %
472 (test.vpp_clock_offset, old_offset))
473 test.logger.info("BFD: Sending Init")
474 test.test_session.update(my_discriminator=randint(0, 40000000),
475 your_discriminator=p[BFD].my_discriminator,
477 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
478 BFDAuthType.meticulous_keyed_sha1:
479 test.test_session.inc_seq_num()
480 test.test_session.send_packet()
481 test.logger.info("BFD: Waiting for event")
482 e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
483 verify_event(test, e, expected_state=BFDState.up)
484 test.logger.info("BFD: Session is Up")
485 test.test_session.update(state=BFDState.up)
486 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
487 BFDAuthType.meticulous_keyed_sha1:
488 test.test_session.inc_seq_num()
489 test.test_session.send_packet()
490 test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
493 def bfd_session_down(test):
494 """ Bring BFD session down """
495 test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
496 test.test_session.update(state=BFDState.down)
497 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
498 BFDAuthType.meticulous_keyed_sha1:
499 test.test_session.inc_seq_num()
500 test.test_session.send_packet()
501 test.logger.info("BFD: Waiting for event")
502 e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
503 verify_event(test, e, expected_state=BFDState.down)
504 test.logger.info("BFD: Session is Down")
505 test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
508 def verify_bfd_session_config(test, session, state=None):
509 dump = session.get_bfd_udp_session_dump_entry()
510 test.assertIsNotNone(dump)
511 # since dump is not none, we have verified that sw_if_index and addresses
512 # are valid (in get_bfd_udp_session_dump_entry)
514 test.assert_equal(dump.state, state, "session state")
515 test.assert_equal(dump.required_min_rx, session.required_min_rx,
516 "required min rx interval")
517 test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
518 "desired min tx interval")
519 test.assert_equal(dump.detect_mult, session.detect_mult,
521 if session.sha1_key is None:
522 test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
524 test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
525 test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
527 test.assert_equal(dump.conf_key_id,
528 session.sha1_key.conf_key_id,
532 def verify_ip(test, packet):
533 """ Verify correctness of IP layer. """
534 if test.vpp_session.af == AF_INET6:
536 local_ip = test.pg0.local_ip6
537 remote_ip = test.pg0.remote_ip6
538 test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
541 local_ip = test.pg0.local_ip4
542 remote_ip = test.pg0.remote_ip4
543 test.assert_equal(ip.ttl, 255, "IPv4 TTL")
544 test.assert_equal(ip.src, local_ip, "IP source address")
545 test.assert_equal(ip.dst, remote_ip, "IP destination address")
548 def verify_udp(test, packet):
549 """ Verify correctness of UDP layer. """
551 test.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
552 test.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
556 def verify_event(test, event, expected_state):
557 """ Verify correctness of event values. """
559 test.logger.debug("BFD: Event: %s" % repr(e))
560 test.assert_equal(e.sw_if_index,
561 test.vpp_session.interface.sw_if_index,
562 "BFD interface index")
564 if test.vpp_session.af == AF_INET6:
566 test.assert_equal(e.is_ipv6, is_ipv6, "is_ipv6")
567 if test.vpp_session.af == AF_INET:
568 test.assert_equal(e.local_addr[:4], test.vpp_session.local_addr_n,
569 "Local IPv4 address")
570 test.assert_equal(e.peer_addr[:4], test.vpp_session.peer_addr_n,
573 test.assert_equal(e.local_addr, test.vpp_session.local_addr_n,
574 "Local IPv6 address")
575 test.assert_equal(e.peer_addr, test.vpp_session.peer_addr_n,
577 test.assert_equal(e.state, expected_state, BFDState)
580 def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None):
581 """ wait for BFD packet and verify its correctness
583 :param timeout: how long to wait
584 :param pcap_time_min: ignore packets with pcap timestamp lower than this
586 :returns: tuple (packet, time spent waiting for packet)
588 test.logger.info("BFD: Waiting for BFD packet")
589 deadline = time.time() + timeout
594 test.assert_in_range(counter, 0, 100, "number of packets ignored")
595 time_left = deadline - time.time()
597 raise CaptureTimeoutError("Packet did not arrive within timeout")
598 p = test.pg0.wait_for_packet(timeout=time_left)
599 test.logger.debug(ppp("BFD: Got packet:", p))
600 if pcap_time_min is not None and p.time < pcap_time_min:
601 test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
602 "pcap time min %s):" %
603 (p.time, pcap_time_min), p))
608 raise Exception(ppp("Unexpected or invalid BFD packet:", p))
610 raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
613 test.test_session.verify_bfd(p)
617 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
618 class BFD4TestCase(VppTestCase):
619 """Bidirectional Forwarding Detection (BFD)"""
622 vpp_clock_offset = None
628 super(BFD4TestCase, cls).setUpClass()
629 cls.vapi.cli("set log class bfd level debug")
631 cls.create_pg_interfaces([0])
632 cls.create_loopback_interfaces([0])
633 cls.loopback0 = cls.lo_interfaces[0]
634 cls.loopback0.config_ip4()
635 cls.loopback0.admin_up()
637 cls.pg0.configure_ipv4_neighbors()
639 cls.pg0.resolve_arp()
642 super(BFD4TestCase, cls).tearDownClass()
646 super(BFD4TestCase, self).setUp()
647 self.factory = AuthKeyFactory()
648 self.vapi.want_bfd_events()
649 self.pg0.enable_capture()
651 self.vpp_session = VppBFDUDPSession(self, self.pg0,
653 self.vpp_session.add_vpp_config()
654 self.vpp_session.admin_up()
655 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
657 self.vapi.want_bfd_events(enable_disable=0)
661 if not self.vpp_dead:
662 self.vapi.want_bfd_events(enable_disable=0)
663 self.vapi.collect_events() # clear the event queue
664 super(BFD4TestCase, self).tearDown()
666 def test_session_up(self):
667 """ bring BFD session up """
670 def test_session_up_by_ip(self):
671 """ bring BFD session up - first frame looked up by address pair """
672 self.logger.info("BFD: Sending Slow control frame")
673 self.test_session.update(my_discriminator=randint(0, 40000000))
674 self.test_session.send_packet()
675 self.pg0.enable_capture()
676 p = self.pg0.wait_for_packet(1)
677 self.assert_equal(p[BFD].your_discriminator,
678 self.test_session.my_discriminator,
679 "BFD - your discriminator")
680 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
681 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
683 self.logger.info("BFD: Waiting for event")
684 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
685 verify_event(self, e, expected_state=BFDState.init)
686 self.logger.info("BFD: Sending Up")
687 self.test_session.send_packet()
688 self.logger.info("BFD: Waiting for event")
689 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
690 verify_event(self, e, expected_state=BFDState.up)
691 self.logger.info("BFD: Session is Up")
692 self.test_session.update(state=BFDState.up)
693 self.test_session.send_packet()
694 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
696 def test_session_down(self):
697 """ bring BFD session down """
699 bfd_session_down(self)
701 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
702 def test_hold_up(self):
703 """ hold BFD session up """
705 for dummy in range(self.test_session.detect_mult * 2):
706 wait_for_bfd_packet(self)
707 self.test_session.send_packet()
708 self.assert_equal(len(self.vapi.collect_events()), 0,
709 "number of bfd events")
711 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
712 def test_slow_timer(self):
713 """ verify slow periodic control frames while session down """
715 self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
716 prev_packet = wait_for_bfd_packet(self, 2)
717 for dummy in range(packet_count):
718 next_packet = wait_for_bfd_packet(self, 2)
719 time_diff = next_packet.time - prev_packet.time
720 # spec says the range should be <0.75, 1>, allow extra 0.05 margin
721 # to work around timing issues
722 self.assert_in_range(
723 time_diff, 0.70, 1.05, "time between slow packets")
724 prev_packet = next_packet
726 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
727 def test_zero_remote_min_rx(self):
728 """ no packets when zero remote required min rx interval """
730 self.test_session.update(required_min_rx=0)
731 self.test_session.send_packet()
732 for dummy in range(self.test_session.detect_mult):
733 self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
734 "sleep before transmitting bfd packet")
735 self.test_session.send_packet()
737 p = wait_for_bfd_packet(self, timeout=0)
738 self.logger.error(ppp("Received unexpected packet:", p))
739 except CaptureTimeoutError:
742 len(self.vapi.collect_events()), 0, "number of bfd events")
743 self.test_session.update(required_min_rx=300000)
744 for dummy in range(3):
745 self.test_session.send_packet()
747 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
749 len(self.vapi.collect_events()), 0, "number of bfd events")
751 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
752 def test_conn_down(self):
753 """ verify session goes down after inactivity """
755 detection_time = self.test_session.detect_mult *\
756 self.vpp_session.required_min_rx / USEC_IN_SEC
757 self.sleep(detection_time, "waiting for BFD session time-out")
758 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
759 verify_event(self, e, expected_state=BFDState.down)
761 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
762 def test_large_required_min_rx(self):
763 """ large remote required min rx interval """
765 p = wait_for_bfd_packet(self)
767 self.test_session.update(required_min_rx=interval)
768 self.test_session.send_packet()
769 time_mark = time.time()
771 # busy wait here, trying to collect a packet or event, vpp is not
772 # allowed to send packets and the session will timeout first - so the
773 # Up->Down event must arrive before any packets do
774 while time.time() < time_mark + interval / USEC_IN_SEC:
776 p = wait_for_bfd_packet(self, timeout=0)
777 # if vpp managed to send a packet before we did the session
778 # session update, then that's fine, ignore it
779 if p.time < time_mark - self.vpp_clock_offset:
781 self.logger.error(ppp("Received unexpected packet:", p))
783 except CaptureTimeoutError:
785 events = self.vapi.collect_events()
787 verify_event(self, events[0], BFDState.down)
789 self.assert_equal(count, 0, "number of packets received")
791 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
792 def test_immediate_remote_min_rx_reduction(self):
793 """ immediately honor remote required min rx reduction """
794 self.vpp_session.remove_vpp_config()
795 self.vpp_session = VppBFDUDPSession(
796 self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
797 self.pg0.enable_capture()
798 self.vpp_session.add_vpp_config()
799 self.test_session.update(desired_min_tx=1000000,
800 required_min_rx=1000000)
802 reference_packet = wait_for_bfd_packet(self)
803 time_mark = time.time()
805 self.test_session.update(required_min_rx=interval)
806 self.test_session.send_packet()
807 extra_time = time.time() - time_mark
808 p = wait_for_bfd_packet(self)
809 # first packet is allowed to be late by time we spent doing the update
810 # calculated in extra_time
811 self.assert_in_range(p.time - reference_packet.time,
812 .95 * 0.75 * interval / USEC_IN_SEC,
813 1.05 * interval / USEC_IN_SEC + extra_time,
814 "time between BFD packets")
816 for dummy in range(3):
817 p = wait_for_bfd_packet(self)
818 diff = p.time - reference_packet.time
819 self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
820 1.05 * interval / USEC_IN_SEC,
821 "time between BFD packets")
824 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
825 def test_modify_req_min_rx_double(self):
826 """ modify session - double required min rx """
828 p = wait_for_bfd_packet(self)
829 self.test_session.update(desired_min_tx=10000,
830 required_min_rx=10000)
831 self.test_session.send_packet()
832 # double required min rx
833 self.vpp_session.modify_parameters(
834 required_min_rx=2 * self.vpp_session.required_min_rx)
835 p = wait_for_bfd_packet(
836 self, pcap_time_min=time.time() - self.vpp_clock_offset)
837 # poll bit needs to be set
838 self.assertIn("P", p.sprintf("%BFD.flags%"),
839 "Poll bit not set in BFD packet")
840 # finish poll sequence with final packet
841 final = self.test_session.create_packet()
842 final[BFD].flags = "F"
843 timeout = self.test_session.detect_mult * \
844 max(self.test_session.desired_min_tx,
845 self.vpp_session.required_min_rx) / USEC_IN_SEC
846 self.test_session.send_packet(final)
847 time_mark = time.time()
848 e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
849 verify_event(self, e, expected_state=BFDState.down)
850 time_to_event = time.time() - time_mark
851 self.assert_in_range(time_to_event, .9 * timeout,
852 1.1 * timeout, "session timeout")
854 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
855 def test_modify_req_min_rx_halve(self):
856 """ modify session - halve required min rx """
857 self.vpp_session.modify_parameters(
858 required_min_rx=2 * self.vpp_session.required_min_rx)
860 p = wait_for_bfd_packet(self)
861 self.test_session.update(desired_min_tx=10000,
862 required_min_rx=10000)
863 self.test_session.send_packet()
864 p = wait_for_bfd_packet(
865 self, pcap_time_min=time.time() - self.vpp_clock_offset)
866 # halve required min rx
867 old_required_min_rx = self.vpp_session.required_min_rx
868 self.vpp_session.modify_parameters(
869 required_min_rx=0.5 * self.vpp_session.required_min_rx)
870 # now we wait 0.8*3*old-req-min-rx and the session should still be up
871 self.sleep(0.8 * self.vpp_session.detect_mult *
872 old_required_min_rx / USEC_IN_SEC,
873 "wait before finishing poll sequence")
874 self.assert_equal(len(self.vapi.collect_events()), 0,
875 "number of bfd events")
876 p = wait_for_bfd_packet(self)
877 # poll bit needs to be set
878 self.assertIn("P", p.sprintf("%BFD.flags%"),
879 "Poll bit not set in BFD packet")
880 # finish poll sequence with final packet
881 final = self.test_session.create_packet()
882 final[BFD].flags = "F"
883 self.test_session.send_packet(final)
884 # now the session should time out under new conditions
885 detection_time = self.test_session.detect_mult *\
886 self.vpp_session.required_min_rx / USEC_IN_SEC
888 e = self.vapi.wait_for_event(
889 2 * detection_time, "bfd_udp_session_details")
891 self.assert_in_range(after - before,
892 0.9 * detection_time,
893 1.1 * detection_time,
894 "time before bfd session goes down")
895 verify_event(self, e, expected_state=BFDState.down)
897 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
898 def test_modify_detect_mult(self):
899 """ modify detect multiplier """
901 p = wait_for_bfd_packet(self)
902 self.vpp_session.modify_parameters(detect_mult=1)
903 p = wait_for_bfd_packet(
904 self, pcap_time_min=time.time() - self.vpp_clock_offset)
905 self.assert_equal(self.vpp_session.detect_mult,
908 # poll bit must not be set
909 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
910 "Poll bit not set in BFD packet")
911 self.vpp_session.modify_parameters(detect_mult=10)
912 p = wait_for_bfd_packet(
913 self, pcap_time_min=time.time() - self.vpp_clock_offset)
914 self.assert_equal(self.vpp_session.detect_mult,
917 # poll bit must not be set
918 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
919 "Poll bit not set in BFD packet")
921 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
922 def test_queued_poll(self):
923 """ test poll sequence queueing """
925 p = wait_for_bfd_packet(self)
926 self.vpp_session.modify_parameters(
927 required_min_rx=2 * self.vpp_session.required_min_rx)
928 p = wait_for_bfd_packet(self)
929 poll_sequence_start = time.time()
930 poll_sequence_length_min = 0.5
931 send_final_after = time.time() + poll_sequence_length_min
932 # poll bit needs to be set
933 self.assertIn("P", p.sprintf("%BFD.flags%"),
934 "Poll bit not set in BFD packet")
935 self.assert_equal(p[BFD].required_min_rx_interval,
936 self.vpp_session.required_min_rx,
937 "BFD required min rx interval")
938 self.vpp_session.modify_parameters(
939 required_min_rx=2 * self.vpp_session.required_min_rx)
940 # 2nd poll sequence should be queued now
941 # don't send the reply back yet, wait for some time to emulate
942 # longer round-trip time
944 while time.time() < send_final_after:
945 self.test_session.send_packet()
946 p = wait_for_bfd_packet(self)
947 self.assert_equal(len(self.vapi.collect_events()), 0,
948 "number of bfd events")
949 self.assert_equal(p[BFD].required_min_rx_interval,
950 self.vpp_session.required_min_rx,
951 "BFD required min rx interval")
953 # poll bit must be set
954 self.assertIn("P", p.sprintf("%BFD.flags%"),
955 "Poll bit not set in BFD packet")
956 final = self.test_session.create_packet()
957 final[BFD].flags = "F"
958 self.test_session.send_packet(final)
959 # finish 1st with final
960 poll_sequence_length = time.time() - poll_sequence_start
961 # vpp must wait for some time before starting new poll sequence
962 poll_no_2_started = False
963 for dummy in range(2 * packet_count):
964 p = wait_for_bfd_packet(self)
965 self.assert_equal(len(self.vapi.collect_events()), 0,
966 "number of bfd events")
967 if "P" in p.sprintf("%BFD.flags%"):
968 poll_no_2_started = True
969 if time.time() < poll_sequence_start + poll_sequence_length:
970 raise Exception("VPP started 2nd poll sequence too soon")
971 final = self.test_session.create_packet()
972 final[BFD].flags = "F"
973 self.test_session.send_packet(final)
976 self.test_session.send_packet()
977 self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
978 # finish 2nd with final
979 final = self.test_session.create_packet()
980 final[BFD].flags = "F"
981 self.test_session.send_packet(final)
982 p = wait_for_bfd_packet(self)
983 # poll bit must not be set
984 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
985 "Poll bit set in BFD packet")
987 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
988 def test_poll_response(self):
989 """ test correct response to control frame with poll bit set """
991 poll = self.test_session.create_packet()
992 poll[BFD].flags = "P"
993 self.test_session.send_packet(poll)
994 final = wait_for_bfd_packet(
995 self, pcap_time_min=time.time() - self.vpp_clock_offset)
996 self.assertIn("F", final.sprintf("%BFD.flags%"))
998 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
999 def test_no_periodic_if_remote_demand(self):
1000 """ no periodic frames outside poll sequence if remote demand set """
1001 bfd_session_up(self)
1002 demand = self.test_session.create_packet()
1003 demand[BFD].flags = "D"
1004 self.test_session.send_packet(demand)
1005 transmit_time = 0.9 \
1006 * max(self.vpp_session.required_min_rx,
1007 self.test_session.desired_min_tx) \
1010 for dummy in range(self.test_session.detect_mult * 2):
1011 time.sleep(transmit_time)
1012 self.test_session.send_packet(demand)
1014 p = wait_for_bfd_packet(self, timeout=0)
1015 self.logger.error(ppp("Received unexpected packet:", p))
1017 except CaptureTimeoutError:
1019 events = self.vapi.collect_events()
1021 self.logger.error("Received unexpected event: %s", e)
1022 self.assert_equal(count, 0, "number of packets received")
1023 self.assert_equal(len(events), 0, "number of events received")
1025 def test_echo_looped_back(self):
1026 """ echo packets looped back """
1027 # don't need a session in this case..
1028 self.vpp_session.remove_vpp_config()
1029 self.pg0.enable_capture()
1030 echo_packet_count = 10
1031 # random source port low enough to increment a few times..
1032 udp_sport_tx = randint(1, 50000)
1033 udp_sport_rx = udp_sport_tx
1034 echo_packet = (Ether(src=self.pg0.remote_mac,
1035 dst=self.pg0.local_mac) /
1036 IP(src=self.pg0.remote_ip4,
1037 dst=self.pg0.remote_ip4) /
1038 UDP(dport=BFD.udp_dport_echo) /
1039 Raw("this should be looped back"))
1040 for dummy in range(echo_packet_count):
1041 self.sleep(.01, "delay between echo packets")
1042 echo_packet[UDP].sport = udp_sport_tx
1044 self.logger.debug(ppp("Sending packet:", echo_packet))
1045 self.pg0.add_stream(echo_packet)
1047 for dummy in range(echo_packet_count):
1048 p = self.pg0.wait_for_packet(1)
1049 self.logger.debug(ppp("Got packet:", p))
1051 self.assert_equal(self.pg0.remote_mac,
1052 ether.dst, "Destination MAC")
1053 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1055 self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1056 self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1058 self.assert_equal(udp.dport, BFD.udp_dport_echo,
1059 "UDP destination port")
1060 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1062 # need to compare the hex payload here, otherwise BFD_vpp_echo
1064 self.assertEqual(str(p[UDP].payload),
1065 str(echo_packet[UDP].payload),
1066 "Received packet is not the echo packet sent")
1067 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1068 "ECHO packet identifier for test purposes)")
1070 def test_echo(self):
1071 """ echo function """
1072 bfd_session_up(self)
1073 self.test_session.update(required_min_echo_rx=150000)
1074 self.test_session.send_packet()
1075 detection_time = self.test_session.detect_mult *\
1076 self.vpp_session.required_min_rx / USEC_IN_SEC
1077 # echo shouldn't work without echo source set
1078 for dummy in range(10):
1079 sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1080 self.sleep(sleep, "delay before sending bfd packet")
1081 self.test_session.send_packet()
1082 p = wait_for_bfd_packet(
1083 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1084 self.assert_equal(p[BFD].required_min_rx_interval,
1085 self.vpp_session.required_min_rx,
1086 "BFD required min rx interval")
1087 self.test_session.send_packet()
1088 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1090 # should be turned on - loopback echo packets
1091 for dummy in range(3):
1092 loop_until = time.time() + 0.75 * detection_time
1093 while time.time() < loop_until:
1094 p = self.pg0.wait_for_packet(1)
1095 self.logger.debug(ppp("Got packet:", p))
1096 if p[UDP].dport == BFD.udp_dport_echo:
1098 p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
1099 self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
1100 "BFD ECHO src IP equal to loopback IP")
1101 self.logger.debug(ppp("Looping back packet:", p))
1102 self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1103 "ECHO packet destination MAC address")
1104 p[Ether].dst = self.pg0.local_mac
1105 self.pg0.add_stream(p)
1108 elif p.haslayer(BFD):
1110 self.assertGreaterEqual(
1111 p[BFD].required_min_rx_interval,
1113 if "P" in p.sprintf("%BFD.flags%"):
1114 final = self.test_session.create_packet()
1115 final[BFD].flags = "F"
1116 self.test_session.send_packet(final)
1118 raise Exception(ppp("Received unknown packet:", p))
1120 self.assert_equal(len(self.vapi.collect_events()), 0,
1121 "number of bfd events")
1122 self.test_session.send_packet()
1123 self.assertTrue(echo_seen, "No echo packets received")
1125 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1126 def test_echo_fail(self):
1127 """ session goes down if echo function fails """
1128 bfd_session_up(self)
1129 self.test_session.update(required_min_echo_rx=150000)
1130 self.test_session.send_packet()
1131 detection_time = self.test_session.detect_mult *\
1132 self.vpp_session.required_min_rx / USEC_IN_SEC
1133 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1134 # echo function should be used now, but we will drop the echo packets
1135 verified_diag = False
1136 for dummy in range(3):
1137 loop_until = time.time() + 0.75 * detection_time
1138 while time.time() < loop_until:
1139 p = self.pg0.wait_for_packet(1)
1140 self.logger.debug(ppp("Got packet:", p))
1141 if p[UDP].dport == BFD.udp_dport_echo:
1144 elif p.haslayer(BFD):
1145 if "P" in p.sprintf("%BFD.flags%"):
1146 self.assertGreaterEqual(
1147 p[BFD].required_min_rx_interval,
1149 final = self.test_session.create_packet()
1150 final[BFD].flags = "F"
1151 self.test_session.send_packet(final)
1152 if p[BFD].state == BFDState.down:
1153 self.assert_equal(p[BFD].diag,
1154 BFDDiagCode.echo_function_failed,
1156 verified_diag = True
1158 raise Exception(ppp("Received unknown packet:", p))
1159 self.test_session.send_packet()
1160 events = self.vapi.collect_events()
1161 self.assert_equal(len(events), 1, "number of bfd events")
1162 self.assert_equal(events[0].state, BFDState.down, BFDState)
1163 self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1165 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1166 def test_echo_stop(self):
1167 """ echo function stops if peer sets required min echo rx zero """
1168 bfd_session_up(self)
1169 self.test_session.update(required_min_echo_rx=150000)
1170 self.test_session.send_packet()
1171 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1172 # wait for first echo packet
1174 p = self.pg0.wait_for_packet(1)
1175 self.logger.debug(ppp("Got packet:", p))
1176 if p[UDP].dport == BFD.udp_dport_echo:
1177 self.logger.debug(ppp("Looping back packet:", p))
1178 p[Ether].dst = self.pg0.local_mac
1179 self.pg0.add_stream(p)
1182 elif p.haslayer(BFD):
1186 raise Exception(ppp("Received unknown packet:", p))
1187 self.test_session.update(required_min_echo_rx=0)
1188 self.test_session.send_packet()
1189 # echo packets shouldn't arrive anymore
1190 for dummy in range(5):
1191 wait_for_bfd_packet(
1192 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1193 self.test_session.send_packet()
1194 events = self.vapi.collect_events()
1195 self.assert_equal(len(events), 0, "number of bfd events")
1197 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1198 def test_echo_source_removed(self):
1199 """ echo function stops if echo source is removed """
1200 bfd_session_up(self)
1201 self.test_session.update(required_min_echo_rx=150000)
1202 self.test_session.send_packet()
1203 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1204 # wait for first echo packet
1206 p = self.pg0.wait_for_packet(1)
1207 self.logger.debug(ppp("Got packet:", p))
1208 if p[UDP].dport == BFD.udp_dport_echo:
1209 self.logger.debug(ppp("Looping back packet:", p))
1210 p[Ether].dst = self.pg0.local_mac
1211 self.pg0.add_stream(p)
1214 elif p.haslayer(BFD):
1218 raise Exception(ppp("Received unknown packet:", p))
1219 self.vapi.bfd_udp_del_echo_source()
1220 self.test_session.send_packet()
1221 # echo packets shouldn't arrive anymore
1222 for dummy in range(5):
1223 wait_for_bfd_packet(
1224 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1225 self.test_session.send_packet()
1226 events = self.vapi.collect_events()
1227 self.assert_equal(len(events), 0, "number of bfd events")
1229 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1230 def test_stale_echo(self):
1231 """ stale echo packets don't keep a session up """
1232 bfd_session_up(self)
1233 self.test_session.update(required_min_echo_rx=150000)
1234 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1235 self.test_session.send_packet()
1236 # should be turned on - loopback echo packets
1240 for dummy in range(10 * self.vpp_session.detect_mult):
1241 p = self.pg0.wait_for_packet(1)
1242 if p[UDP].dport == BFD.udp_dport_echo:
1243 if echo_packet is None:
1244 self.logger.debug(ppp("Got first echo packet:", p))
1246 timeout_at = time.time() + self.vpp_session.detect_mult * \
1247 self.test_session.required_min_echo_rx / USEC_IN_SEC
1249 self.logger.debug(ppp("Got followup echo packet:", p))
1250 self.logger.debug(ppp("Looping back first echo packet:", p))
1251 echo_packet[Ether].dst = self.pg0.local_mac
1252 self.pg0.add_stream(echo_packet)
1254 elif p.haslayer(BFD):
1255 self.logger.debug(ppp("Got packet:", p))
1256 if "P" in p.sprintf("%BFD.flags%"):
1257 final = self.test_session.create_packet()
1258 final[BFD].flags = "F"
1259 self.test_session.send_packet(final)
1260 if p[BFD].state == BFDState.down:
1261 self.assertIsNotNone(
1263 "Session went down before first echo packet received")
1265 self.assertGreaterEqual(
1267 "Session timeout at %s, but is expected at %s" %
1269 self.assert_equal(p[BFD].diag,
1270 BFDDiagCode.echo_function_failed,
1272 events = self.vapi.collect_events()
1273 self.assert_equal(len(events), 1, "number of bfd events")
1274 self.assert_equal(events[0].state, BFDState.down, BFDState)
1278 raise Exception(ppp("Received unknown packet:", p))
1279 self.test_session.send_packet()
1280 self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1282 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1283 def test_invalid_echo_checksum(self):
1284 """ echo packets with invalid checksum don't keep a session up """
1285 bfd_session_up(self)
1286 self.test_session.update(required_min_echo_rx=150000)
1287 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1288 self.test_session.send_packet()
1289 # should be turned on - loopback echo packets
1292 for dummy in range(10 * self.vpp_session.detect_mult):
1293 p = self.pg0.wait_for_packet(1)
1294 if p[UDP].dport == BFD.udp_dport_echo:
1295 self.logger.debug(ppp("Got echo packet:", p))
1296 if timeout_at is None:
1297 timeout_at = time.time() + self.vpp_session.detect_mult * \
1298 self.test_session.required_min_echo_rx / USEC_IN_SEC
1299 p[BFD_vpp_echo].checksum = getrandbits(64)
1300 p[Ether].dst = self.pg0.local_mac
1301 self.logger.debug(ppp("Looping back modified echo packet:", p))
1302 self.pg0.add_stream(p)
1304 elif p.haslayer(BFD):
1305 self.logger.debug(ppp("Got packet:", p))
1306 if "P" in p.sprintf("%BFD.flags%"):
1307 final = self.test_session.create_packet()
1308 final[BFD].flags = "F"
1309 self.test_session.send_packet(final)
1310 if p[BFD].state == BFDState.down:
1311 self.assertIsNotNone(
1313 "Session went down before first echo packet received")
1315 self.assertGreaterEqual(
1317 "Session timeout at %s, but is expected at %s" %
1319 self.assert_equal(p[BFD].diag,
1320 BFDDiagCode.echo_function_failed,
1322 events = self.vapi.collect_events()
1323 self.assert_equal(len(events), 1, "number of bfd events")
1324 self.assert_equal(events[0].state, BFDState.down, BFDState)
1328 raise Exception(ppp("Received unknown packet:", p))
1329 self.test_session.send_packet()
1330 self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1332 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1333 def test_admin_up_down(self):
1334 """ put session admin-up and admin-down """
1335 bfd_session_up(self)
1336 self.vpp_session.admin_down()
1337 self.pg0.enable_capture()
1338 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1339 verify_event(self, e, expected_state=BFDState.admin_down)
1340 for dummy in range(2):
1341 p = wait_for_bfd_packet(self)
1342 self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1343 # try to bring session up - shouldn't be possible
1344 self.test_session.update(state=BFDState.init)
1345 self.test_session.send_packet()
1346 for dummy in range(2):
1347 p = wait_for_bfd_packet(self)
1348 self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1349 self.vpp_session.admin_up()
1350 self.test_session.update(state=BFDState.down)
1351 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1352 verify_event(self, e, expected_state=BFDState.down)
1353 p = wait_for_bfd_packet(
1354 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1355 self.assert_equal(p[BFD].state, BFDState.down, BFDState)
1356 self.test_session.send_packet()
1357 p = wait_for_bfd_packet(
1358 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1359 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1360 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1361 verify_event(self, e, expected_state=BFDState.init)
1362 self.test_session.update(state=BFDState.up)
1363 self.test_session.send_packet()
1364 p = wait_for_bfd_packet(
1365 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1366 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1367 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1368 verify_event(self, e, expected_state=BFDState.up)
1370 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1371 def test_config_change_remote_demand(self):
1372 """ configuration change while peer in demand mode """
1373 bfd_session_up(self)
1374 demand = self.test_session.create_packet()
1375 demand[BFD].flags = "D"
1376 self.test_session.send_packet(demand)
1377 self.vpp_session.modify_parameters(
1378 required_min_rx=2 * self.vpp_session.required_min_rx)
1379 p = wait_for_bfd_packet(
1380 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1381 # poll bit must be set
1382 self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1383 # terminate poll sequence
1384 final = self.test_session.create_packet()
1385 final[BFD].flags = "D+F"
1386 self.test_session.send_packet(final)
1387 # vpp should be quiet now again
1388 transmit_time = 0.9 \
1389 * max(self.vpp_session.required_min_rx,
1390 self.test_session.desired_min_tx) \
1393 for dummy in range(self.test_session.detect_mult * 2):
1394 time.sleep(transmit_time)
1395 self.test_session.send_packet(demand)
1397 p = wait_for_bfd_packet(self, timeout=0)
1398 self.logger.error(ppp("Received unexpected packet:", p))
1400 except CaptureTimeoutError:
1402 events = self.vapi.collect_events()
1404 self.logger.error("Received unexpected event: %s", e)
1405 self.assert_equal(count, 0, "number of packets received")
1406 self.assert_equal(len(events), 0, "number of events received")
1408 def test_intf_deleted(self):
1409 """ interface with bfd session deleted """
1410 intf = VppLoInterface(self, 0)
1413 sw_if_index = intf.sw_if_index
1414 vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip4)
1415 vpp_session.add_vpp_config()
1416 vpp_session.admin_up()
1417 intf.remove_vpp_config()
1418 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1419 self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1420 self.assertFalse(vpp_session.query_vpp_config())
1423 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1424 class BFD6TestCase(VppTestCase):
1425 """Bidirectional Forwarding Detection (BFD) (IPv6) """
1428 vpp_clock_offset = None
1433 def setUpClass(cls):
1434 super(BFD6TestCase, cls).setUpClass()
1435 cls.vapi.cli("set log class bfd level debug")
1437 cls.create_pg_interfaces([0])
1438 cls.pg0.config_ip6()
1439 cls.pg0.configure_ipv6_neighbors()
1441 cls.pg0.resolve_ndp()
1442 cls.create_loopback_interfaces([0])
1443 cls.loopback0 = cls.lo_interfaces[0]
1444 cls.loopback0.config_ip6()
1445 cls.loopback0.admin_up()
1448 super(BFD6TestCase, cls).tearDownClass()
1452 super(BFD6TestCase, self).setUp()
1453 self.factory = AuthKeyFactory()
1454 self.vapi.want_bfd_events()
1455 self.pg0.enable_capture()
1457 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1458 self.pg0.remote_ip6,
1460 self.vpp_session.add_vpp_config()
1461 self.vpp_session.admin_up()
1462 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1463 self.logger.debug(self.vapi.cli("show adj nbr"))
1465 self.vapi.want_bfd_events(enable_disable=0)
1469 if not self.vpp_dead:
1470 self.vapi.want_bfd_events(enable_disable=0)
1471 self.vapi.collect_events() # clear the event queue
1472 super(BFD6TestCase, self).tearDown()
1474 def test_session_up(self):
1475 """ bring BFD session up """
1476 bfd_session_up(self)
1478 def test_session_up_by_ip(self):
1479 """ bring BFD session up - first frame looked up by address pair """
1480 self.logger.info("BFD: Sending Slow control frame")
1481 self.test_session.update(my_discriminator=randint(0, 40000000))
1482 self.test_session.send_packet()
1483 self.pg0.enable_capture()
1484 p = self.pg0.wait_for_packet(1)
1485 self.assert_equal(p[BFD].your_discriminator,
1486 self.test_session.my_discriminator,
1487 "BFD - your discriminator")
1488 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1489 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
1491 self.logger.info("BFD: Waiting for event")
1492 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1493 verify_event(self, e, expected_state=BFDState.init)
1494 self.logger.info("BFD: Sending Up")
1495 self.test_session.send_packet()
1496 self.logger.info("BFD: Waiting for event")
1497 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1498 verify_event(self, e, expected_state=BFDState.up)
1499 self.logger.info("BFD: Session is Up")
1500 self.test_session.update(state=BFDState.up)
1501 self.test_session.send_packet()
1502 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1504 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1505 def test_hold_up(self):
1506 """ hold BFD session up """
1507 bfd_session_up(self)
1508 for dummy in range(self.test_session.detect_mult * 2):
1509 wait_for_bfd_packet(self)
1510 self.test_session.send_packet()
1511 self.assert_equal(len(self.vapi.collect_events()), 0,
1512 "number of bfd events")
1513 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1515 def test_echo_looped_back(self):
1516 """ echo packets looped back """
1517 # don't need a session in this case..
1518 self.vpp_session.remove_vpp_config()
1519 self.pg0.enable_capture()
1520 echo_packet_count = 10
1521 # random source port low enough to increment a few times..
1522 udp_sport_tx = randint(1, 50000)
1523 udp_sport_rx = udp_sport_tx
1524 echo_packet = (Ether(src=self.pg0.remote_mac,
1525 dst=self.pg0.local_mac) /
1526 IPv6(src=self.pg0.remote_ip6,
1527 dst=self.pg0.remote_ip6) /
1528 UDP(dport=BFD.udp_dport_echo) /
1529 Raw("this should be looped back"))
1530 for dummy in range(echo_packet_count):
1531 self.sleep(.01, "delay between echo packets")
1532 echo_packet[UDP].sport = udp_sport_tx
1534 self.logger.debug(ppp("Sending packet:", echo_packet))
1535 self.pg0.add_stream(echo_packet)
1537 for dummy in range(echo_packet_count):
1538 p = self.pg0.wait_for_packet(1)
1539 self.logger.debug(ppp("Got packet:", p))
1541 self.assert_equal(self.pg0.remote_mac,
1542 ether.dst, "Destination MAC")
1543 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1545 self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
1546 self.assert_equal(self.pg0.remote_ip6, ip.src, "Destination IP")
1548 self.assert_equal(udp.dport, BFD.udp_dport_echo,
1549 "UDP destination port")
1550 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1552 # need to compare the hex payload here, otherwise BFD_vpp_echo
1554 self.assertEqual(str(p[UDP].payload),
1555 str(echo_packet[UDP].payload),
1556 "Received packet is not the echo packet sent")
1557 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1558 "ECHO packet identifier for test purposes)")
1559 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1560 "ECHO packet identifier for test purposes)")
1562 def test_echo(self):
1563 """ echo function """
1564 bfd_session_up(self)
1565 self.test_session.update(required_min_echo_rx=150000)
1566 self.test_session.send_packet()
1567 detection_time = self.test_session.detect_mult *\
1568 self.vpp_session.required_min_rx / USEC_IN_SEC
1569 # echo shouldn't work without echo source set
1570 for dummy in range(10):
1571 sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1572 self.sleep(sleep, "delay before sending bfd packet")
1573 self.test_session.send_packet()
1574 p = wait_for_bfd_packet(
1575 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1576 self.assert_equal(p[BFD].required_min_rx_interval,
1577 self.vpp_session.required_min_rx,
1578 "BFD required min rx interval")
1579 self.test_session.send_packet()
1580 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1582 # should be turned on - loopback echo packets
1583 for dummy in range(3):
1584 loop_until = time.time() + 0.75 * detection_time
1585 while time.time() < loop_until:
1586 p = self.pg0.wait_for_packet(1)
1587 self.logger.debug(ppp("Got packet:", p))
1588 if p[UDP].dport == BFD.udp_dport_echo:
1590 p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
1591 self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
1592 "BFD ECHO src IP equal to loopback IP")
1593 self.logger.debug(ppp("Looping back packet:", p))
1594 self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1595 "ECHO packet destination MAC address")
1596 p[Ether].dst = self.pg0.local_mac
1597 self.pg0.add_stream(p)
1600 elif p.haslayer(BFD):
1602 self.assertGreaterEqual(
1603 p[BFD].required_min_rx_interval,
1605 if "P" in p.sprintf("%BFD.flags%"):
1606 final = self.test_session.create_packet()
1607 final[BFD].flags = "F"
1608 self.test_session.send_packet(final)
1610 raise Exception(ppp("Received unknown packet:", p))
1612 self.assert_equal(len(self.vapi.collect_events()), 0,
1613 "number of bfd events")
1614 self.test_session.send_packet()
1615 self.assertTrue(echo_seen, "No echo packets received")
1617 def test_intf_deleted(self):
1618 """ interface with bfd session deleted """
1619 intf = VppLoInterface(self, 0)
1622 sw_if_index = intf.sw_if_index
1623 vpp_session = VppBFDUDPSession(
1624 self, intf, intf.remote_ip6, af=AF_INET6)
1625 vpp_session.add_vpp_config()
1626 vpp_session.admin_up()
1627 intf.remove_vpp_config()
1628 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1629 self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1630 self.assertFalse(vpp_session.query_vpp_config())
1633 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1634 class BFDFIBTestCase(VppTestCase):
1635 """ BFD-FIB interactions (IPv6) """
1641 super(BFDFIBTestCase, self).setUp()
1642 self.create_pg_interfaces(range(1))
1644 self.vapi.want_bfd_events()
1645 self.pg0.enable_capture()
1647 for i in self.pg_interfaces:
1650 i.configure_ipv6_neighbors()
1653 if not self.vpp_dead:
1654 self.vapi.want_bfd_events(enable_disable=0)
1656 super(BFDFIBTestCase, self).tearDown()
1659 def pkt_is_not_data_traffic(p):
1660 """ not data traffic implies BFD or the usual IPv6 ND/RA"""
1661 if p.haslayer(BFD) or is_ipv6_misc(p):
1665 def test_session_with_fib(self):
1666 """ BFD-FIB interactions """
1668 # packets to match against both of the routes
1669 p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1670 IPv6(src="3001::1", dst="2001::1") /
1671 UDP(sport=1234, dport=1234) /
1673 (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1674 IPv6(src="3001::1", dst="2002::1") /
1675 UDP(sport=1234, dport=1234) /
1678 # A recursive and a non-recursive route via a next-hop that
1679 # will have a BFD session
1680 ip_2001_s_64 = VppIpRoute(self, "2001::", 64,
1681 [VppRoutePath(self.pg0.remote_ip6,
1682 self.pg0.sw_if_index,
1683 proto=DpoProto.DPO_PROTO_IP6)],
1685 ip_2002_s_64 = VppIpRoute(self, "2002::", 64,
1686 [VppRoutePath(self.pg0.remote_ip6,
1688 proto=DpoProto.DPO_PROTO_IP6)],
1690 ip_2001_s_64.add_vpp_config()
1691 ip_2002_s_64.add_vpp_config()
1693 # bring the session up now the routes are present
1694 self.vpp_session = VppBFDUDPSession(self,
1696 self.pg0.remote_ip6,
1698 self.vpp_session.add_vpp_config()
1699 self.vpp_session.admin_up()
1700 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1702 # session is up - traffic passes
1703 bfd_session_up(self)
1705 self.pg0.add_stream(p)
1708 captured = self.pg0.wait_for_packet(
1710 filter_out_fn=self.pkt_is_not_data_traffic)
1711 self.assertEqual(captured[IPv6].dst,
1714 # session is up - traffic is dropped
1715 bfd_session_down(self)
1717 self.pg0.add_stream(p)
1719 with self.assertRaises(CaptureTimeoutError):
1720 self.pg0.wait_for_packet(1, self.pkt_is_not_data_traffic)
1722 # session is up - traffic passes
1723 bfd_session_up(self)
1725 self.pg0.add_stream(p)
1728 captured = self.pg0.wait_for_packet(
1730 filter_out_fn=self.pkt_is_not_data_traffic)
1731 self.assertEqual(captured[IPv6].dst,
1735 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1736 class BFDSHA1TestCase(VppTestCase):
1737 """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1740 vpp_clock_offset = None
1745 def setUpClass(cls):
1746 super(BFDSHA1TestCase, cls).setUpClass()
1747 cls.vapi.cli("set log class bfd level debug")
1749 cls.create_pg_interfaces([0])
1750 cls.pg0.config_ip4()
1752 cls.pg0.resolve_arp()
1755 super(BFDSHA1TestCase, cls).tearDownClass()
1759 super(BFDSHA1TestCase, self).setUp()
1760 self.factory = AuthKeyFactory()
1761 self.vapi.want_bfd_events()
1762 self.pg0.enable_capture()
1765 if not self.vpp_dead:
1766 self.vapi.want_bfd_events(enable_disable=0)
1767 self.vapi.collect_events() # clear the event queue
1768 super(BFDSHA1TestCase, self).tearDown()
1770 def test_session_up(self):
1771 """ bring BFD session up """
1772 key = self.factory.create_random_key(self)
1773 key.add_vpp_config()
1774 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1775 self.pg0.remote_ip4,
1777 self.vpp_session.add_vpp_config()
1778 self.vpp_session.admin_up()
1779 self.test_session = BFDTestSession(
1780 self, self.pg0, AF_INET, sha1_key=key,
1781 bfd_key_id=self.vpp_session.bfd_key_id)
1782 bfd_session_up(self)
1784 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1785 def test_hold_up(self):
1786 """ hold BFD session up """
1787 key = self.factory.create_random_key(self)
1788 key.add_vpp_config()
1789 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1790 self.pg0.remote_ip4,
1792 self.vpp_session.add_vpp_config()
1793 self.vpp_session.admin_up()
1794 self.test_session = BFDTestSession(
1795 self, self.pg0, AF_INET, sha1_key=key,
1796 bfd_key_id=self.vpp_session.bfd_key_id)
1797 bfd_session_up(self)
1798 for dummy in range(self.test_session.detect_mult * 2):
1799 wait_for_bfd_packet(self)
1800 self.test_session.send_packet()
1801 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1803 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1804 def test_hold_up_meticulous(self):
1805 """ hold BFD session up - meticulous auth """
1806 key = self.factory.create_random_key(
1807 self, BFDAuthType.meticulous_keyed_sha1)
1808 key.add_vpp_config()
1809 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1810 self.pg0.remote_ip4, sha1_key=key)
1811 self.vpp_session.add_vpp_config()
1812 self.vpp_session.admin_up()
1813 # specify sequence number so that it wraps
1814 self.test_session = BFDTestSession(
1815 self, self.pg0, AF_INET, sha1_key=key,
1816 bfd_key_id=self.vpp_session.bfd_key_id,
1817 our_seq_number=0xFFFFFFFF - 4)
1818 bfd_session_up(self)
1819 for dummy in range(30):
1820 wait_for_bfd_packet(self)
1821 self.test_session.inc_seq_num()
1822 self.test_session.send_packet()
1823 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1825 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1826 def test_send_bad_seq_number(self):
1827 """ session is not kept alive by msgs with bad sequence numbers"""
1828 key = self.factory.create_random_key(
1829 self, BFDAuthType.meticulous_keyed_sha1)
1830 key.add_vpp_config()
1831 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1832 self.pg0.remote_ip4, sha1_key=key)
1833 self.vpp_session.add_vpp_config()
1834 self.test_session = BFDTestSession(
1835 self, self.pg0, AF_INET, sha1_key=key,
1836 bfd_key_id=self.vpp_session.bfd_key_id)
1837 bfd_session_up(self)
1838 detection_time = self.test_session.detect_mult *\
1839 self.vpp_session.required_min_rx / USEC_IN_SEC
1840 send_until = time.time() + 2 * detection_time
1841 while time.time() < send_until:
1842 self.test_session.send_packet()
1843 self.sleep(0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
1844 "time between bfd packets")
1845 e = self.vapi.collect_events()
1846 # session should be down now, because the sequence numbers weren't
1848 self.assert_equal(len(e), 1, "number of bfd events")
1849 verify_event(self, e[0], expected_state=BFDState.down)
1851 def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
1852 legitimate_test_session,
1854 rogue_bfd_values=None):
1855 """ execute a rogue session interaction scenario
1857 1. create vpp session, add config
1858 2. bring the legitimate session up
1859 3. copy the bfd values from legitimate session to rogue session
1860 4. apply rogue_bfd_values to rogue session
1861 5. set rogue session state to down
1862 6. send message to take the session down from the rogue session
1863 7. assert that the legitimate session is unaffected
1866 self.vpp_session = vpp_bfd_udp_session
1867 self.vpp_session.add_vpp_config()
1868 self.test_session = legitimate_test_session
1869 # bring vpp session up
1870 bfd_session_up(self)
1871 # send packet from rogue session
1872 rogue_test_session.update(
1873 my_discriminator=self.test_session.my_discriminator,
1874 your_discriminator=self.test_session.your_discriminator,
1875 desired_min_tx=self.test_session.desired_min_tx,
1876 required_min_rx=self.test_session.required_min_rx,
1877 detect_mult=self.test_session.detect_mult,
1878 diag=self.test_session.diag,
1879 state=self.test_session.state,
1880 auth_type=self.test_session.auth_type)
1881 if rogue_bfd_values:
1882 rogue_test_session.update(**rogue_bfd_values)
1883 rogue_test_session.update(state=BFDState.down)
1884 rogue_test_session.send_packet()
1885 wait_for_bfd_packet(self)
1886 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1888 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1889 def test_mismatch_auth(self):
1890 """ session is not brought down by unauthenticated msg """
1891 key = self.factory.create_random_key(self)
1892 key.add_vpp_config()
1893 vpp_session = VppBFDUDPSession(
1894 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1895 legitimate_test_session = BFDTestSession(
1896 self, self.pg0, AF_INET, sha1_key=key,
1897 bfd_key_id=vpp_session.bfd_key_id)
1898 rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
1899 self.execute_rogue_session_scenario(vpp_session,
1900 legitimate_test_session,
1903 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1904 def test_mismatch_bfd_key_id(self):
1905 """ session is not brought down by msg with non-existent key-id """
1906 key = self.factory.create_random_key(self)
1907 key.add_vpp_config()
1908 vpp_session = VppBFDUDPSession(
1909 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1910 # pick a different random bfd key id
1912 while x == vpp_session.bfd_key_id:
1914 legitimate_test_session = BFDTestSession(
1915 self, self.pg0, AF_INET, sha1_key=key,
1916 bfd_key_id=vpp_session.bfd_key_id)
1917 rogue_test_session = BFDTestSession(
1918 self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
1919 self.execute_rogue_session_scenario(vpp_session,
1920 legitimate_test_session,
1923 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1924 def test_mismatched_auth_type(self):
1925 """ session is not brought down by msg with wrong auth type """
1926 key = self.factory.create_random_key(self)
1927 key.add_vpp_config()
1928 vpp_session = VppBFDUDPSession(
1929 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1930 legitimate_test_session = BFDTestSession(
1931 self, self.pg0, AF_INET, sha1_key=key,
1932 bfd_key_id=vpp_session.bfd_key_id)
1933 rogue_test_session = BFDTestSession(
1934 self, self.pg0, AF_INET, sha1_key=key,
1935 bfd_key_id=vpp_session.bfd_key_id)
1936 self.execute_rogue_session_scenario(
1937 vpp_session, legitimate_test_session, rogue_test_session,
1938 {'auth_type': BFDAuthType.keyed_md5})
1940 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1941 def test_restart(self):
1942 """ simulate remote peer restart and resynchronization """
1943 key = self.factory.create_random_key(
1944 self, BFDAuthType.meticulous_keyed_sha1)
1945 key.add_vpp_config()
1946 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1947 self.pg0.remote_ip4, sha1_key=key)
1948 self.vpp_session.add_vpp_config()
1949 self.test_session = BFDTestSession(
1950 self, self.pg0, AF_INET, sha1_key=key,
1951 bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
1952 bfd_session_up(self)
1953 # don't send any packets for 2*detection_time
1954 detection_time = self.test_session.detect_mult *\
1955 self.vpp_session.required_min_rx / USEC_IN_SEC
1956 self.sleep(2 * detection_time, "simulating peer restart")
1957 events = self.vapi.collect_events()
1958 self.assert_equal(len(events), 1, "number of bfd events")
1959 verify_event(self, events[0], expected_state=BFDState.down)
1960 self.test_session.update(state=BFDState.down)
1961 # reset sequence number
1962 self.test_session.our_seq_number = 0
1963 self.test_session.vpp_seq_number = None
1964 # now throw away any pending packets
1965 self.pg0.enable_capture()
1966 bfd_session_up(self)
1969 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1970 class BFDAuthOnOffTestCase(VppTestCase):
1971 """Bidirectional Forwarding Detection (BFD) (changing auth) """
1978 def setUpClass(cls):
1979 super(BFDAuthOnOffTestCase, cls).setUpClass()
1980 cls.vapi.cli("set log class bfd level debug")
1982 cls.create_pg_interfaces([0])
1983 cls.pg0.config_ip4()
1985 cls.pg0.resolve_arp()
1988 super(BFDAuthOnOffTestCase, cls).tearDownClass()
1992 super(BFDAuthOnOffTestCase, self).setUp()
1993 self.factory = AuthKeyFactory()
1994 self.vapi.want_bfd_events()
1995 self.pg0.enable_capture()
1998 if not self.vpp_dead:
1999 self.vapi.want_bfd_events(enable_disable=0)
2000 self.vapi.collect_events() # clear the event queue
2001 super(BFDAuthOnOffTestCase, self).tearDown()
2003 def test_auth_on_immediate(self):
2004 """ turn auth on without disturbing session state (immediate) """
2005 key = self.factory.create_random_key(self)
2006 key.add_vpp_config()
2007 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2008 self.pg0.remote_ip4)
2009 self.vpp_session.add_vpp_config()
2010 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
2011 bfd_session_up(self)
2012 for dummy in range(self.test_session.detect_mult * 2):
2013 p = wait_for_bfd_packet(self)
2014 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2015 self.test_session.send_packet()
2016 self.vpp_session.activate_auth(key)
2017 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2018 self.test_session.sha1_key = key
2019 for dummy in range(self.test_session.detect_mult * 2):
2020 p = wait_for_bfd_packet(self)
2021 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2022 self.test_session.send_packet()
2023 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2024 self.assert_equal(len(self.vapi.collect_events()), 0,
2025 "number of bfd events")
2027 def test_auth_off_immediate(self):
2028 """ turn auth off without disturbing session state (immediate) """
2029 key = self.factory.create_random_key(self)
2030 key.add_vpp_config()
2031 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2032 self.pg0.remote_ip4, sha1_key=key)
2033 self.vpp_session.add_vpp_config()
2034 self.test_session = BFDTestSession(
2035 self, self.pg0, AF_INET, sha1_key=key,
2036 bfd_key_id=self.vpp_session.bfd_key_id)
2037 bfd_session_up(self)
2038 # self.vapi.want_bfd_events(enable_disable=0)
2039 for dummy in range(self.test_session.detect_mult * 2):
2040 p = wait_for_bfd_packet(self)
2041 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2042 self.test_session.inc_seq_num()
2043 self.test_session.send_packet()
2044 self.vpp_session.deactivate_auth()
2045 self.test_session.bfd_key_id = None
2046 self.test_session.sha1_key = None
2047 for dummy in range(self.test_session.detect_mult * 2):
2048 p = wait_for_bfd_packet(self)
2049 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2050 self.test_session.inc_seq_num()
2051 self.test_session.send_packet()
2052 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2053 self.assert_equal(len(self.vapi.collect_events()), 0,
2054 "number of bfd events")
2056 def test_auth_change_key_immediate(self):
2057 """ change auth key without disturbing session state (immediate) """
2058 key1 = self.factory.create_random_key(self)
2059 key1.add_vpp_config()
2060 key2 = self.factory.create_random_key(self)
2061 key2.add_vpp_config()
2062 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2063 self.pg0.remote_ip4, sha1_key=key1)
2064 self.vpp_session.add_vpp_config()
2065 self.test_session = BFDTestSession(
2066 self, self.pg0, AF_INET, sha1_key=key1,
2067 bfd_key_id=self.vpp_session.bfd_key_id)
2068 bfd_session_up(self)
2069 for dummy in range(self.test_session.detect_mult * 2):
2070 p = wait_for_bfd_packet(self)
2071 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2072 self.test_session.send_packet()
2073 self.vpp_session.activate_auth(key2)
2074 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2075 self.test_session.sha1_key = key2
2076 for dummy in range(self.test_session.detect_mult * 2):
2077 p = wait_for_bfd_packet(self)
2078 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2079 self.test_session.send_packet()
2080 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2081 self.assert_equal(len(self.vapi.collect_events()), 0,
2082 "number of bfd events")
2084 def test_auth_on_delayed(self):
2085 """ turn auth on without disturbing session state (delayed) """
2086 key = self.factory.create_random_key(self)
2087 key.add_vpp_config()
2088 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2089 self.pg0.remote_ip4)
2090 self.vpp_session.add_vpp_config()
2091 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
2092 bfd_session_up(self)
2093 for dummy in range(self.test_session.detect_mult * 2):
2094 wait_for_bfd_packet(self)
2095 self.test_session.send_packet()
2096 self.vpp_session.activate_auth(key, delayed=True)
2097 for dummy in range(self.test_session.detect_mult * 2):
2098 p = wait_for_bfd_packet(self)
2099 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2100 self.test_session.send_packet()
2101 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2102 self.test_session.sha1_key = key
2103 self.test_session.send_packet()
2104 for dummy in range(self.test_session.detect_mult * 2):
2105 p = wait_for_bfd_packet(self)
2106 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2107 self.test_session.send_packet()
2108 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2109 self.assert_equal(len(self.vapi.collect_events()), 0,
2110 "number of bfd events")
2112 def test_auth_off_delayed(self):
2113 """ turn auth off without disturbing session state (delayed) """
2114 key = self.factory.create_random_key(self)
2115 key.add_vpp_config()
2116 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2117 self.pg0.remote_ip4, sha1_key=key)
2118 self.vpp_session.add_vpp_config()
2119 self.test_session = BFDTestSession(
2120 self, self.pg0, AF_INET, sha1_key=key,
2121 bfd_key_id=self.vpp_session.bfd_key_id)
2122 bfd_session_up(self)
2123 for dummy in range(self.test_session.detect_mult * 2):
2124 p = wait_for_bfd_packet(self)
2125 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2126 self.test_session.send_packet()
2127 self.vpp_session.deactivate_auth(delayed=True)
2128 for dummy in range(self.test_session.detect_mult * 2):
2129 p = wait_for_bfd_packet(self)
2130 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2131 self.test_session.send_packet()
2132 self.test_session.bfd_key_id = None
2133 self.test_session.sha1_key = None
2134 self.test_session.send_packet()
2135 for dummy in range(self.test_session.detect_mult * 2):
2136 p = wait_for_bfd_packet(self)
2137 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2138 self.test_session.send_packet()
2139 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2140 self.assert_equal(len(self.vapi.collect_events()), 0,
2141 "number of bfd events")
2143 def test_auth_change_key_delayed(self):
2144 """ change auth key without disturbing session state (delayed) """
2145 key1 = self.factory.create_random_key(self)
2146 key1.add_vpp_config()
2147 key2 = self.factory.create_random_key(self)
2148 key2.add_vpp_config()
2149 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2150 self.pg0.remote_ip4, sha1_key=key1)
2151 self.vpp_session.add_vpp_config()
2152 self.vpp_session.admin_up()
2153 self.test_session = BFDTestSession(
2154 self, self.pg0, AF_INET, sha1_key=key1,
2155 bfd_key_id=self.vpp_session.bfd_key_id)
2156 bfd_session_up(self)
2157 for dummy in range(self.test_session.detect_mult * 2):
2158 p = wait_for_bfd_packet(self)
2159 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2160 self.test_session.send_packet()
2161 self.vpp_session.activate_auth(key2, delayed=True)
2162 for dummy in range(self.test_session.detect_mult * 2):
2163 p = wait_for_bfd_packet(self)
2164 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2165 self.test_session.send_packet()
2166 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2167 self.test_session.sha1_key = key2
2168 self.test_session.send_packet()
2169 for dummy in range(self.test_session.detect_mult * 2):
2170 p = wait_for_bfd_packet(self)
2171 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2172 self.test_session.send_packet()
2173 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2174 self.assert_equal(len(self.vapi.collect_events()), 0,
2175 "number of bfd events")
2178 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2179 class BFDCLITestCase(VppTestCase):
2180 """Bidirectional Forwarding Detection (BFD) (CLI) """
2184 def setUpClass(cls):
2185 super(BFDCLITestCase, cls).setUpClass()
2186 cls.vapi.cli("set log class bfd level debug")
2188 cls.create_pg_interfaces((0,))
2189 cls.pg0.config_ip4()
2190 cls.pg0.config_ip6()
2191 cls.pg0.resolve_arp()
2192 cls.pg0.resolve_ndp()
2195 super(BFDCLITestCase, cls).tearDownClass()
2199 super(BFDCLITestCase, self).setUp()
2200 self.factory = AuthKeyFactory()
2201 self.pg0.enable_capture()
2205 self.vapi.want_bfd_events(enable_disable=0)
2206 except UnexpectedApiReturnValueError:
2207 # some tests aren't subscribed, so this is not an issue
2209 self.vapi.collect_events() # clear the event queue
2210 super(BFDCLITestCase, self).tearDown()
2212 def cli_verify_no_response(self, cli):
2213 """ execute a CLI, asserting that the response is empty """
2214 self.assert_equal(self.vapi.cli(cli),
2216 "CLI command response")
2218 def cli_verify_response(self, cli, expected):
2219 """ execute a CLI, asserting that the response matches expectation """
2220 self.assert_equal(self.vapi.cli(cli).strip(),
2222 "CLI command response")
2224 def test_show(self):
2225 """ show commands """
2226 k1 = self.factory.create_random_key(self)
2228 k2 = self.factory.create_random_key(
2229 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2231 s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2233 s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2236 self.logger.info(self.vapi.ppcli("show bfd keys"))
2237 self.logger.info(self.vapi.ppcli("show bfd sessions"))
2238 self.logger.info(self.vapi.ppcli("show bfd"))
2240 def test_set_del_sha1_key(self):
2241 """ set/delete SHA1 auth key """
2242 k = self.factory.create_random_key(self)
2243 self.registry.register(k, self.logger)
2244 self.cli_verify_no_response(
2245 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2247 "".join("{:02x}".format(ord(c)) for c in k.key)))
2248 self.assertTrue(k.query_vpp_config())
2249 self.vpp_session = VppBFDUDPSession(
2250 self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
2251 self.vpp_session.add_vpp_config()
2252 self.test_session = \
2253 BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
2254 bfd_key_id=self.vpp_session.bfd_key_id)
2255 self.vapi.want_bfd_events()
2256 bfd_session_up(self)
2257 bfd_session_down(self)
2258 # try to replace the secret for the key - should fail because the key
2260 k2 = self.factory.create_random_key(self)
2261 self.cli_verify_response(
2262 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2264 "".join("{:02x}".format(ord(c)) for c in k2.key)),
2265 "bfd key set: `bfd_auth_set_key' API call failed, "
2266 "rv=-103:BFD object in use")
2267 # manipulating the session using old secret should still work
2268 bfd_session_up(self)
2269 bfd_session_down(self)
2270 self.vpp_session.remove_vpp_config()
2271 self.cli_verify_no_response(
2272 "bfd key del conf-key-id %s" % k.conf_key_id)
2273 self.assertFalse(k.query_vpp_config())
2275 def test_set_del_meticulous_sha1_key(self):
2276 """ set/delete meticulous SHA1 auth key """
2277 k = self.factory.create_random_key(
2278 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2279 self.registry.register(k, self.logger)
2280 self.cli_verify_no_response(
2281 "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
2283 "".join("{:02x}".format(ord(c)) for c in k.key)))
2284 self.assertTrue(k.query_vpp_config())
2285 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2286 self.pg0.remote_ip6, af=AF_INET6,
2288 self.vpp_session.add_vpp_config()
2289 self.vpp_session.admin_up()
2290 self.test_session = \
2291 BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
2292 bfd_key_id=self.vpp_session.bfd_key_id)
2293 self.vapi.want_bfd_events()
2294 bfd_session_up(self)
2295 bfd_session_down(self)
2296 # try to replace the secret for the key - should fail because the key
2298 k2 = self.factory.create_random_key(self)
2299 self.cli_verify_response(
2300 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2302 "".join("{:02x}".format(ord(c)) for c in k2.key)),
2303 "bfd key set: `bfd_auth_set_key' API call failed, "
2304 "rv=-103:BFD object in use")
2305 # manipulating the session using old secret should still work
2306 bfd_session_up(self)
2307 bfd_session_down(self)
2308 self.vpp_session.remove_vpp_config()
2309 self.cli_verify_no_response(
2310 "bfd key del conf-key-id %s" % k.conf_key_id)
2311 self.assertFalse(k.query_vpp_config())
2313 def test_add_mod_del_bfd_udp(self):
2314 """ create/modify/delete IPv4 BFD UDP session """
2315 vpp_session = VppBFDUDPSession(
2316 self, self.pg0, self.pg0.remote_ip4)
2317 self.registry.register(vpp_session, self.logger)
2318 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2319 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2320 "detect-mult %s" % (self.pg0.name, self.pg0.local_ip4,
2321 self.pg0.remote_ip4,
2322 vpp_session.desired_min_tx,
2323 vpp_session.required_min_rx,
2324 vpp_session.detect_mult)
2325 self.cli_verify_no_response(cli_add_cmd)
2326 # 2nd add should fail
2327 self.cli_verify_response(
2329 "bfd udp session add: `bfd_add_add_session' API call"
2330 " failed, rv=-101:Duplicate BFD object")
2331 verify_bfd_session_config(self, vpp_session)
2332 mod_session = VppBFDUDPSession(
2333 self, self.pg0, self.pg0.remote_ip4,
2334 required_min_rx=2 * vpp_session.required_min_rx,
2335 desired_min_tx=3 * vpp_session.desired_min_tx,
2336 detect_mult=4 * vpp_session.detect_mult)
2337 self.cli_verify_no_response(
2338 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2339 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2340 (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2341 mod_session.desired_min_tx, mod_session.required_min_rx,
2342 mod_session.detect_mult))
2343 verify_bfd_session_config(self, mod_session)
2344 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2345 "peer-addr %s" % (self.pg0.name,
2346 self.pg0.local_ip4, self.pg0.remote_ip4)
2347 self.cli_verify_no_response(cli_del_cmd)
2348 # 2nd del is expected to fail
2349 self.cli_verify_response(
2350 cli_del_cmd, "bfd udp session del: `bfd_udp_del_session' API call"
2351 " failed, rv=-102:No such BFD object")
2352 self.assertFalse(vpp_session.query_vpp_config())
2354 def test_add_mod_del_bfd_udp6(self):
2355 """ create/modify/delete IPv6 BFD UDP session """
2356 vpp_session = VppBFDUDPSession(
2357 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
2358 self.registry.register(vpp_session, self.logger)
2359 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2360 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2361 "detect-mult %s" % (self.pg0.name, self.pg0.local_ip6,
2362 self.pg0.remote_ip6,
2363 vpp_session.desired_min_tx,
2364 vpp_session.required_min_rx,
2365 vpp_session.detect_mult)
2366 self.cli_verify_no_response(cli_add_cmd)
2367 # 2nd add should fail
2368 self.cli_verify_response(
2370 "bfd udp session add: `bfd_add_add_session' API call"
2371 " failed, rv=-101:Duplicate BFD object")
2372 verify_bfd_session_config(self, vpp_session)
2373 mod_session = VppBFDUDPSession(
2374 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2375 required_min_rx=2 * vpp_session.required_min_rx,
2376 desired_min_tx=3 * vpp_session.desired_min_tx,
2377 detect_mult=4 * vpp_session.detect_mult)
2378 self.cli_verify_no_response(
2379 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2380 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2381 (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2382 mod_session.desired_min_tx,
2383 mod_session.required_min_rx, mod_session.detect_mult))
2384 verify_bfd_session_config(self, mod_session)
2385 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2386 "peer-addr %s" % (self.pg0.name,
2387 self.pg0.local_ip6, self.pg0.remote_ip6)
2388 self.cli_verify_no_response(cli_del_cmd)
2389 # 2nd del is expected to fail
2390 self.cli_verify_response(
2392 "bfd udp session del: `bfd_udp_del_session' API call"
2393 " failed, rv=-102:No such BFD object")
2394 self.assertFalse(vpp_session.query_vpp_config())
2396 def test_add_mod_del_bfd_udp_auth(self):
2397 """ create/modify/delete IPv4 BFD UDP session (authenticated) """
2398 key = self.factory.create_random_key(self)
2399 key.add_vpp_config()
2400 vpp_session = VppBFDUDPSession(
2401 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2402 self.registry.register(vpp_session, self.logger)
2403 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2404 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2405 "detect-mult %s conf-key-id %s bfd-key-id %s"\
2406 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2407 vpp_session.desired_min_tx, vpp_session.required_min_rx,
2408 vpp_session.detect_mult, key.conf_key_id,
2409 vpp_session.bfd_key_id)
2410 self.cli_verify_no_response(cli_add_cmd)
2411 # 2nd add should fail
2412 self.cli_verify_response(
2414 "bfd udp session add: `bfd_add_add_session' API call"
2415 " failed, rv=-101:Duplicate BFD object")
2416 verify_bfd_session_config(self, vpp_session)
2417 mod_session = VppBFDUDPSession(
2418 self, self.pg0, self.pg0.remote_ip4, sha1_key=key,
2419 bfd_key_id=vpp_session.bfd_key_id,
2420 required_min_rx=2 * vpp_session.required_min_rx,
2421 desired_min_tx=3 * vpp_session.desired_min_tx,
2422 detect_mult=4 * vpp_session.detect_mult)
2423 self.cli_verify_no_response(
2424 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2425 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2426 (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2427 mod_session.desired_min_tx,
2428 mod_session.required_min_rx, mod_session.detect_mult))
2429 verify_bfd_session_config(self, mod_session)
2430 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2431 "peer-addr %s" % (self.pg0.name,
2432 self.pg0.local_ip4, self.pg0.remote_ip4)
2433 self.cli_verify_no_response(cli_del_cmd)
2434 # 2nd del is expected to fail
2435 self.cli_verify_response(
2437 "bfd udp session del: `bfd_udp_del_session' API call"
2438 " failed, rv=-102:No such BFD object")
2439 self.assertFalse(vpp_session.query_vpp_config())
2441 def test_add_mod_del_bfd_udp6_auth(self):
2442 """ create/modify/delete IPv6 BFD UDP session (authenticated) """
2443 key = self.factory.create_random_key(
2444 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2445 key.add_vpp_config()
2446 vpp_session = VppBFDUDPSession(
2447 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key)
2448 self.registry.register(vpp_session, self.logger)
2449 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2450 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2451 "detect-mult %s conf-key-id %s bfd-key-id %s" \
2452 % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2453 vpp_session.desired_min_tx, vpp_session.required_min_rx,
2454 vpp_session.detect_mult, key.conf_key_id,
2455 vpp_session.bfd_key_id)
2456 self.cli_verify_no_response(cli_add_cmd)
2457 # 2nd add should fail
2458 self.cli_verify_response(
2460 "bfd udp session add: `bfd_add_add_session' API call"
2461 " failed, rv=-101:Duplicate BFD object")
2462 verify_bfd_session_config(self, vpp_session)
2463 mod_session = VppBFDUDPSession(
2464 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key,
2465 bfd_key_id=vpp_session.bfd_key_id,
2466 required_min_rx=2 * vpp_session.required_min_rx,
2467 desired_min_tx=3 * vpp_session.desired_min_tx,
2468 detect_mult=4 * vpp_session.detect_mult)
2469 self.cli_verify_no_response(
2470 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2471 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2472 (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2473 mod_session.desired_min_tx,
2474 mod_session.required_min_rx, mod_session.detect_mult))
2475 verify_bfd_session_config(self, mod_session)
2476 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2477 "peer-addr %s" % (self.pg0.name,
2478 self.pg0.local_ip6, self.pg0.remote_ip6)
2479 self.cli_verify_no_response(cli_del_cmd)
2480 # 2nd del is expected to fail
2481 self.cli_verify_response(
2483 "bfd udp session del: `bfd_udp_del_session' API call"
2484 " failed, rv=-102:No such BFD object")
2485 self.assertFalse(vpp_session.query_vpp_config())
2487 def test_auth_on_off(self):
2488 """ turn authentication on and off """
2489 key = self.factory.create_random_key(
2490 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2491 key.add_vpp_config()
2492 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2493 auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2495 session.add_vpp_config()
2497 "bfd udp session auth activate interface %s local-addr %s "\
2498 "peer-addr %s conf-key-id %s bfd-key-id %s"\
2499 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2500 key.conf_key_id, auth_session.bfd_key_id)
2501 self.cli_verify_no_response(cli_activate)
2502 verify_bfd_session_config(self, auth_session)
2503 self.cli_verify_no_response(cli_activate)
2504 verify_bfd_session_config(self, auth_session)
2506 "bfd udp session auth deactivate interface %s local-addr %s "\
2508 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2509 self.cli_verify_no_response(cli_deactivate)
2510 verify_bfd_session_config(self, session)
2511 self.cli_verify_no_response(cli_deactivate)
2512 verify_bfd_session_config(self, session)
2514 def test_auth_on_off_delayed(self):
2515 """ turn authentication on and off (delayed) """
2516 key = self.factory.create_random_key(
2517 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2518 key.add_vpp_config()
2519 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2520 auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2522 session.add_vpp_config()
2524 "bfd udp session auth activate interface %s local-addr %s "\
2525 "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"\
2526 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2527 key.conf_key_id, auth_session.bfd_key_id)
2528 self.cli_verify_no_response(cli_activate)
2529 verify_bfd_session_config(self, auth_session)
2530 self.cli_verify_no_response(cli_activate)
2531 verify_bfd_session_config(self, auth_session)
2533 "bfd udp session auth deactivate interface %s local-addr %s "\
2534 "peer-addr %s delayed yes"\
2535 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2536 self.cli_verify_no_response(cli_deactivate)
2537 verify_bfd_session_config(self, session)
2538 self.cli_verify_no_response(cli_deactivate)
2539 verify_bfd_session_config(self, session)
2541 def test_admin_up_down(self):
2542 """ put session admin-up and admin-down """
2543 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2544 session.add_vpp_config()
2546 "bfd udp session set-flags admin down interface %s local-addr %s "\
2548 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2550 "bfd udp session set-flags admin up interface %s local-addr %s "\
2552 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2553 self.cli_verify_no_response(cli_down)
2554 verify_bfd_session_config(self, session, state=BFDState.admin_down)
2555 self.cli_verify_no_response(cli_up)
2556 verify_bfd_session_config(self, session, state=BFDState.down)
2558 def test_set_del_udp_echo_source(self):
2559 """ set/del udp echo source """
2560 self.create_loopback_interfaces([0])
2561 self.loopback0 = self.lo_interfaces[0]
2562 self.loopback0.admin_up()
2563 self.cli_verify_response("show bfd echo-source",
2564 "UDP echo source is not set.")
2565 cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
2566 self.cli_verify_no_response(cli_set)
2567 self.cli_verify_response("show bfd echo-source",
2568 "UDP echo source is: %s\n"
2569 "IPv4 address usable as echo source: none\n"
2570 "IPv6 address usable as echo source: none" %
2571 self.loopback0.name)
2572 self.loopback0.config_ip4()
2573 unpacked = unpack("!L", self.loopback0.local_ip4n)
2574 echo_ip4 = inet_ntop(AF_INET, pack("!L", unpacked[0] ^ 1))
2575 self.cli_verify_response("show bfd echo-source",
2576 "UDP echo source is: %s\n"
2577 "IPv4 address usable as echo source: %s\n"
2578 "IPv6 address usable as echo source: none" %
2579 (self.loopback0.name, echo_ip4))
2580 unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
2581 echo_ip6 = inet_ntop(AF_INET6, pack("!LLLL", unpacked[0], unpacked[1],
2582 unpacked[2], unpacked[3] ^ 1))
2583 self.loopback0.config_ip6()
2584 self.cli_verify_response("show bfd echo-source",
2585 "UDP echo source is: %s\n"
2586 "IPv4 address usable as echo source: %s\n"
2587 "IPv6 address usable as echo source: %s" %
2588 (self.loopback0.name, echo_ip4, echo_ip6))
2589 cli_del = "bfd udp echo-source del"
2590 self.cli_verify_no_response(cli_del)
2591 self.cli_verify_response("show bfd echo-source",
2592 "UDP echo source is not set.")
2594 if __name__ == '__main__':
2595 unittest.main(testRunner=VppTestRunner)