4 from __future__ import division
9 from struct import pack, unpack
10 from random import randint, shuffle, getrandbits
11 from socket import AF_INET, AF_INET6, inet_ntop
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether
14 from scapy.layers.inet import UDP, IP
15 from scapy.layers.inet6 import IPv6
16 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
17 BFDDiagCode, BFDState, BFD_vpp_echo
18 from framework import VppTestCase, VppTestRunner, running_extended_tests
19 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
20 from vpp_lo_interface import VppLoInterface
22 from vpp_papi_provider import UnexpectedApiReturnValueError
23 from vpp_ip import DpoProto
24 from vpp_ip_route import VppIpRoute, VppRoutePath
class AuthKeyFactory(object):
    """Factory class for creating auth keys with unique conf key ID"""

    def __init__(self):
        # conf key IDs already handed out by this factory instance
        self._conf_key_ids = set()

    def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
        """ create a random key with unique conf key id

        :param test: test case object passed through to VppBFDAuthKey
        :param auth_type: authentication type passed through to VppBFDAuthKey
        :returns: VppBFDAuthKey carrying 1-20 random bytes of key material
                  and a conf key ID not previously issued by this factory
        """
        conf_key_id = randint(0, 0xFFFFFFFF)
        while conf_key_id in self._conf_key_ids:
            conf_key_id = randint(0, 0xFFFFFFFF)
        self._conf_key_ids.add(conf_key_id)
        key_len = randint(1, 20)
        # bytes() gives the raw octets on both Python 2 and Python 3, whereas
        # str() of a bytearray is its repr ("bytearray(b'...')") on Python 3
        key = bytes(bytearray(randint(0, 255) for _ in range(key_len)))
        return VppBFDAuthKey(test=test, auth_type=auth_type,
                             conf_key_id=conf_key_id, key=key)
46 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
47 class BFDAPITestCase(VppTestCase):
48 """Bidirectional Forwarding Detection (BFD) - API"""
55 super(BFDAPITestCase, cls).setUpClass()
56 cls.vapi.cli("set log class bfd level debug")
58 cls.create_pg_interfaces(range(2))
59 for i in cls.pg_interfaces:
65 super(BFDAPITestCase, cls).tearDownClass()
69 super(BFDAPITestCase, self).setUp()
70 self.factory = AuthKeyFactory()
def test_add_bfd(self):
    """ create a BFD session """
    bfd_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    # run the add/remove cycle twice so the session is also re-created
    # after it has been removed once
    for _ in range(2):
        bfd_session.add_vpp_config()
        self.logger.debug("Session state is %s", bfd_session.state)
        bfd_session.remove_vpp_config()
def test_double_add(self):
    """ create the same BFD session twice (negative case) """
    duplicate = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    duplicate.add_vpp_config()
    # the second add of the very same session runs with a negative API
    # return value expected
    with self.vapi.expect_negative_api_retval():
        duplicate.add_vpp_config()
    duplicate.remove_vpp_config()
def test_add_bfd6(self):
    """ create IPv6 BFD session """
    bfd_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
                                   af=AF_INET6)
    # run the add/remove cycle twice so the session is also re-created
    # after it has been removed once
    for _ in range(2):
        bfd_session.add_vpp_config()
        self.logger.debug("Session state is %s", bfd_session.state)
        bfd_session.remove_vpp_config()
103 def test_mod_bfd(self):
104 """ modify BFD session parameters """
105 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
106 desired_min_tx=50000,
107 required_min_rx=10000,
109 session.add_vpp_config()
110 s = session.get_bfd_udp_session_dump_entry()
111 self.assert_equal(session.desired_min_tx,
113 "desired min transmit interval")
114 self.assert_equal(session.required_min_rx,
116 "required min receive interval")
117 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
118 session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
119 required_min_rx=session.required_min_rx * 2,
120 detect_mult=session.detect_mult * 2)
121 s = session.get_bfd_udp_session_dump_entry()
122 self.assert_equal(session.desired_min_tx,
124 "desired min transmit interval")
125 self.assert_equal(session.required_min_rx,
127 "required min receive interval")
128 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
130 def test_add_sha1_keys(self):
131 """ add SHA1 keys """
133 keys = [self.factory.create_random_key(
134 self) for i in range(0, key_count)]
136 self.assertFalse(key.query_vpp_config())
140 self.assertTrue(key.query_vpp_config())
142 indexes = range(key_count)
147 key.remove_vpp_config()
149 for j in range(key_count):
152 self.assertFalse(key.query_vpp_config())
154 self.assertTrue(key.query_vpp_config())
155 # should be removed now
157 self.assertFalse(key.query_vpp_config())
158 # add back and remove again
162 self.assertTrue(key.query_vpp_config())
164 key.remove_vpp_config()
166 self.assertFalse(key.query_vpp_config())
168 def test_add_bfd_sha1(self):
169 """ create a BFD session (SHA1) """
170 key = self.factory.create_random_key(self)
172 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
174 session.add_vpp_config()
175 self.logger.debug("Session state is %s", session.state)
176 session.remove_vpp_config()
177 session.add_vpp_config()
178 self.logger.debug("Session state is %s", session.state)
179 session.remove_vpp_config()
181 def test_double_add_sha1(self):
182 """ create the same BFD session twice (negative case) (SHA1) """
183 key = self.factory.create_random_key(self)
185 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
187 session.add_vpp_config()
188 with self.assertRaises(Exception):
189 session.add_vpp_config()
def test_add_auth_nonexistent_key(self):
    """ create BFD session using non-existent SHA1 (negative case) """
    # the key object is only created here - add_vpp_config() is never
    # called on it before the session tries to use it
    unknown_key = self.factory.create_random_key(self)
    session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                               sha1_key=unknown_key)
    with self.assertRaises(Exception):
        session.add_vpp_config()
199 def test_shared_sha1_key(self):
200 """ share single SHA1 key between multiple BFD sessions """
201 key = self.factory.create_random_key(self)
204 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
206 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
207 sha1_key=key, af=AF_INET6),
208 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
210 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
211 sha1_key=key, af=AF_INET6)]
216 e = key.get_bfd_auth_keys_dump_entry()
217 self.assert_equal(e.use_count, len(sessions) - removed,
218 "Use count for shared key")
219 s.remove_vpp_config()
221 e = key.get_bfd_auth_keys_dump_entry()
222 self.assert_equal(e.use_count, len(sessions) - removed,
223 "Use count for shared key")
225 def test_activate_auth(self):
226 """ activate SHA1 authentication """
227 key = self.factory.create_random_key(self)
229 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
230 session.add_vpp_config()
231 session.activate_auth(key)
233 def test_deactivate_auth(self):
234 """ deactivate SHA1 authentication """
235 key = self.factory.create_random_key(self)
237 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
238 session.add_vpp_config()
239 session.activate_auth(key)
240 session.deactivate_auth()
242 def test_change_key(self):
243 """ change SHA1 key """
244 key1 = self.factory.create_random_key(self)
245 key2 = self.factory.create_random_key(self)
246 while key2.conf_key_id == key1.conf_key_id:
247 key2 = self.factory.create_random_key(self)
248 key1.add_vpp_config()
249 key2.add_vpp_config()
250 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
252 session.add_vpp_config()
253 session.activate_auth(key2)
256 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
257 class BFDTestSession(object):
258 """ BFD session as seen from test framework side """
260 def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
261 bfd_key_id=None, our_seq_number=None):
264 self.sha1_key = sha1_key
265 self.bfd_key_id = bfd_key_id
266 self.interface = interface
267 self.udp_sport = randint(49152, 65535)
268 if our_seq_number is None:
269 self.our_seq_number = randint(0, 40000000)
271 self.our_seq_number = our_seq_number
272 self.vpp_seq_number = None
273 self.my_discriminator = 0
274 self.desired_min_tx = 300000
275 self.required_min_rx = 300000
276 self.required_min_echo_rx = None
277 self.detect_mult = detect_mult
278 self.diag = BFDDiagCode.no_diagnostic
279 self.your_discriminator = None
280 self.state = BFDState.down
281 self.auth_type = BFDAuthType.no_auth
def inc_seq_num(self):
    """ increment sequence number, wrapping if needed """
    if self.our_seq_number == 0xFFFFFFFF:
        # counter is at the largest 32-bit value - wrap around to zero
        self.our_seq_number = 0
    else:
        self.our_seq_number += 1
def update(self, my_discriminator=None, your_discriminator=None,
           desired_min_tx=None, required_min_rx=None,
           required_min_echo_rx=None, detect_mult=None,
           diag=None, state=None, auth_type=None):
    """ update BFD parameters associated with session

    Every argument which is not None is stored in the attribute of the
    same name; attributes whose argument is None are left untouched.
    Values are compared against None (not truthiness), so 0 is stored.
    """
    if my_discriminator is not None:
        self.my_discriminator = my_discriminator
    if your_discriminator is not None:
        self.your_discriminator = your_discriminator
    if required_min_rx is not None:
        self.required_min_rx = required_min_rx
    if required_min_echo_rx is not None:
        self.required_min_echo_rx = required_min_echo_rx
    if desired_min_tx is not None:
        self.desired_min_tx = desired_min_tx
    if detect_mult is not None:
        self.detect_mult = detect_mult
    if diag is not None:
        self.diag = diag
    if state is not None:
        self.state = state
    if auth_type is not None:
        self.auth_type = auth_type
314 def fill_packet_fields(self, packet):
315 """ set packet fields with known values in packet """
317 if self.my_discriminator:
318 self.test.logger.debug("BFD: setting packet.my_discriminator=%s",
319 self.my_discriminator)
320 bfd.my_discriminator = self.my_discriminator
321 if self.your_discriminator:
322 self.test.logger.debug("BFD: setting packet.your_discriminator=%s",
323 self.your_discriminator)
324 bfd.your_discriminator = self.your_discriminator
325 if self.required_min_rx:
326 self.test.logger.debug(
327 "BFD: setting packet.required_min_rx_interval=%s",
328 self.required_min_rx)
329 bfd.required_min_rx_interval = self.required_min_rx
330 if self.required_min_echo_rx:
331 self.test.logger.debug(
332 "BFD: setting packet.required_min_echo_rx=%s",
333 self.required_min_echo_rx)
334 bfd.required_min_echo_rx_interval = self.required_min_echo_rx
335 if self.desired_min_tx:
336 self.test.logger.debug(
337 "BFD: setting packet.desired_min_tx_interval=%s",
339 bfd.desired_min_tx_interval = self.desired_min_tx
341 self.test.logger.debug(
342 "BFD: setting packet.detect_mult=%s", self.detect_mult)
343 bfd.detect_mult = self.detect_mult
345 self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
348 self.test.logger.debug("BFD: setting packet.state=%s", self.state)
349 bfd.state = self.state
351 # this is used by a negative test-case
352 self.test.logger.debug("BFD: setting packet.auth_type=%s",
354 bfd.auth_type = self.auth_type
356 def create_packet(self):
357 """ create a BFD packet, reflecting the current state of session """
360 bfd.auth_type = self.sha1_key.auth_type
361 bfd.auth_len = BFD.sha1_auth_len
362 bfd.auth_key_id = self.bfd_key_id
363 bfd.auth_seq_num = self.our_seq_number
364 bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
367 if self.af == AF_INET6:
368 packet = (Ether(src=self.interface.remote_mac,
369 dst=self.interface.local_mac) /
370 IPv6(src=self.interface.remote_ip6,
371 dst=self.interface.local_ip6,
373 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
376 packet = (Ether(src=self.interface.remote_mac,
377 dst=self.interface.local_mac) /
378 IP(src=self.interface.remote_ip4,
379 dst=self.interface.local_ip4,
381 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
383 self.test.logger.debug("BFD: Creating packet")
384 self.fill_packet_fields(packet)
386 hash_material = str(packet[BFD])[:32] + self.sha1_key.key + \
387 "\0" * (20 - len(self.sha1_key.key))
388 self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
389 hashlib.sha1(hash_material).hexdigest())
390 packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
393 def send_packet(self, packet=None, interface=None):
394 """ send packet on interface, creating the packet if needed """
396 packet = self.create_packet()
397 if interface is None:
398 interface = self.test.pg0
399 self.test.logger.debug(ppp("Sending packet:", packet))
400 interface.add_stream(packet)
403 def verify_sha1_auth(self, packet):
404 """ Verify correctness of authentication in BFD layer. """
406 self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
407 self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
409 self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
410 self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
411 if self.vpp_seq_number is None:
412 self.vpp_seq_number = bfd.auth_seq_num
413 self.test.logger.debug("Received initial sequence number: %s" %
416 recvd_seq_num = bfd.auth_seq_num
417 self.test.logger.debug("Received followup sequence number: %s" %
419 if self.vpp_seq_number < 0xffffffff:
420 if self.sha1_key.auth_type == \
421 BFDAuthType.meticulous_keyed_sha1:
422 self.test.assert_equal(recvd_seq_num,
423 self.vpp_seq_number + 1,
424 "BFD sequence number")
426 self.test.assert_in_range(recvd_seq_num,
428 self.vpp_seq_number + 1,
429 "BFD sequence number")
431 if self.sha1_key.auth_type == \
432 BFDAuthType.meticulous_keyed_sha1:
433 self.test.assert_equal(recvd_seq_num, 0,
434 "BFD sequence number")
436 self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
437 "BFD sequence number not one of "
438 "(%s, 0)" % self.vpp_seq_number)
439 self.vpp_seq_number = recvd_seq_num
440 # last 20 bytes represent the hash - so replace them with the key,
441 # pad the result with zeros and hash the result
442 hash_material = bfd.original[:-20] + self.sha1_key.key + \
443 "\0" * (20 - len(self.sha1_key.key))
444 expected_hash = hashlib.sha1(hash_material).hexdigest()
445 self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
446 expected_hash, "Auth key hash")
448 def verify_bfd(self, packet):
449 """ Verify correctness of BFD layer. """
451 self.test.assert_equal(bfd.version, 1, "BFD version")
452 self.test.assert_equal(bfd.your_discriminator,
453 self.my_discriminator,
454 "BFD - your discriminator")
456 self.verify_sha1_auth(packet)
459 def bfd_session_up(test):
460 """ Bring BFD session up """
461 test.logger.info("BFD: Waiting for slow hello")
462 p = wait_for_bfd_packet(test, 2)
464 if hasattr(test, 'vpp_clock_offset'):
465 old_offset = test.vpp_clock_offset
466 test.vpp_clock_offset = time.time() - p.time
467 test.logger.debug("BFD: Calculated vpp clock offset: %s",
468 test.vpp_clock_offset)
470 test.assertAlmostEqual(
471 old_offset, test.vpp_clock_offset, delta=0.5,
472 msg="vpp clock offset not stable (new: %s, old: %s)" %
473 (test.vpp_clock_offset, old_offset))
474 test.logger.info("BFD: Sending Init")
475 test.test_session.update(my_discriminator=randint(0, 40000000),
476 your_discriminator=p[BFD].my_discriminator,
478 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
479 BFDAuthType.meticulous_keyed_sha1:
480 test.test_session.inc_seq_num()
481 test.test_session.send_packet()
482 test.logger.info("BFD: Waiting for event")
483 e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
484 verify_event(test, e, expected_state=BFDState.up)
485 test.logger.info("BFD: Session is Up")
486 test.test_session.update(state=BFDState.up)
487 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
488 BFDAuthType.meticulous_keyed_sha1:
489 test.test_session.inc_seq_num()
490 test.test_session.send_packet()
491 test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
def bfd_session_down(test):
    """ Bring BFD session down

    :param test: test case providing vpp_session, test_session, vapi and
                 logger; the vpp session must be in Up state on entry
    """
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
    peer = test.test_session
    peer.update(state=BFDState.down)
    key = peer.sha1_key
    # with meticulous keyed SHA1 the sequence number is bumped before
    # the packet goes out
    if key and key.auth_type == BFDAuthType.meticulous_keyed_sha1:
        peer.inc_seq_num()
    peer.send_packet()
    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, event, expected_state=BFDState.down)
    test.logger.info("BFD: Session is Down")
    test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
509 def verify_bfd_session_config(test, session, state=None):
510 dump = session.get_bfd_udp_session_dump_entry()
511 test.assertIsNotNone(dump)
512 # since dump is not none, we have verified that sw_if_index and addresses
513 # are valid (in get_bfd_udp_session_dump_entry)
515 test.assert_equal(dump.state, state, "session state")
516 test.assert_equal(dump.required_min_rx, session.required_min_rx,
517 "required min rx interval")
518 test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
519 "desired min tx interval")
520 test.assert_equal(dump.detect_mult, session.detect_mult,
522 if session.sha1_key is None:
523 test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
525 test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
526 test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
528 test.assert_equal(dump.conf_key_id,
529 session.sha1_key.conf_key_id,
533 def verify_ip(test, packet):
534 """ Verify correctness of IP layer. """
535 if test.vpp_session.af == AF_INET6:
537 local_ip = test.pg0.local_ip6
538 remote_ip = test.pg0.remote_ip6
539 test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
542 local_ip = test.pg0.local_ip4
543 remote_ip = test.pg0.remote_ip4
544 test.assert_equal(ip.ttl, 255, "IPv4 TTL")
545 test.assert_equal(ip.src, local_ip, "IP source address")
546 test.assert_equal(ip.dst, remote_ip, "IP destination address")
549 def verify_udp(test, packet):
550 """ Verify correctness of UDP layer. """
552 test.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
553 test.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
557 def verify_event(test, event, expected_state):
558 """ Verify correctness of event values. """
560 test.logger.debug("BFD: Event: %s" % repr(e))
561 test.assert_equal(e.sw_if_index,
562 test.vpp_session.interface.sw_if_index,
563 "BFD interface index")
565 if test.vpp_session.af == AF_INET6:
567 test.assert_equal(e.is_ipv6, is_ipv6, "is_ipv6")
568 if test.vpp_session.af == AF_INET:
569 test.assert_equal(e.local_addr[:4], test.vpp_session.local_addr_n,
570 "Local IPv4 address")
571 test.assert_equal(e.peer_addr[:4], test.vpp_session.peer_addr_n,
574 test.assert_equal(e.local_addr, test.vpp_session.local_addr_n,
575 "Local IPv6 address")
576 test.assert_equal(e.peer_addr, test.vpp_session.peer_addr_n,
578 test.assert_equal(e.state, expected_state, BFDState)
581 def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None):
582 """ wait for BFD packet and verify its correctness
584 :param timeout: how long to wait
585 :param pcap_time_min: ignore packets with pcap timestamp lower than this
587 :returns: tuple (packet, time spent waiting for packet)
589 test.logger.info("BFD: Waiting for BFD packet")
590 deadline = time.time() + timeout
595 test.assert_in_range(counter, 0, 100, "number of packets ignored")
596 time_left = deadline - time.time()
598 raise CaptureTimeoutError("Packet did not arrive within timeout")
599 p = test.pg0.wait_for_packet(timeout=time_left)
600 test.logger.debug(ppp("BFD: Got packet:", p))
601 if pcap_time_min is not None and p.time < pcap_time_min:
602 test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
603 "pcap time min %s):" %
604 (p.time, pcap_time_min), p))
609 raise Exception(ppp("Unexpected or invalid BFD packet:", p))
611 raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
614 test.test_session.verify_bfd(p)
618 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
619 class BFD4TestCase(VppTestCase):
620 """Bidirectional Forwarding Detection (BFD)"""
623 vpp_clock_offset = None
629 super(BFD4TestCase, cls).setUpClass()
630 cls.vapi.cli("set log class bfd level debug")
632 cls.create_pg_interfaces([0])
633 cls.create_loopback_interfaces(1)
634 cls.loopback0 = cls.lo_interfaces[0]
635 cls.loopback0.config_ip4()
636 cls.loopback0.admin_up()
638 cls.pg0.configure_ipv4_neighbors()
640 cls.pg0.resolve_arp()
643 super(BFD4TestCase, cls).tearDownClass()
647 super(BFD4TestCase, self).setUp()
648 self.factory = AuthKeyFactory()
649 self.vapi.want_bfd_events()
650 self.pg0.enable_capture()
652 self.vpp_session = VppBFDUDPSession(self, self.pg0,
654 self.vpp_session.add_vpp_config()
655 self.vpp_session.admin_up()
656 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
658 self.vapi.want_bfd_events(enable_disable=0)
662 if not self.vpp_dead:
663 self.vapi.want_bfd_events(enable_disable=0)
664 self.vapi.collect_events() # clear the event queue
665 super(BFD4TestCase, self).tearDown()
667 def test_session_up(self):
668 """ bring BFD session up """
671 def test_session_up_by_ip(self):
672 """ bring BFD session up - first frame looked up by address pair """
673 self.logger.info("BFD: Sending Slow control frame")
674 self.test_session.update(my_discriminator=randint(0, 40000000))
675 self.test_session.send_packet()
676 self.pg0.enable_capture()
677 p = self.pg0.wait_for_packet(1)
678 self.assert_equal(p[BFD].your_discriminator,
679 self.test_session.my_discriminator,
680 "BFD - your discriminator")
681 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
682 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
684 self.logger.info("BFD: Waiting for event")
685 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
686 verify_event(self, e, expected_state=BFDState.init)
687 self.logger.info("BFD: Sending Up")
688 self.test_session.send_packet()
689 self.logger.info("BFD: Waiting for event")
690 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
691 verify_event(self, e, expected_state=BFDState.up)
692 self.logger.info("BFD: Session is Up")
693 self.test_session.update(state=BFDState.up)
694 self.test_session.send_packet()
695 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
697 def test_session_down(self):
698 """ bring BFD session down """
700 bfd_session_down(self)
702 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
703 def test_hold_up(self):
704 """ hold BFD session up """
706 for dummy in range(self.test_session.detect_mult * 2):
707 wait_for_bfd_packet(self)
708 self.test_session.send_packet()
709 self.assert_equal(len(self.vapi.collect_events()), 0,
710 "number of bfd events")
712 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
713 def test_slow_timer(self):
714 """ verify slow periodic control frames while session down """
716 self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
717 prev_packet = wait_for_bfd_packet(self, 2)
718 for dummy in range(packet_count):
719 next_packet = wait_for_bfd_packet(self, 2)
720 time_diff = next_packet.time - prev_packet.time
721 # spec says the range should be <0.75, 1>, allow extra 0.05 margin
722 # to work around timing issues
723 self.assert_in_range(
724 time_diff, 0.70, 1.05, "time between slow packets")
725 prev_packet = next_packet
727 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
728 def test_zero_remote_min_rx(self):
729 """ no packets when zero remote required min rx interval """
731 self.test_session.update(required_min_rx=0)
732 self.test_session.send_packet()
733 for dummy in range(self.test_session.detect_mult):
734 self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
735 "sleep before transmitting bfd packet")
736 self.test_session.send_packet()
738 p = wait_for_bfd_packet(self, timeout=0)
739 self.logger.error(ppp("Received unexpected packet:", p))
740 except CaptureTimeoutError:
743 len(self.vapi.collect_events()), 0, "number of bfd events")
744 self.test_session.update(required_min_rx=300000)
745 for dummy in range(3):
746 self.test_session.send_packet()
748 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
750 len(self.vapi.collect_events()), 0, "number of bfd events")
752 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
753 def test_conn_down(self):
754 """ verify session goes down after inactivity """
756 detection_time = self.test_session.detect_mult *\
757 self.vpp_session.required_min_rx / USEC_IN_SEC
758 self.sleep(detection_time, "waiting for BFD session time-out")
759 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
760 verify_event(self, e, expected_state=BFDState.down)
762 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
763 def test_large_required_min_rx(self):
764 """ large remote required min rx interval """
766 p = wait_for_bfd_packet(self)
768 self.test_session.update(required_min_rx=interval)
769 self.test_session.send_packet()
770 time_mark = time.time()
772 # busy wait here, trying to collect a packet or event, vpp is not
773 # allowed to send packets and the session will timeout first - so the
774 # Up->Down event must arrive before any packets do
775 while time.time() < time_mark + interval / USEC_IN_SEC:
777 p = wait_for_bfd_packet(self, timeout=0)
778 # if vpp managed to send a packet before we did the session
779 # session update, then that's fine, ignore it
780 if p.time < time_mark - self.vpp_clock_offset:
782 self.logger.error(ppp("Received unexpected packet:", p))
784 except CaptureTimeoutError:
786 events = self.vapi.collect_events()
788 verify_event(self, events[0], BFDState.down)
790 self.assert_equal(count, 0, "number of packets received")
792 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
793 def test_immediate_remote_min_rx_reduction(self):
794 """ immediately honor remote required min rx reduction """
795 self.vpp_session.remove_vpp_config()
796 self.vpp_session = VppBFDUDPSession(
797 self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
798 self.pg0.enable_capture()
799 self.vpp_session.add_vpp_config()
800 self.test_session.update(desired_min_tx=1000000,
801 required_min_rx=1000000)
803 reference_packet = wait_for_bfd_packet(self)
804 time_mark = time.time()
806 self.test_session.update(required_min_rx=interval)
807 self.test_session.send_packet()
808 extra_time = time.time() - time_mark
809 p = wait_for_bfd_packet(self)
810 # first packet is allowed to be late by time we spent doing the update
811 # calculated in extra_time
812 self.assert_in_range(p.time - reference_packet.time,
813 .95 * 0.75 * interval / USEC_IN_SEC,
814 1.05 * interval / USEC_IN_SEC + extra_time,
815 "time between BFD packets")
817 for dummy in range(3):
818 p = wait_for_bfd_packet(self)
819 diff = p.time - reference_packet.time
820 self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
821 1.05 * interval / USEC_IN_SEC,
822 "time between BFD packets")
825 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
826 def test_modify_req_min_rx_double(self):
827 """ modify session - double required min rx """
829 p = wait_for_bfd_packet(self)
830 self.test_session.update(desired_min_tx=10000,
831 required_min_rx=10000)
832 self.test_session.send_packet()
833 # double required min rx
834 self.vpp_session.modify_parameters(
835 required_min_rx=2 * self.vpp_session.required_min_rx)
836 p = wait_for_bfd_packet(
837 self, pcap_time_min=time.time() - self.vpp_clock_offset)
838 # poll bit needs to be set
839 self.assertIn("P", p.sprintf("%BFD.flags%"),
840 "Poll bit not set in BFD packet")
841 # finish poll sequence with final packet
842 final = self.test_session.create_packet()
843 final[BFD].flags = "F"
844 timeout = self.test_session.detect_mult * \
845 max(self.test_session.desired_min_tx,
846 self.vpp_session.required_min_rx) / USEC_IN_SEC
847 self.test_session.send_packet(final)
848 time_mark = time.time()
849 e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
850 verify_event(self, e, expected_state=BFDState.down)
851 time_to_event = time.time() - time_mark
852 self.assert_in_range(time_to_event, .9 * timeout,
853 1.1 * timeout, "session timeout")
855 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
856 def test_modify_req_min_rx_halve(self):
857 """ modify session - halve required min rx """
858 self.vpp_session.modify_parameters(
859 required_min_rx=2 * self.vpp_session.required_min_rx)
861 p = wait_for_bfd_packet(self)
862 self.test_session.update(desired_min_tx=10000,
863 required_min_rx=10000)
864 self.test_session.send_packet()
865 p = wait_for_bfd_packet(
866 self, pcap_time_min=time.time() - self.vpp_clock_offset)
867 # halve required min rx
868 old_required_min_rx = self.vpp_session.required_min_rx
869 self.vpp_session.modify_parameters(
870 required_min_rx=0.5 * self.vpp_session.required_min_rx)
871 # now we wait 0.8*3*old-req-min-rx and the session should still be up
872 self.sleep(0.8 * self.vpp_session.detect_mult *
873 old_required_min_rx / USEC_IN_SEC,
874 "wait before finishing poll sequence")
875 self.assert_equal(len(self.vapi.collect_events()), 0,
876 "number of bfd events")
877 p = wait_for_bfd_packet(self)
878 # poll bit needs to be set
879 self.assertIn("P", p.sprintf("%BFD.flags%"),
880 "Poll bit not set in BFD packet")
881 # finish poll sequence with final packet
882 final = self.test_session.create_packet()
883 final[BFD].flags = "F"
884 self.test_session.send_packet(final)
885 # now the session should time out under new conditions
886 detection_time = self.test_session.detect_mult *\
887 self.vpp_session.required_min_rx / USEC_IN_SEC
889 e = self.vapi.wait_for_event(
890 2 * detection_time, "bfd_udp_session_details")
892 self.assert_in_range(after - before,
893 0.9 * detection_time,
894 1.1 * detection_time,
895 "time before bfd session goes down")
896 verify_event(self, e, expected_state=BFDState.down)
898 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
899 def test_modify_detect_mult(self):
900 """ modify detect multiplier """
902 p = wait_for_bfd_packet(self)
903 self.vpp_session.modify_parameters(detect_mult=1)
904 p = wait_for_bfd_packet(
905 self, pcap_time_min=time.time() - self.vpp_clock_offset)
906 self.assert_equal(self.vpp_session.detect_mult,
909 # poll bit must not be set
910 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
911 "Poll bit not set in BFD packet")
912 self.vpp_session.modify_parameters(detect_mult=10)
913 p = wait_for_bfd_packet(
914 self, pcap_time_min=time.time() - self.vpp_clock_offset)
915 self.assert_equal(self.vpp_session.detect_mult,
918 # poll bit must not be set
919 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
920 "Poll bit not set in BFD packet")
922 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
923 def test_queued_poll(self):
924 """ test poll sequence queueing """
926 p = wait_for_bfd_packet(self)
927 self.vpp_session.modify_parameters(
928 required_min_rx=2 * self.vpp_session.required_min_rx)
929 p = wait_for_bfd_packet(self)
930 poll_sequence_start = time.time()
931 poll_sequence_length_min = 0.5
932 send_final_after = time.time() + poll_sequence_length_min
933 # poll bit needs to be set
934 self.assertIn("P", p.sprintf("%BFD.flags%"),
935 "Poll bit not set in BFD packet")
936 self.assert_equal(p[BFD].required_min_rx_interval,
937 self.vpp_session.required_min_rx,
938 "BFD required min rx interval")
939 self.vpp_session.modify_parameters(
940 required_min_rx=2 * self.vpp_session.required_min_rx)
941 # 2nd poll sequence should be queued now
942 # don't send the reply back yet, wait for some time to emulate
943 # longer round-trip time
945 while time.time() < send_final_after:
946 self.test_session.send_packet()
947 p = wait_for_bfd_packet(self)
948 self.assert_equal(len(self.vapi.collect_events()), 0,
949 "number of bfd events")
950 self.assert_equal(p[BFD].required_min_rx_interval,
951 self.vpp_session.required_min_rx,
952 "BFD required min rx interval")
954 # poll bit must be set
955 self.assertIn("P", p.sprintf("%BFD.flags%"),
956 "Poll bit not set in BFD packet")
957 final = self.test_session.create_packet()
958 final[BFD].flags = "F"
959 self.test_session.send_packet(final)
960 # finish 1st with final
961 poll_sequence_length = time.time() - poll_sequence_start
962 # vpp must wait for some time before starting new poll sequence
963 poll_no_2_started = False
964 for dummy in range(2 * packet_count):
965 p = wait_for_bfd_packet(self)
966 self.assert_equal(len(self.vapi.collect_events()), 0,
967 "number of bfd events")
968 if "P" in p.sprintf("%BFD.flags%"):
969 poll_no_2_started = True
970 if time.time() < poll_sequence_start + poll_sequence_length:
971 raise Exception("VPP started 2nd poll sequence too soon")
972 final = self.test_session.create_packet()
973 final[BFD].flags = "F"
974 self.test_session.send_packet(final)
977 self.test_session.send_packet()
978 self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
979 # finish 2nd with final
980 final = self.test_session.create_packet()
981 final[BFD].flags = "F"
982 self.test_session.send_packet(final)
983 p = wait_for_bfd_packet(self)
984 # poll bit must not be set
985 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
986 "Poll bit set in BFD packet")
988 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
989 def test_poll_response(self):
990 """ test correct response to control frame with poll bit set """
992 poll = self.test_session.create_packet()
993 poll[BFD].flags = "P"
994 self.test_session.send_packet(poll)
995 final = wait_for_bfd_packet(
996 self, pcap_time_min=time.time() - self.vpp_clock_offset)
997 self.assertIn("F", final.sprintf("%BFD.flags%"))
999 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1000 def test_no_periodic_if_remote_demand(self):
1001 """ no periodic frames outside poll sequence if remote demand set """
1002 bfd_session_up(self)
1003 demand = self.test_session.create_packet()
1004 demand[BFD].flags = "D"
1005 self.test_session.send_packet(demand)
1006 transmit_time = 0.9 \
1007 * max(self.vpp_session.required_min_rx,
1008 self.test_session.desired_min_tx) \
1011 for dummy in range(self.test_session.detect_mult * 2):
1012 time.sleep(transmit_time)
1013 self.test_session.send_packet(demand)
1015 p = wait_for_bfd_packet(self, timeout=0)
1016 self.logger.error(ppp("Received unexpected packet:", p))
1018 except CaptureTimeoutError:
1020 events = self.vapi.collect_events()
1022 self.logger.error("Received unexpected event: %s", e)
1023 self.assert_equal(count, 0, "number of packets received")
1024 self.assert_equal(len(events), 0, "number of events received")
1026 def test_echo_looped_back(self):
1027 """ echo packets looped back """
1028 # don't need a session in this case..
1029 self.vpp_session.remove_vpp_config()
1030 self.pg0.enable_capture()
1031 echo_packet_count = 10
1032 # random source port low enough to increment a few times..
1033 udp_sport_tx = randint(1, 50000)
1034 udp_sport_rx = udp_sport_tx
1035 echo_packet = (Ether(src=self.pg0.remote_mac,
1036 dst=self.pg0.local_mac) /
1037 IP(src=self.pg0.remote_ip4,
1038 dst=self.pg0.remote_ip4) /
1039 UDP(dport=BFD.udp_dport_echo) /
1040 Raw("this should be looped back"))
1041 for dummy in range(echo_packet_count):
1042 self.sleep(.01, "delay between echo packets")
1043 echo_packet[UDP].sport = udp_sport_tx
1045 self.logger.debug(ppp("Sending packet:", echo_packet))
1046 self.pg0.add_stream(echo_packet)
1048 for dummy in range(echo_packet_count):
1049 p = self.pg0.wait_for_packet(1)
1050 self.logger.debug(ppp("Got packet:", p))
1052 self.assert_equal(self.pg0.remote_mac,
1053 ether.dst, "Destination MAC")
1054 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1056 self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1057 self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1059 self.assert_equal(udp.dport, BFD.udp_dport_echo,
1060 "UDP destination port")
1061 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1063 # need to compare the hex payload here, otherwise BFD_vpp_echo
1065 self.assertEqual(str(p[UDP].payload),
1066 str(echo_packet[UDP].payload),
1067 "Received packet is not the echo packet sent")
1068 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1069 "ECHO packet identifier for test purposes)")
1071 def test_echo(self):
1072 """ echo function """
1073 bfd_session_up(self)
1074 self.test_session.update(required_min_echo_rx=150000)
1075 self.test_session.send_packet()
1076 detection_time = self.test_session.detect_mult *\
1077 self.vpp_session.required_min_rx / USEC_IN_SEC
1078 # echo shouldn't work without echo source set
1079 for dummy in range(10):
1080 sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1081 self.sleep(sleep, "delay before sending bfd packet")
1082 self.test_session.send_packet()
1083 p = wait_for_bfd_packet(
1084 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1085 self.assert_equal(p[BFD].required_min_rx_interval,
1086 self.vpp_session.required_min_rx,
1087 "BFD required min rx interval")
1088 self.test_session.send_packet()
1089 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1091 # should be turned on - loopback echo packets
1092 for dummy in range(3):
1093 loop_until = time.time() + 0.75 * detection_time
1094 while time.time() < loop_until:
1095 p = self.pg0.wait_for_packet(1)
1096 self.logger.debug(ppp("Got packet:", p))
1097 if p[UDP].dport == BFD.udp_dport_echo:
1099 p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
1100 self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
1101 "BFD ECHO src IP equal to loopback IP")
1102 self.logger.debug(ppp("Looping back packet:", p))
1103 self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1104 "ECHO packet destination MAC address")
1105 p[Ether].dst = self.pg0.local_mac
1106 self.pg0.add_stream(p)
1109 elif p.haslayer(BFD):
1111 self.assertGreaterEqual(
1112 p[BFD].required_min_rx_interval,
1114 if "P" in p.sprintf("%BFD.flags%"):
1115 final = self.test_session.create_packet()
1116 final[BFD].flags = "F"
1117 self.test_session.send_packet(final)
1119 raise Exception(ppp("Received unknown packet:", p))
1121 self.assert_equal(len(self.vapi.collect_events()), 0,
1122 "number of bfd events")
1123 self.test_session.send_packet()
1124 self.assertTrue(echo_seen, "No echo packets received")
1126 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1127 def test_echo_fail(self):
1128 """ session goes down if echo function fails """
1129 bfd_session_up(self)
1130 self.test_session.update(required_min_echo_rx=150000)
1131 self.test_session.send_packet()
1132 detection_time = self.test_session.detect_mult *\
1133 self.vpp_session.required_min_rx / USEC_IN_SEC
1134 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1135 # echo function should be used now, but we will drop the echo packets
1136 verified_diag = False
1137 for dummy in range(3):
1138 loop_until = time.time() + 0.75 * detection_time
1139 while time.time() < loop_until:
1140 p = self.pg0.wait_for_packet(1)
1141 self.logger.debug(ppp("Got packet:", p))
1142 if p[UDP].dport == BFD.udp_dport_echo:
1145 elif p.haslayer(BFD):
1146 if "P" in p.sprintf("%BFD.flags%"):
1147 self.assertGreaterEqual(
1148 p[BFD].required_min_rx_interval,
1150 final = self.test_session.create_packet()
1151 final[BFD].flags = "F"
1152 self.test_session.send_packet(final)
1153 if p[BFD].state == BFDState.down:
1154 self.assert_equal(p[BFD].diag,
1155 BFDDiagCode.echo_function_failed,
1157 verified_diag = True
1159 raise Exception(ppp("Received unknown packet:", p))
1160 self.test_session.send_packet()
1161 events = self.vapi.collect_events()
1162 self.assert_equal(len(events), 1, "number of bfd events")
1163 self.assert_equal(events[0].state, BFDState.down, BFDState)
1164 self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1166 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1167 def test_echo_stop(self):
1168 """ echo function stops if peer sets required min echo rx zero """
1169 bfd_session_up(self)
1170 self.test_session.update(required_min_echo_rx=150000)
1171 self.test_session.send_packet()
1172 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1173 # wait for first echo packet
1175 p = self.pg0.wait_for_packet(1)
1176 self.logger.debug(ppp("Got packet:", p))
1177 if p[UDP].dport == BFD.udp_dport_echo:
1178 self.logger.debug(ppp("Looping back packet:", p))
1179 p[Ether].dst = self.pg0.local_mac
1180 self.pg0.add_stream(p)
1183 elif p.haslayer(BFD):
1187 raise Exception(ppp("Received unknown packet:", p))
1188 self.test_session.update(required_min_echo_rx=0)
1189 self.test_session.send_packet()
1190 # echo packets shouldn't arrive anymore
1191 for dummy in range(5):
1192 wait_for_bfd_packet(
1193 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1194 self.test_session.send_packet()
1195 events = self.vapi.collect_events()
1196 self.assert_equal(len(events), 0, "number of bfd events")
1198 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1199 def test_echo_source_removed(self):
1200 """ echo function stops if echo source is removed """
1201 bfd_session_up(self)
1202 self.test_session.update(required_min_echo_rx=150000)
1203 self.test_session.send_packet()
1204 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1205 # wait for first echo packet
1207 p = self.pg0.wait_for_packet(1)
1208 self.logger.debug(ppp("Got packet:", p))
1209 if p[UDP].dport == BFD.udp_dport_echo:
1210 self.logger.debug(ppp("Looping back packet:", p))
1211 p[Ether].dst = self.pg0.local_mac
1212 self.pg0.add_stream(p)
1215 elif p.haslayer(BFD):
1219 raise Exception(ppp("Received unknown packet:", p))
1220 self.vapi.bfd_udp_del_echo_source()
1221 self.test_session.send_packet()
1222 # echo packets shouldn't arrive anymore
1223 for dummy in range(5):
1224 wait_for_bfd_packet(
1225 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1226 self.test_session.send_packet()
1227 events = self.vapi.collect_events()
1228 self.assert_equal(len(events), 0, "number of bfd events")
1230 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1231 def test_stale_echo(self):
1232 """ stale echo packets don't keep a session up """
1233 bfd_session_up(self)
1234 self.test_session.update(required_min_echo_rx=150000)
1235 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1236 self.test_session.send_packet()
1237 # should be turned on - loopback echo packets
1241 for dummy in range(10 * self.vpp_session.detect_mult):
1242 p = self.pg0.wait_for_packet(1)
1243 if p[UDP].dport == BFD.udp_dport_echo:
1244 if echo_packet is None:
1245 self.logger.debug(ppp("Got first echo packet:", p))
1247 timeout_at = time.time() + self.vpp_session.detect_mult * \
1248 self.test_session.required_min_echo_rx / USEC_IN_SEC
1250 self.logger.debug(ppp("Got followup echo packet:", p))
1251 self.logger.debug(ppp("Looping back first echo packet:", p))
1252 echo_packet[Ether].dst = self.pg0.local_mac
1253 self.pg0.add_stream(echo_packet)
1255 elif p.haslayer(BFD):
1256 self.logger.debug(ppp("Got packet:", p))
1257 if "P" in p.sprintf("%BFD.flags%"):
1258 final = self.test_session.create_packet()
1259 final[BFD].flags = "F"
1260 self.test_session.send_packet(final)
1261 if p[BFD].state == BFDState.down:
1262 self.assertIsNotNone(
1264 "Session went down before first echo packet received")
1266 self.assertGreaterEqual(
1268 "Session timeout at %s, but is expected at %s" %
1270 self.assert_equal(p[BFD].diag,
1271 BFDDiagCode.echo_function_failed,
1273 events = self.vapi.collect_events()
1274 self.assert_equal(len(events), 1, "number of bfd events")
1275 self.assert_equal(events[0].state, BFDState.down, BFDState)
1279 raise Exception(ppp("Received unknown packet:", p))
1280 self.test_session.send_packet()
1281 self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1283 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1284 def test_invalid_echo_checksum(self):
1285 """ echo packets with invalid checksum don't keep a session up """
1286 bfd_session_up(self)
1287 self.test_session.update(required_min_echo_rx=150000)
1288 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1289 self.test_session.send_packet()
1290 # should be turned on - loopback echo packets
1293 for dummy in range(10 * self.vpp_session.detect_mult):
1294 p = self.pg0.wait_for_packet(1)
1295 if p[UDP].dport == BFD.udp_dport_echo:
1296 self.logger.debug(ppp("Got echo packet:", p))
1297 if timeout_at is None:
1298 timeout_at = time.time() + self.vpp_session.detect_mult * \
1299 self.test_session.required_min_echo_rx / USEC_IN_SEC
1300 p[BFD_vpp_echo].checksum = getrandbits(64)
1301 p[Ether].dst = self.pg0.local_mac
1302 self.logger.debug(ppp("Looping back modified echo packet:", p))
1303 self.pg0.add_stream(p)
1305 elif p.haslayer(BFD):
1306 self.logger.debug(ppp("Got packet:", p))
1307 if "P" in p.sprintf("%BFD.flags%"):
1308 final = self.test_session.create_packet()
1309 final[BFD].flags = "F"
1310 self.test_session.send_packet(final)
1311 if p[BFD].state == BFDState.down:
1312 self.assertIsNotNone(
1314 "Session went down before first echo packet received")
1316 self.assertGreaterEqual(
1318 "Session timeout at %s, but is expected at %s" %
1320 self.assert_equal(p[BFD].diag,
1321 BFDDiagCode.echo_function_failed,
1323 events = self.vapi.collect_events()
1324 self.assert_equal(len(events), 1, "number of bfd events")
1325 self.assert_equal(events[0].state, BFDState.down, BFDState)
1329 raise Exception(ppp("Received unknown packet:", p))
1330 self.test_session.send_packet()
1331 self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_admin_up_down(self):
    """ put session admin-up and admin-down

    Puts the VPP session admin-down, checks that a peer packet carrying
    state init does not move it out of admin-down, then puts it admin-up
    again and follows it through down -> init -> up. At each step both
    the BFD state seen on the wire and the reported session event are
    verified.
    """

    def expect_event(state):
        # the next session-details event has to report the given state
        event = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        verify_event(self, event, expected_state=state)

    def expect_admin_down_packets(count):
        # `count` consecutive BFD packets must all advertise admin-down
        for _ in range(count):
            pkt = wait_for_bfd_packet(self)
            self.assert_equal(pkt[BFD].state, BFDState.admin_down, BFDState)

    def expect_packet_since_now(state):
        # NOTE(review): pcap_time_min looks like a lower bound on the
        # capture timestamp, shifted by the VPP clock offset - confirm
        # against wait_for_bfd_packet
        earliest = time.time() - self.vpp_clock_offset
        pkt = wait_for_bfd_packet(self, pcap_time_min=earliest)
        self.assert_equal(pkt[BFD].state, state, BFDState)

    bfd_session_up(self)

    # --- admin-down ---
    self.vpp_session.admin_down()
    self.pg0.enable_capture()
    expect_event(BFDState.admin_down)
    expect_admin_down_packets(2)

    # a peer announcing init must not be able to bring the session up
    self.test_session.update(state=BFDState.init)
    self.test_session.send_packet()
    expect_admin_down_packets(2)

    # --- admin-up: session restarts from down ---
    self.vpp_session.admin_up()
    self.test_session.update(state=BFDState.down)
    expect_event(BFDState.down)
    expect_packet_since_now(BFDState.down)

    # peer (still in down) talks to VPP -> VPP advertises init
    self.test_session.send_packet()
    expect_packet_since_now(BFDState.init)
    expect_event(BFDState.init)

    # peer moves to up -> VPP follows
    self.test_session.update(state=BFDState.up)
    self.test_session.send_packet()
    expect_packet_since_now(BFDState.up)
    expect_event(BFDState.up)
1371 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1372 def test_config_change_remote_demand(self):
1373 """ configuration change while peer in demand mode """
1374 bfd_session_up(self)
1375 demand = self.test_session.create_packet()
1376 demand[BFD].flags = "D"
1377 self.test_session.send_packet(demand)
1378 self.vpp_session.modify_parameters(
1379 required_min_rx=2 * self.vpp_session.required_min_rx)
1380 p = wait_for_bfd_packet(
1381 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1382 # poll bit must be set
1383 self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1384 # terminate poll sequence
1385 final = self.test_session.create_packet()
1386 final[BFD].flags = "D+F"
1387 self.test_session.send_packet(final)
1388 # vpp should be quiet now again
1389 transmit_time = 0.9 \
1390 * max(self.vpp_session.required_min_rx,
1391 self.test_session.desired_min_tx) \
1394 for dummy in range(self.test_session.detect_mult * 2):
1395 time.sleep(transmit_time)
1396 self.test_session.send_packet(demand)
1398 p = wait_for_bfd_packet(self, timeout=0)
1399 self.logger.error(ppp("Received unexpected packet:", p))
1401 except CaptureTimeoutError:
1403 events = self.vapi.collect_events()
1405 self.logger.error("Received unexpected event: %s", e)
1406 self.assert_equal(count, 0, "number of packets received")
1407 self.assert_equal(len(events), 0, "number of events received")
1409 def test_intf_deleted(self):
1410 """ interface with bfd session deleted """
1411 intf = VppLoInterface(self)
1414 sw_if_index = intf.sw_if_index
1415 vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip4)
1416 vpp_session.add_vpp_config()
1417 vpp_session.admin_up()
1418 intf.remove_vpp_config()
1419 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1420 self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1421 self.assertFalse(vpp_session.query_vpp_config())
1424 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1425 class BFD6TestCase(VppTestCase):
1426 """Bidirectional Forwarding Detection (BFD) (IPv6) """
1429 vpp_clock_offset = None
1434 def setUpClass(cls):
1435 super(BFD6TestCase, cls).setUpClass()
1436 cls.vapi.cli("set log class bfd level debug")
1438 cls.create_pg_interfaces([0])
1439 cls.pg0.config_ip6()
1440 cls.pg0.configure_ipv6_neighbors()
1442 cls.pg0.resolve_ndp()
1443 cls.create_loopback_interfaces(1)
1444 cls.loopback0 = cls.lo_interfaces[0]
1445 cls.loopback0.config_ip6()
1446 cls.loopback0.admin_up()
1449 super(BFD6TestCase, cls).tearDownClass()
1453 super(BFD6TestCase, self).setUp()
1454 self.factory = AuthKeyFactory()
1455 self.vapi.want_bfd_events()
1456 self.pg0.enable_capture()
1458 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1459 self.pg0.remote_ip6,
1461 self.vpp_session.add_vpp_config()
1462 self.vpp_session.admin_up()
1463 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1464 self.logger.debug(self.vapi.cli("show adj nbr"))
1466 self.vapi.want_bfd_events(enable_disable=0)
1470 if not self.vpp_dead:
1471 self.vapi.want_bfd_events(enable_disable=0)
1472 self.vapi.collect_events() # clear the event queue
1473 super(BFD6TestCase, self).tearDown()
def test_session_up(self):
    """ bring BFD session up

    The whole scenario is delegated to the bfd_session_up() helper, which
    receives this test case instance as its only argument.
    """
    bfd_session_up(self)
1479 def test_session_up_by_ip(self):
1480 """ bring BFD session up - first frame looked up by address pair """
1481 self.logger.info("BFD: Sending Slow control frame")
1482 self.test_session.update(my_discriminator=randint(0, 40000000))
1483 self.test_session.send_packet()
1484 self.pg0.enable_capture()
1485 p = self.pg0.wait_for_packet(1)
1486 self.assert_equal(p[BFD].your_discriminator,
1487 self.test_session.my_discriminator,
1488 "BFD - your discriminator")
1489 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1490 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
1492 self.logger.info("BFD: Waiting for event")
1493 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1494 verify_event(self, e, expected_state=BFDState.init)
1495 self.logger.info("BFD: Sending Up")
1496 self.test_session.send_packet()
1497 self.logger.info("BFD: Waiting for event")
1498 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1499 verify_event(self, e, expected_state=BFDState.up)
1500 self.logger.info("BFD: Session is Up")
1501 self.test_session.update(state=BFDState.up)
1502 self.test_session.send_packet()
1503 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_hold_up(self):
    """ hold BFD session up

    After bringing the session up, answers each BFD packet received from
    VPP with one of our own for twice the test session's detect
    multiplier, then checks that no BFD event was reported and that the
    VPP session still reports state up.
    """
    bfd_session_up(self)
    rounds = 2 * self.test_session.detect_mult
    for _ in range(rounds):
        # one packet in from VPP, one packet back from the test peer
        wait_for_bfd_packet(self)
        self.test_session.send_packet()
    pending_events = self.vapi.collect_events()
    self.assert_equal(len(pending_events), 0, "number of bfd events")
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1516 def test_echo_looped_back(self):
1517 """ echo packets looped back """
1518 # don't need a session in this case..
1519 self.vpp_session.remove_vpp_config()
1520 self.pg0.enable_capture()
1521 echo_packet_count = 10
1522 # random source port low enough to increment a few times..
1523 udp_sport_tx = randint(1, 50000)
1524 udp_sport_rx = udp_sport_tx
1525 echo_packet = (Ether(src=self.pg0.remote_mac,
1526 dst=self.pg0.local_mac) /
1527 IPv6(src=self.pg0.remote_ip6,
1528 dst=self.pg0.remote_ip6) /
1529 UDP(dport=BFD.udp_dport_echo) /
1530 Raw("this should be looped back"))
1531 for dummy in range(echo_packet_count):
1532 self.sleep(.01, "delay between echo packets")
1533 echo_packet[UDP].sport = udp_sport_tx
1535 self.logger.debug(ppp("Sending packet:", echo_packet))
1536 self.pg0.add_stream(echo_packet)
1538 for dummy in range(echo_packet_count):
1539 p = self.pg0.wait_for_packet(1)
1540 self.logger.debug(ppp("Got packet:", p))
1542 self.assert_equal(self.pg0.remote_mac,
1543 ether.dst, "Destination MAC")
1544 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1546 self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
1547 self.assert_equal(self.pg0.remote_ip6, ip.src, "Destination IP")
1549 self.assert_equal(udp.dport, BFD.udp_dport_echo,
1550 "UDP destination port")
1551 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1553 # need to compare the hex payload here, otherwise BFD_vpp_echo
1555 self.assertEqual(str(p[UDP].payload),
1556 str(echo_packet[UDP].payload),
1557 "Received packet is not the echo packet sent")
1558 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1559 "ECHO packet identifier for test purposes)")
1560 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1561 "ECHO packet identifier for test purposes)")
1563 def test_echo(self):
1564 """ echo function """
1565 bfd_session_up(self)
1566 self.test_session.update(required_min_echo_rx=150000)
1567 self.test_session.send_packet()
1568 detection_time = self.test_session.detect_mult *\
1569 self.vpp_session.required_min_rx / USEC_IN_SEC
1570 # echo shouldn't work without echo source set
1571 for dummy in range(10):
1572 sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1573 self.sleep(sleep, "delay before sending bfd packet")
1574 self.test_session.send_packet()
1575 p = wait_for_bfd_packet(
1576 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1577 self.assert_equal(p[BFD].required_min_rx_interval,
1578 self.vpp_session.required_min_rx,
1579 "BFD required min rx interval")
1580 self.test_session.send_packet()
1581 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1583 # should be turned on - loopback echo packets
1584 for dummy in range(3):
1585 loop_until = time.time() + 0.75 * detection_time
1586 while time.time() < loop_until:
1587 p = self.pg0.wait_for_packet(1)
1588 self.logger.debug(ppp("Got packet:", p))
1589 if p[UDP].dport == BFD.udp_dport_echo:
1591 p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
1592 self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
1593 "BFD ECHO src IP equal to loopback IP")
1594 self.logger.debug(ppp("Looping back packet:", p))
1595 self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1596 "ECHO packet destination MAC address")
1597 p[Ether].dst = self.pg0.local_mac
1598 self.pg0.add_stream(p)
1601 elif p.haslayer(BFD):
1603 self.assertGreaterEqual(
1604 p[BFD].required_min_rx_interval,
1606 if "P" in p.sprintf("%BFD.flags%"):
1607 final = self.test_session.create_packet()
1608 final[BFD].flags = "F"
1609 self.test_session.send_packet(final)
1611 raise Exception(ppp("Received unknown packet:", p))
1613 self.assert_equal(len(self.vapi.collect_events()), 0,
1614 "number of bfd events")
1615 self.test_session.send_packet()
1616 self.assertTrue(echo_seen, "No echo packets received")
1618 def test_intf_deleted(self):
1619 """ interface with bfd session deleted """
1620 intf = VppLoInterface(self)
1623 sw_if_index = intf.sw_if_index
1624 vpp_session = VppBFDUDPSession(
1625 self, intf, intf.remote_ip6, af=AF_INET6)
1626 vpp_session.add_vpp_config()
1627 vpp_session.admin_up()
1628 intf.remove_vpp_config()
1629 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1630 self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1631 self.assertFalse(vpp_session.query_vpp_config())
1634 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1635 class BFDFIBTestCase(VppTestCase):
1636 """ BFD-FIB interactions (IPv6) """
1642 super(BFDFIBTestCase, self).setUp()
1643 self.create_pg_interfaces(range(1))
1645 self.vapi.want_bfd_events()
1646 self.pg0.enable_capture()
1648 for i in self.pg_interfaces:
1651 i.configure_ipv6_neighbors()
1654 if not self.vpp_dead:
1655 self.vapi.want_bfd_events(enable_disable=0)
1657 super(BFDFIBTestCase, self).tearDown()
1660 def pkt_is_not_data_traffic(p):
1661 """ not data traffic implies BFD or the usual IPv6 ND/RA"""
1662 if p.haslayer(BFD) or is_ipv6_misc(p):
1666 def test_session_with_fib(self):
1667 """ BFD-FIB interactions """
1669 # packets to match against both of the routes
1670 p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1671 IPv6(src="3001::1", dst="2001::1") /
1672 UDP(sport=1234, dport=1234) /
1674 (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1675 IPv6(src="3001::1", dst="2002::1") /
1676 UDP(sport=1234, dport=1234) /
1679 # A recursive and a non-recursive route via a next-hop that
1680 # will have a BFD session
1681 ip_2001_s_64 = VppIpRoute(self, "2001::", 64,
1682 [VppRoutePath(self.pg0.remote_ip6,
1683 self.pg0.sw_if_index,
1684 proto=DpoProto.DPO_PROTO_IP6)],
1686 ip_2002_s_64 = VppIpRoute(self, "2002::", 64,
1687 [VppRoutePath(self.pg0.remote_ip6,
1689 proto=DpoProto.DPO_PROTO_IP6)],
1691 ip_2001_s_64.add_vpp_config()
1692 ip_2002_s_64.add_vpp_config()
1694 # bring the session up now the routes are present
1695 self.vpp_session = VppBFDUDPSession(self,
1697 self.pg0.remote_ip6,
1699 self.vpp_session.add_vpp_config()
1700 self.vpp_session.admin_up()
1701 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1703 # session is up - traffic passes
1704 bfd_session_up(self)
1706 self.pg0.add_stream(p)
1709 captured = self.pg0.wait_for_packet(
1711 filter_out_fn=self.pkt_is_not_data_traffic)
1712 self.assertEqual(captured[IPv6].dst,
# session is down - traffic is dropped
1716 bfd_session_down(self)
1718 self.pg0.add_stream(p)
1720 with self.assertRaises(CaptureTimeoutError):
1721 self.pg0.wait_for_packet(1, self.pkt_is_not_data_traffic)
1723 # session is up - traffic passes
1724 bfd_session_up(self)
1726 self.pg0.add_stream(p)
1729 captured = self.pg0.wait_for_packet(
1731 filter_out_fn=self.pkt_is_not_data_traffic)
1732 self.assertEqual(captured[IPv6].dst,
1736 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1737 class BFDSHA1TestCase(VppTestCase):
1738 """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1741 vpp_clock_offset = None
1746 def setUpClass(cls):
1747 super(BFDSHA1TestCase, cls).setUpClass()
1748 cls.vapi.cli("set log class bfd level debug")
1750 cls.create_pg_interfaces([0])
1751 cls.pg0.config_ip4()
1753 cls.pg0.resolve_arp()
1756 super(BFDSHA1TestCase, cls).tearDownClass()
1760 super(BFDSHA1TestCase, self).setUp()
1761 self.factory = AuthKeyFactory()
1762 self.vapi.want_bfd_events()
1763 self.pg0.enable_capture()
1766 if not self.vpp_dead:
1767 self.vapi.want_bfd_events(enable_disable=0)
1768 self.vapi.collect_events() # clear the event queue
1769 super(BFDSHA1TestCase, self).tearDown()
1771 def test_session_up(self):
1772 """ bring BFD session up """
1773 key = self.factory.create_random_key(self)
1774 key.add_vpp_config()
1775 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1776 self.pg0.remote_ip4,
1778 self.vpp_session.add_vpp_config()
1779 self.vpp_session.admin_up()
1780 self.test_session = BFDTestSession(
1781 self, self.pg0, AF_INET, sha1_key=key,
1782 bfd_key_id=self.vpp_session.bfd_key_id)
1783 bfd_session_up(self)
1785 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1786 def test_hold_up(self):
1787 """ hold BFD session up """
1788 key = self.factory.create_random_key(self)
1789 key.add_vpp_config()
1790 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1791 self.pg0.remote_ip4,
1793 self.vpp_session.add_vpp_config()
1794 self.vpp_session.admin_up()
1795 self.test_session = BFDTestSession(
1796 self, self.pg0, AF_INET, sha1_key=key,
1797 bfd_key_id=self.vpp_session.bfd_key_id)
1798 bfd_session_up(self)
1799 for dummy in range(self.test_session.detect_mult * 2):
1800 wait_for_bfd_packet(self)
1801 self.test_session.send_packet()
1802 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_hold_up_meticulous(self):
    """ hold BFD session up - meticulous auth

    Creates a meticulous keyed SHA1 key, configures it on both the VPP
    session and the test session, brings the session up and then keeps
    answering VPP's packets while incrementing the test session's
    sequence number before every send. The session must still be up at
    the end.
    """
    auth_key = self.factory.create_random_key(
        self, BFDAuthType.meticulous_keyed_sha1)
    auth_key.add_vpp_config()

    vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
    self.vpp_session = vpp_session
    vpp_session.add_vpp_config()
    vpp_session.admin_up()

    # start just below 0xFFFFFFFF so that the increments performed in the
    # loop below make the sequence number wrap
    initial_seq_number = 0xFFFFFFFF - 4
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=auth_key,
        bfd_key_id=vpp_session.bfd_key_id,
        our_seq_number=initial_seq_number)

    bfd_session_up(self)
    rounds = 30
    for _ in range(rounds):
        wait_for_bfd_packet(self)
        # advance the sequence number before every send
        self.test_session.inc_seq_num()
        self.test_session.send_packet()
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_send_bad_seq_number(self):
    """ session is not kept alive by msgs with bad sequence numbers

    With meticulous keyed SHA1 authentication configured on both ends,
    the test peer keeps sending packets for two detection times without
    ever advancing its sequence number. Exactly one BFD event, reporting
    the session as down, is expected afterwards.
    """
    auth_key = self.factory.create_random_key(
        self, BFDAuthType.meticulous_keyed_sha1)
    auth_key.add_vpp_config()
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=auth_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)

    detection_time = (self.test_session.detect_mult *
                      self.vpp_session.required_min_rx / USEC_IN_SEC)
    deadline = time.time() + 2 * detection_time
    while time.time() < deadline:
        # note: no inc_seq_num() here - every packet reuses the same
        # sequence number
        self.test_session.send_packet()
        pause = 0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC
        self.sleep(pause, "time between bfd packets")

    events = self.vapi.collect_events()
    # the session is expected to have gone down by now, because the
    # sequence numbers were never updated
    self.assert_equal(len(events), 1, "number of bfd events")
    verify_event(self, events[0], expected_state=BFDState.down)
def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
                                   legitimate_test_session,
                                   rogue_test_session,
                                   rogue_bfd_values=None):
    """ execute a rogue session interaction scenario

    1. create vpp session, add config
    2. bring the legitimate session up
    3. copy the bfd values from legitimate session to rogue session
    4. apply rogue_bfd_values to rogue session
    5. set rogue session state to down
    6. send message to take the session down from the rogue session
    7. assert that the legitimate session is unaffected

    vpp_bfd_udp_session is stored as self.vpp_session and configured in
    VPP; legitimate_test_session is stored as self.test_session and used
    to bring the session up; rogue_test_session sends the spoofed packet;
    rogue_bfd_values is an optional dict of keyword arguments passed to
    rogue_test_session.update() to override the copied values.
    """

    self.vpp_session = vpp_bfd_udp_session
    self.vpp_session.add_vpp_config()
    self.test_session = legitimate_test_session
    # bring vpp session up
    bfd_session_up(self)
    # make the rogue session look exactly like the legitimate one ...
    rogue_test_session.update(
        my_discriminator=self.test_session.my_discriminator,
        your_discriminator=self.test_session.your_discriminator,
        desired_min_tx=self.test_session.desired_min_tx,
        required_min_rx=self.test_session.required_min_rx,
        detect_mult=self.test_session.detect_mult,
        diag=self.test_session.diag,
        state=self.test_session.state,
        auth_type=self.test_session.auth_type)
    # ... except for the values the caller wants to be different
    if rogue_bfd_values:
        rogue_test_session.update(**rogue_bfd_values)
    # send packet from rogue session trying to take the session down
    rogue_test_session.update(state=BFDState.down)
    rogue_test_session.send_packet()
    wait_for_bfd_packet(self)
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_mismatch_auth(self):
    """ session is not brought down by unauthenticated msg """
    key = self.factory.create_random_key(self)
    key.add_vpp_config()
    vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
    legitimate_test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=key,
        bfd_key_id=vpp_session.bfd_key_id)
    # the rogue session carries no key, so its packets are unauthenticated
    rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
    self.execute_rogue_session_scenario(vpp_session,
                                        legitimate_test_session,
                                        rogue_test_session)
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_mismatch_bfd_key_id(self):
    """ session is not brought down by msg with non-existent key-id """
    key = self.factory.create_random_key(self)
    key.add_vpp_config()
    vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
    # pick a different random bfd key id
    # (the BFD auth key ID is a single octet on the wire, hence 0..255)
    x = randint(0, 255)
    while x == vpp_session.bfd_key_id:
        x = randint(0, 255)
    legitimate_test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=key,
        bfd_key_id=vpp_session.bfd_key_id)
    rogue_test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
    self.execute_rogue_session_scenario(vpp_session,
                                        legitimate_test_session,
                                        rogue_test_session)
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_mismatched_auth_type(self):
    """ session is not brought down by msg with wrong auth type """
    auth_key = self.factory.create_random_key(self)
    auth_key.add_vpp_config()
    vpp_side = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
    # both test sessions start out identical; the scenario then overrides
    # the auth type of the rogue one
    legit_side = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=auth_key, bfd_key_id=vpp_side.bfd_key_id)
    rogue_side = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=auth_key, bfd_key_id=vpp_side.bfd_key_id)
    rogue_overrides = {'auth_type': BFDAuthType.keyed_md5}
    self.execute_rogue_session_scenario(
        vpp_side, legit_side, rogue_side, rogue_overrides)
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_restart(self):
    """ simulate remote peer restart and resynchronization """
    auth_key = self.factory.create_random_key(
        self, BFDAuthType.meticulous_keyed_sha1)
    auth_key.add_vpp_config()
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=auth_key,
        bfd_key_id=self.vpp_session.bfd_key_id,
        our_seq_number=0)
    bfd_session_up(self)

    # stay silent for twice the detection time so that VPP times out
    detection_time = self.test_session.detect_mult *\
        self.vpp_session.required_min_rx / USEC_IN_SEC
    self.sleep(2 * detection_time, "simulating peer restart")

    # exactly one event is expected: the session going down
    bfd_events = self.vapi.collect_events()
    self.assert_equal(len(bfd_events), 1, "number of bfd events")
    verify_event(self, bfd_events[0], expected_state=BFDState.down)

    # the "restarted" peer begins from scratch: down state, sequence
    # number 0 and no memory of the last sequence number seen from VPP
    self.test_session.update(state=BFDState.down)
    self.test_session.our_seq_number = 0
    self.test_session.vpp_seq_number = None

    # discard whatever was captured meanwhile, then resynchronize
    self.pg0.enable_capture()
    bfd_session_up(self)
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
class BFDAuthOnOffTestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (changing auth) """

    # assigned by setUpClass() (pg0) and by each test (sessions); declared
    # here so the attributes always exist on the test case
    pg0 = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """ create and configure the single pg interface used by all tests """
        super(BFDAuthOnOffTestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip4()
            # NOTE(review): the tests exchange packets over pg0, so it is
            # brought up here - confirm this matches the intended setup
            cls.pg0.admin_up()
            cls.pg0.resolve_arp()

        except Exception:
            # unittest does not call tearDownClass() when setUpClass()
            # raises, so clean up explicitly before re-raising
            super(BFDAuthOnOffTestCase, cls).tearDownClass()
            raise

    def setUp(self):
        """ subscribe to BFD events and start capturing on pg0 """
        super(BFDAuthOnOffTestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

    def tearDown(self):
        """ unsubscribe from BFD events (if VPP is alive) and drain them """
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFDAuthOnOffTestCase, self).tearDown()

    def _exchange_packets(self, check_state=True, inc_seq_num=False):
        """ answer detect_mult * 2 packets received from VPP

        For each packet received from VPP one packet is sent back.

        check_state - assert that each received packet reports state up
        inc_seq_num - call test_session.inc_seq_num() before each reply
        """
        for _ in range(self.test_session.detect_mult * 2):
            received = wait_for_bfd_packet(self)
            if check_state:
                self.assert_equal(received[BFD].state, BFDState.up, BFDState)
            if inc_seq_num:
                self.test_session.inc_seq_num()
            self.test_session.send_packet()

    def _verify_session_undisturbed(self):
        """ assert the VPP session is up and no BFD event was generated """
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")

    def test_auth_on_immediate(self):
        """ turn auth on without disturbing session state (immediate) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        self._exchange_packets()
        self.vpp_session.activate_auth(key)
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key
        self._exchange_packets()
        self._verify_session_undisturbed()

    def test_auth_off_immediate(self):
        """ turn auth off without disturbing session state (immediate) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets(inc_seq_num=True)
        self.vpp_session.deactivate_auth()
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self._exchange_packets(inc_seq_num=True)
        self._verify_session_undisturbed()

    def test_auth_change_key_immediate(self):
        """ change auth key without disturbing session state (immediate) """
        key1 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2 = self.factory.create_random_key(self)
        key2.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key1)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key1,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets()
        self.vpp_session.activate_auth(key2)
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key2
        self._exchange_packets()
        self._verify_session_undisturbed()

    def test_auth_on_delayed(self):
        """ turn auth on without disturbing session state (delayed) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        # this first exchange only keeps the session alive; the state of
        # the received packets is not checked here
        self._exchange_packets(check_state=False)
        self.vpp_session.activate_auth(key, delayed=True)
        # the test side keeps sending unauthenticated packets for now
        self._exchange_packets()
        # now the test side switches to authenticated packets
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key
        self.test_session.send_packet()
        self._exchange_packets()
        self._verify_session_undisturbed()

    def test_auth_off_delayed(self):
        """ turn auth off without disturbing session state (delayed) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets()
        self.vpp_session.deactivate_auth(delayed=True)
        # the test side keeps sending authenticated packets for now
        self._exchange_packets()
        # now the test side drops authentication
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self.test_session.send_packet()
        self._exchange_packets()
        self._verify_session_undisturbed()

    def test_auth_change_key_delayed(self):
        """ change auth key without disturbing session state (delayed) """
        key1 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2 = self.factory.create_random_key(self)
        key2.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key1)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key1,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets()
        self.vpp_session.activate_auth(key2, delayed=True)
        # the test side keeps using the old key for now
        self._exchange_packets()
        # now the test side switches to the new key
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key2
        self.test_session.send_packet()
        self._exchange_packets()
        self._verify_session_undisturbed()
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
class BFDCLITestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (CLI) """

    # assigned by setUpClass(); declared so the attribute always exists
    pg0 = None

    @classmethod
    def setUpClass(cls):
        """ create pg0 and configure IPv4 and IPv6 on it """
        super(BFDCLITestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces((0,))
            cls.pg0.config_ip4()
            cls.pg0.config_ip6()
            cls.pg0.resolve_arp()
            cls.pg0.resolve_ndp()

        except Exception:
            # unittest does not call tearDownClass() when setUpClass()
            # raises, so clean up explicitly before re-raising
            super(BFDCLITestCase, cls).tearDownClass()
            raise

    def setUp(self):
        """ create a fresh key factory and start capturing on pg0 """
        super(BFDCLITestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.pg0.enable_capture()

    def tearDown(self):
        """ unsubscribe from BFD events (best effort) and drain them """
        try:
            self.vapi.want_bfd_events(enable_disable=0)
        except UnexpectedApiReturnValueError:
            # some tests aren't subscribed, so this is not an issue
            pass
        self.vapi.collect_events()  # clear the event queue
        super(BFDCLITestCase, self).tearDown()

    def cli_verify_no_response(self, cli):
        """ execute a CLI, asserting that the response is empty """
        self.assert_equal(self.vapi.cli(cli),
                          "",
                          "CLI command response")

    def cli_verify_response(self, cli, expected):
        """ execute a CLI, asserting that the response matches expectation """
        self.assert_equal(self.vapi.cli(cli).strip(),
                          expected,
                          "CLI command response")

    @staticmethod
    def _secret_hex(key):
        """ return the secret of an auth key as hex digits for the CLI """
        return "".join("{:02x}".format(ord(c)) for c in key.key)

    def test_show(self):
        """ show commands """
        k1 = self.factory.create_random_key(self)
        k1.add_vpp_config()
        k2 = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        k2.add_vpp_config()
        s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        s1.add_vpp_config()
        s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
                              sha1_key=k2)
        s2.add_vpp_config()
        self.logger.info(self.vapi.ppcli("show bfd keys"))
        self.logger.info(self.vapi.ppcli("show bfd sessions"))
        self.logger.info(self.vapi.ppcli("show bfd"))

    def test_set_del_sha1_key(self):
        """ set/delete SHA1 auth key """
        k = self.factory.create_random_key(self)
        self.registry.register(k, self.logger)
        self.cli_verify_no_response(
            "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
            (k.conf_key_id, self._secret_hex(k)))
        self.assertTrue(k.query_vpp_config())
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
        self.vpp_session.add_vpp_config()
        self.test_session = \
            BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
                           bfd_key_id=self.vpp_session.bfd_key_id)
        self.vapi.want_bfd_events()
        bfd_session_up(self)
        bfd_session_down(self)
        # try to replace the secret for the key - should fail because the key
        # is in use
        k2 = self.factory.create_random_key(self)
        self.cli_verify_response(
            "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
            (k.conf_key_id, self._secret_hex(k2)),
            "bfd key set: `bfd_auth_set_key' API call failed, "
            "rv=-103:BFD object in use")
        # manipulating the session using old secret should still work
        bfd_session_up(self)
        bfd_session_down(self)
        self.vpp_session.remove_vpp_config()
        self.cli_verify_no_response(
            "bfd key del conf-key-id %s" % k.conf_key_id)
        self.assertFalse(k.query_vpp_config())

    def test_set_del_meticulous_sha1_key(self):
        """ set/delete meticulous SHA1 auth key """
        k = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        self.registry.register(k, self.logger)
        self.cli_verify_no_response(
            "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
            (k.conf_key_id, self._secret_hex(k)))
        self.assertTrue(k.query_vpp_config())
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip6, af=AF_INET6,
                                            sha1_key=k)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = \
            BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
                           bfd_key_id=self.vpp_session.bfd_key_id)
        self.vapi.want_bfd_events()
        bfd_session_up(self)
        bfd_session_down(self)
        # try to replace the secret for the key - should fail because the key
        # is in use
        k2 = self.factory.create_random_key(self)
        self.cli_verify_response(
            "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
            (k.conf_key_id, self._secret_hex(k2)),
            "bfd key set: `bfd_auth_set_key' API call failed, "
            "rv=-103:BFD object in use")
        # manipulating the session using old secret should still work
        bfd_session_up(self)
        bfd_session_down(self)
        self.vpp_session.remove_vpp_config()
        self.cli_verify_no_response(
            "bfd key del conf-key-id %s" % k.conf_key_id)
        self.assertFalse(k.query_vpp_config())

    def _check_add_mod_del_session(self, af, key=None):
        """ add, modify and delete a BFD UDP session via CLI

        Every step is verified, including that a repeated add and a
        repeated del are rejected.

        af - AF_INET or AF_INET6, selects which pg0 addresses are used
        key - optional auth key; when given, the session is authenticated
        """
        if af == AF_INET6:
            local_addr = self.pg0.local_ip6
            peer_addr = self.pg0.remote_ip6
            session_kwargs = {'af': AF_INET6}
        else:
            local_addr = self.pg0.local_ip4
            peer_addr = self.pg0.remote_ip4
            session_kwargs = {}
        if key is not None:
            session_kwargs['sha1_key'] = key
        vpp_session = VppBFDUDPSession(
            self, self.pg0, peer_addr, **session_kwargs)
        self.registry.register(vpp_session, self.logger)
        cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
            "peer-addr %s desired-min-tx %s required-min-rx %s "\
            "detect-mult %s" % (self.pg0.name, local_addr, peer_addr,
                                vpp_session.desired_min_tx,
                                vpp_session.required_min_rx,
                                vpp_session.detect_mult)
        if key is not None:
            cli_add_cmd += " conf-key-id %s bfd-key-id %s" % (
                key.conf_key_id, vpp_session.bfd_key_id)
            # the modified session must keep using the same bfd key id
            session_kwargs['bfd_key_id'] = vpp_session.bfd_key_id
        self.cli_verify_no_response(cli_add_cmd)
        # 2nd add should fail
        self.cli_verify_response(
            cli_add_cmd,
            "bfd udp session add: `bfd_add_add_session' API call"
            " failed, rv=-101:Duplicate BFD object")
        verify_bfd_session_config(self, vpp_session)
        mod_session = VppBFDUDPSession(
            self, self.pg0, peer_addr,
            required_min_rx=2 * vpp_session.required_min_rx,
            desired_min_tx=3 * vpp_session.desired_min_tx,
            detect_mult=4 * vpp_session.detect_mult,
            **session_kwargs)
        self.cli_verify_no_response(
            "bfd udp session mod interface %s local-addr %s peer-addr %s "
            "desired-min-tx %s required-min-rx %s detect-mult %s" %
            (self.pg0.name, local_addr, peer_addr,
             mod_session.desired_min_tx, mod_session.required_min_rx,
             mod_session.detect_mult))
        verify_bfd_session_config(self, mod_session)
        cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
            "peer-addr %s" % (self.pg0.name, local_addr, peer_addr)
        self.cli_verify_no_response(cli_del_cmd)
        # 2nd del is expected to fail
        self.cli_verify_response(
            cli_del_cmd,
            "bfd udp session del: `bfd_udp_del_session' API call"
            " failed, rv=-102:No such BFD object")
        self.assertFalse(vpp_session.query_vpp_config())

    def test_add_mod_del_bfd_udp(self):
        """ create/modify/delete IPv4 BFD UDP session """
        self._check_add_mod_del_session(AF_INET)

    def test_add_mod_del_bfd_udp6(self):
        """ create/modify/delete IPv6 BFD UDP session """
        self._check_add_mod_del_session(AF_INET6)

    def test_add_mod_del_bfd_udp_auth(self):
        """ create/modify/delete IPv4 BFD UDP session (authenticated) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self._check_add_mod_del_session(AF_INET, key=key)

    def test_add_mod_del_bfd_udp6_auth(self):
        """ create/modify/delete IPv6 BFD UDP session (authenticated) """
        key = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        self._check_add_mod_del_session(AF_INET6, key=key)

    def _check_auth_on_off(self, delayed):
        """ activate and deactivate authentication via CLI, each one twice

        delayed - when true, " delayed yes" is appended to both commands
        """
        key = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        # only the plain session is configured in VPP; auth_session
        # describes the expected configuration once auth is activated
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                        sha1_key=key)
        session.add_vpp_config()
        suffix = " delayed yes" if delayed else ""
        cli_activate = \
            "bfd udp session auth activate interface %s local-addr %s "\
            "peer-addr %s conf-key-id %s bfd-key-id %s%s"\
            % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
               key.conf_key_id, auth_session.bfd_key_id, suffix)
        # repeating the command must be accepted and change nothing
        self.cli_verify_no_response(cli_activate)
        verify_bfd_session_config(self, auth_session)
        self.cli_verify_no_response(cli_activate)
        verify_bfd_session_config(self, auth_session)
        cli_deactivate = \
            "bfd udp session auth deactivate interface %s local-addr %s "\
            "peer-addr %s%s"\
            % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
               suffix)
        self.cli_verify_no_response(cli_deactivate)
        verify_bfd_session_config(self, session)
        self.cli_verify_no_response(cli_deactivate)
        verify_bfd_session_config(self, session)

    def test_auth_on_off(self):
        """ turn authentication on and off """
        self._check_auth_on_off(delayed=False)

    def test_auth_on_off_delayed(self):
        """ turn authentication on and off (delayed) """
        self._check_auth_on_off(delayed=True)

    def test_admin_up_down(self):
        """ put session admin-up and admin-down """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        cli_down = \
            "bfd udp session set-flags admin down interface %s local-addr %s "\
            "peer-addr %s"\
            % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
        cli_up = \
            "bfd udp session set-flags admin up interface %s local-addr %s "\
            "peer-addr %s"\
            % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.cli_verify_no_response(cli_down)
        verify_bfd_session_config(self, session, state=BFDState.admin_down)
        self.cli_verify_no_response(cli_up)
        verify_bfd_session_config(self, session, state=BFDState.down)

    def test_set_del_udp_echo_source(self):
        """ set/del udp echo source """
        self.create_loopback_interfaces(1)
        self.loopback0 = self.lo_interfaces[0]
        self.loopback0.admin_up()
        self.cli_verify_response("show bfd echo-source",
                                 "UDP echo source is not set.")
        cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
        self.cli_verify_no_response(cli_set)
        # no address is configured on the loopback yet
        self.cli_verify_response("show bfd echo-source",
                                 "UDP echo source is: %s\n"
                                 "IPv4 address usable as echo source: none\n"
                                 "IPv6 address usable as echo source: none" %
                                 self.loopback0.name)
        self.loopback0.config_ip4()
        # the expected echo source is the interface address with its
        # lowest bit flipped
        unpacked = unpack("!L", self.loopback0.local_ip4n)
        echo_ip4 = inet_ntop(AF_INET, pack("!L", unpacked[0] ^ 1))
        self.cli_verify_response("show bfd echo-source",
                                 "UDP echo source is: %s\n"
                                 "IPv4 address usable as echo source: %s\n"
                                 "IPv6 address usable as echo source: none" %
                                 (self.loopback0.name, echo_ip4))
        unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
        echo_ip6 = inet_ntop(AF_INET6, pack("!LLLL", unpacked[0], unpacked[1],
                                            unpacked[2], unpacked[3] ^ 1))
        self.loopback0.config_ip6()
        self.cli_verify_response("show bfd echo-source",
                                 "UDP echo source is: %s\n"
                                 "IPv4 address usable as echo source: %s\n"
                                 "IPv6 address usable as echo source: %s" %
                                 (self.loopback0.name, echo_ip4, echo_ip6))
        cli_del = "bfd udp echo-source del"
        self.cli_verify_no_response(cli_del)
        self.cli_verify_response("show bfd echo-source",
                                 "UDP echo source is not set.")
if __name__ == '__main__':
    # executed as a script: let unittest discover and run the tests in this
    # module using the VPP-specific test runner
    runner_class = VppTestRunner
    unittest.main(testRunner=runner_class)