4 from __future__ import division
9 from struct import pack, unpack
10 from random import randint, shuffle, getrandbits
11 from socket import AF_INET, AF_INET6, inet_ntop
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether
14 from scapy.layers.inet import UDP, IP
15 from scapy.layers.inet6 import IPv6
16 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
17 BFDDiagCode, BFDState, BFD_vpp_echo
18 from framework import VppTestCase, VppTestRunner, running_extended_tests
19 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
21 from vpp_papi_provider import UnexpectedApiReturnValueError
22 from vpp_ip_route import VppIpRoute, VppRoutePath
class AuthKeyFactory(object):
    """Factory handing out BFD auth keys whose conf key IDs never repeat.

    Every conf key ID issued by one factory instance is remembered, so two
    keys created through the same instance cannot share a conf key ID.
    """

    def __init__(self):
        # conf key IDs issued so far; only the dict keys are consulted
        self._conf_key_ids = {}

    def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
        """Create a key with random material and an unused conf key ID.

        :param test: test case object, passed through to VppBFDAuthKey
        :param auth_type: authentication type, passed through to
                          VppBFDAuthKey
        :returns: the newly constructed VppBFDAuthKey
        """
        # draw 32-bit IDs until one is found that this factory has not
        # issued before
        while True:
            candidate_id = randint(0, 0xFFFFFFFF)
            if candidate_id not in self._conf_key_ids:
                break
        self._conf_key_ids[candidate_id] = 1
        # key material is 1 to 20 random bytes
        key_length = randint(1, 20)
        material = bytearray(randint(0, 255) for _ in range(key_length))
        return VppBFDAuthKey(test=test, auth_type=auth_type,
                             conf_key_id=candidate_id, key=str(material))
44 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
45 class BFDAPITestCase(VppTestCase):
46 """Bidirectional Forwarding Detection (BFD) - API"""
53 super(BFDAPITestCase, cls).setUpClass()
56 cls.create_pg_interfaces(range(2))
57 for i in cls.pg_interfaces:
63 super(BFDAPITestCase, cls).tearDownClass()
67 super(BFDAPITestCase, self).setUp()
68 self.factory = AuthKeyFactory()
def test_add_bfd(self):
    """ create a BFD session """
    session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    # add and remove the very same session object twice in a row, logging
    # the session state after each add
    for _ in range(2):
        session.add_vpp_config()
        self.logger.debug("Session state is %s", session.state)
        session.remove_vpp_config()
def test_double_add(self):
    """ create the same BFD session twice (negative case) """
    duplicate = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    # the first add is a plain add
    duplicate.add_vpp_config()
    # the repeated add runs while a negative API return value is expected
    with self.vapi.expect_negative_api_retval():
        duplicate.add_vpp_config()
    # clean up the session added by the first call
    duplicate.remove_vpp_config()
def test_add_bfd6(self):
    """ create IPv6 BFD session """
    peer_address = self.pg0.remote_ip6
    session = VppBFDUDPSession(self, self.pg0, peer_address, af=AF_INET6)
    # add and remove the very same session object twice in a row, logging
    # the session state after each add
    for _ in range(2):
        session.add_vpp_config()
        self.logger.debug("Session state is %s", session.state)
        session.remove_vpp_config()
101 def test_mod_bfd(self):
102 """ modify BFD session parameters """
103 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
104 desired_min_tx=50000,
105 required_min_rx=10000,
107 session.add_vpp_config()
108 s = session.get_bfd_udp_session_dump_entry()
109 self.assert_equal(session.desired_min_tx,
111 "desired min transmit interval")
112 self.assert_equal(session.required_min_rx,
114 "required min receive interval")
115 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
116 session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
117 required_min_rx=session.required_min_rx * 2,
118 detect_mult=session.detect_mult * 2)
119 s = session.get_bfd_udp_session_dump_entry()
120 self.assert_equal(session.desired_min_tx,
122 "desired min transmit interval")
123 self.assert_equal(session.required_min_rx,
125 "required min receive interval")
126 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
128 def test_add_sha1_keys(self):
129 """ add SHA1 keys """
131 keys = [self.factory.create_random_key(
132 self) for i in range(0, key_count)]
134 self.assertFalse(key.query_vpp_config())
138 self.assertTrue(key.query_vpp_config())
140 indexes = range(key_count)
145 key.remove_vpp_config()
147 for j in range(key_count):
150 self.assertFalse(key.query_vpp_config())
152 self.assertTrue(key.query_vpp_config())
153 # should be removed now
155 self.assertFalse(key.query_vpp_config())
156 # add back and remove again
160 self.assertTrue(key.query_vpp_config())
162 key.remove_vpp_config()
164 self.assertFalse(key.query_vpp_config())
166 def test_add_bfd_sha1(self):
167 """ create a BFD session (SHA1) """
168 key = self.factory.create_random_key(self)
170 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
172 session.add_vpp_config()
173 self.logger.debug("Session state is %s", session.state)
174 session.remove_vpp_config()
175 session.add_vpp_config()
176 self.logger.debug("Session state is %s", session.state)
177 session.remove_vpp_config()
179 def test_double_add_sha1(self):
180 """ create the same BFD session twice (negative case) (SHA1) """
181 key = self.factory.create_random_key(self)
183 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
185 session.add_vpp_config()
186 with self.assertRaises(Exception):
187 session.add_vpp_config()
def test_add_auth_nonexistent_key(self):
    """ create BFD session using non-existent SHA1 (negative case) """
    # the key comes straight from the factory; this test never calls
    # add_vpp_config() on it before handing it to the session
    missing_key = self.factory.create_random_key(self)
    session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                               sha1_key=missing_key)
    # adding the session is required to raise
    with self.assertRaises(Exception):
        session.add_vpp_config()
197 def test_shared_sha1_key(self):
198 """ share single SHA1 key between multiple BFD sessions """
199 key = self.factory.create_random_key(self)
202 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
204 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
205 sha1_key=key, af=AF_INET6),
206 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
208 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
209 sha1_key=key, af=AF_INET6)]
214 e = key.get_bfd_auth_keys_dump_entry()
215 self.assert_equal(e.use_count, len(sessions) - removed,
216 "Use count for shared key")
217 s.remove_vpp_config()
219 e = key.get_bfd_auth_keys_dump_entry()
220 self.assert_equal(e.use_count, len(sessions) - removed,
221 "Use count for shared key")
223 def test_activate_auth(self):
224 """ activate SHA1 authentication """
225 key = self.factory.create_random_key(self)
227 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
228 session.add_vpp_config()
229 session.activate_auth(key)
231 def test_deactivate_auth(self):
232 """ deactivate SHA1 authentication """
233 key = self.factory.create_random_key(self)
235 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
236 session.add_vpp_config()
237 session.activate_auth(key)
238 session.deactivate_auth()
240 def test_change_key(self):
241 """ change SHA1 key """
242 key1 = self.factory.create_random_key(self)
243 key2 = self.factory.create_random_key(self)
244 while key2.conf_key_id == key1.conf_key_id:
245 key2 = self.factory.create_random_key(self)
246 key1.add_vpp_config()
247 key2.add_vpp_config()
248 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
250 session.add_vpp_config()
251 session.activate_auth(key2)
254 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
255 class BFDTestSession(object):
256 """ BFD session as seen from test framework side """
258 def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
259 bfd_key_id=None, our_seq_number=None):
262 self.sha1_key = sha1_key
263 self.bfd_key_id = bfd_key_id
264 self.interface = interface
265 self.udp_sport = randint(49152, 65535)
266 if our_seq_number is None:
267 self.our_seq_number = randint(0, 40000000)
269 self.our_seq_number = our_seq_number
270 self.vpp_seq_number = None
271 self.my_discriminator = 0
272 self.desired_min_tx = 300000
273 self.required_min_rx = 300000
274 self.required_min_echo_rx = None
275 self.detect_mult = detect_mult
276 self.diag = BFDDiagCode.no_diagnostic
277 self.your_discriminator = None
278 self.state = BFDState.down
279 self.auth_type = BFDAuthType.no_auth
281 def inc_seq_num(self):
282 """ increment sequence number, wrapping if needed """
283 if self.our_seq_number == 0xFFFFFFFF:
284 self.our_seq_number = 0
286 self.our_seq_number += 1
288 def update(self, my_discriminator=None, your_discriminator=None,
289 desired_min_tx=None, required_min_rx=None,
290 required_min_echo_rx=None, detect_mult=None,
291 diag=None, state=None, auth_type=None):
292 """ update BFD parameters associated with session """
293 if my_discriminator is not None:
294 self.my_discriminator = my_discriminator
295 if your_discriminator is not None:
296 self.your_discriminator = your_discriminator
297 if required_min_rx is not None:
298 self.required_min_rx = required_min_rx
299 if required_min_echo_rx is not None:
300 self.required_min_echo_rx = required_min_echo_rx
301 if desired_min_tx is not None:
302 self.desired_min_tx = desired_min_tx
303 if detect_mult is not None:
304 self.detect_mult = detect_mult
307 if state is not None:
309 if auth_type is not None:
310 self.auth_type = auth_type
312 def fill_packet_fields(self, packet):
313 """ set packet fields with known values in packet """
315 if self.my_discriminator:
316 self.test.logger.debug("BFD: setting packet.my_discriminator=%s",
317 self.my_discriminator)
318 bfd.my_discriminator = self.my_discriminator
319 if self.your_discriminator:
320 self.test.logger.debug("BFD: setting packet.your_discriminator=%s",
321 self.your_discriminator)
322 bfd.your_discriminator = self.your_discriminator
323 if self.required_min_rx:
324 self.test.logger.debug(
325 "BFD: setting packet.required_min_rx_interval=%s",
326 self.required_min_rx)
327 bfd.required_min_rx_interval = self.required_min_rx
328 if self.required_min_echo_rx:
329 self.test.logger.debug(
330 "BFD: setting packet.required_min_echo_rx=%s",
331 self.required_min_echo_rx)
332 bfd.required_min_echo_rx_interval = self.required_min_echo_rx
333 if self.desired_min_tx:
334 self.test.logger.debug(
335 "BFD: setting packet.desired_min_tx_interval=%s",
337 bfd.desired_min_tx_interval = self.desired_min_tx
339 self.test.logger.debug(
340 "BFD: setting packet.detect_mult=%s", self.detect_mult)
341 bfd.detect_mult = self.detect_mult
343 self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
346 self.test.logger.debug("BFD: setting packet.state=%s", self.state)
347 bfd.state = self.state
349 # this is used by a negative test-case
350 self.test.logger.debug("BFD: setting packet.auth_type=%s",
352 bfd.auth_type = self.auth_type
354 def create_packet(self):
355 """ create a BFD packet, reflecting the current state of session """
358 bfd.auth_type = self.sha1_key.auth_type
359 bfd.auth_len = BFD.sha1_auth_len
360 bfd.auth_key_id = self.bfd_key_id
361 bfd.auth_seq_num = self.our_seq_number
362 bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
365 if self.af == AF_INET6:
366 packet = (Ether(src=self.interface.remote_mac,
367 dst=self.interface.local_mac) /
368 IPv6(src=self.interface.remote_ip6,
369 dst=self.interface.local_ip6,
371 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
374 packet = (Ether(src=self.interface.remote_mac,
375 dst=self.interface.local_mac) /
376 IP(src=self.interface.remote_ip4,
377 dst=self.interface.local_ip4,
379 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
381 self.test.logger.debug("BFD: Creating packet")
382 self.fill_packet_fields(packet)
384 hash_material = str(packet[BFD])[:32] + self.sha1_key.key + \
385 "\0" * (20 - len(self.sha1_key.key))
386 self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
387 hashlib.sha1(hash_material).hexdigest())
388 packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
391 def send_packet(self, packet=None, interface=None):
392 """ send packet on interface, creating the packet if needed """
394 packet = self.create_packet()
395 if interface is None:
396 interface = self.test.pg0
397 self.test.logger.debug(ppp("Sending packet:", packet))
398 interface.add_stream(packet)
401 def verify_sha1_auth(self, packet):
402 """ Verify correctness of authentication in BFD layer. """
404 self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
405 self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
407 self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
408 self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
409 if self.vpp_seq_number is None:
410 self.vpp_seq_number = bfd.auth_seq_num
411 self.test.logger.debug("Received initial sequence number: %s" %
414 recvd_seq_num = bfd.auth_seq_num
415 self.test.logger.debug("Received followup sequence number: %s" %
417 if self.vpp_seq_number < 0xffffffff:
418 if self.sha1_key.auth_type == \
419 BFDAuthType.meticulous_keyed_sha1:
420 self.test.assert_equal(recvd_seq_num,
421 self.vpp_seq_number + 1,
422 "BFD sequence number")
424 self.test.assert_in_range(recvd_seq_num,
426 self.vpp_seq_number + 1,
427 "BFD sequence number")
429 if self.sha1_key.auth_type == \
430 BFDAuthType.meticulous_keyed_sha1:
431 self.test.assert_equal(recvd_seq_num, 0,
432 "BFD sequence number")
434 self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
435 "BFD sequence number not one of "
436 "(%s, 0)" % self.vpp_seq_number)
437 self.vpp_seq_number = recvd_seq_num
438 # last 20 bytes represent the hash - so replace them with the key,
439 # pad the result with zeros and hash the result
440 hash_material = bfd.original[:-20] + self.sha1_key.key + \
441 "\0" * (20 - len(self.sha1_key.key))
442 expected_hash = hashlib.sha1(hash_material).hexdigest()
443 self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
444 expected_hash, "Auth key hash")
446 def verify_bfd(self, packet):
447 """ Verify correctness of BFD layer. """
449 self.test.assert_equal(bfd.version, 1, "BFD version")
450 self.test.assert_equal(bfd.your_discriminator,
451 self.my_discriminator,
452 "BFD - your discriminator")
454 self.verify_sha1_auth(packet)
457 def bfd_session_up(test):
458 """ Bring BFD session up """
459 test.logger.info("BFD: Waiting for slow hello")
460 p = wait_for_bfd_packet(test, 2)
462 if hasattr(test, 'vpp_clock_offset'):
463 old_offset = test.vpp_clock_offset
464 test.vpp_clock_offset = time.time() - p.time
465 test.logger.debug("BFD: Calculated vpp clock offset: %s",
466 test.vpp_clock_offset)
468 test.assertAlmostEqual(
469 old_offset, test.vpp_clock_offset, delta=0.5,
470 msg="vpp clock offset not stable (new: %s, old: %s)" %
471 (test.vpp_clock_offset, old_offset))
472 test.logger.info("BFD: Sending Init")
473 test.test_session.update(my_discriminator=randint(0, 40000000),
474 your_discriminator=p[BFD].my_discriminator,
476 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
477 BFDAuthType.meticulous_keyed_sha1:
478 test.test_session.inc_seq_num()
479 test.test_session.send_packet()
480 test.logger.info("BFD: Waiting for event")
481 e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
482 verify_event(test, e, expected_state=BFDState.up)
483 test.logger.info("BFD: Session is Up")
484 test.test_session.update(state=BFDState.up)
485 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
486 BFDAuthType.meticulous_keyed_sha1:
487 test.test_session.inc_seq_num()
488 test.test_session.send_packet()
489 test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
def bfd_session_down(test):
    """ Take an Up BFD session Down and check that VPP reports it

    :param test: test case object; its vpp_session, test_session, vapi and
                 logger attributes are used
    """
    # precondition: VPP side must currently be Up
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
    peer = test.test_session
    peer.update(state=BFDState.down)
    # with a meticulous keyed SHA1 key the sequence number is bumped
    # before the packet is sent
    key = peer.sha1_key
    if key and key.auth_type == BFDAuthType.meticulous_keyed_sha1:
        peer.inc_seq_num()
    peer.send_packet()
    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, event, expected_state=BFDState.down)
    test.logger.info("BFD: Session is Down")
    # postcondition: VPP side reports Down as well
    test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
507 def verify_bfd_session_config(test, session, state=None):
508 dump = session.get_bfd_udp_session_dump_entry()
509 test.assertIsNotNone(dump)
510 # since dump is not none, we have verified that sw_if_index and addresses
511 # are valid (in get_bfd_udp_session_dump_entry)
513 test.assert_equal(dump.state, state, "session state")
514 test.assert_equal(dump.required_min_rx, session.required_min_rx,
515 "required min rx interval")
516 test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
517 "desired min tx interval")
518 test.assert_equal(dump.detect_mult, session.detect_mult,
520 if session.sha1_key is None:
521 test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
523 test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
524 test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
526 test.assert_equal(dump.conf_key_id,
527 session.sha1_key.conf_key_id,
531 def verify_ip(test, packet):
532 """ Verify correctness of IP layer. """
533 if test.vpp_session.af == AF_INET6:
535 local_ip = test.pg0.local_ip6
536 remote_ip = test.pg0.remote_ip6
537 test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
540 local_ip = test.pg0.local_ip4
541 remote_ip = test.pg0.remote_ip4
542 test.assert_equal(ip.ttl, 255, "IPv4 TTL")
543 test.assert_equal(ip.src, local_ip, "IP source address")
544 test.assert_equal(ip.dst, remote_ip, "IP destination address")
547 def verify_udp(test, packet):
548 """ Verify correctness of UDP layer. """
550 test.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
551 test.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
555 def verify_event(test, event, expected_state):
556 """ Verify correctness of event values. """
558 test.logger.debug("BFD: Event: %s" % repr(e))
559 test.assert_equal(e.sw_if_index,
560 test.vpp_session.interface.sw_if_index,
561 "BFD interface index")
563 if test.vpp_session.af == AF_INET6:
565 test.assert_equal(e.is_ipv6, is_ipv6, "is_ipv6")
566 if test.vpp_session.af == AF_INET:
567 test.assert_equal(e.local_addr[:4], test.vpp_session.local_addr_n,
568 "Local IPv4 address")
569 test.assert_equal(e.peer_addr[:4], test.vpp_session.peer_addr_n,
572 test.assert_equal(e.local_addr, test.vpp_session.local_addr_n,
573 "Local IPv6 address")
574 test.assert_equal(e.peer_addr, test.vpp_session.peer_addr_n,
576 test.assert_equal(e.state, expected_state, BFDState)
579 def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None):
580 """ wait for BFD packet and verify its correctness
582 :param timeout: how long to wait
583 :param pcap_time_min: ignore packets with pcap timestamp lower than this
585 :returns: tuple (packet, time spent waiting for packet)
587 test.logger.info("BFD: Waiting for BFD packet")
588 deadline = time.time() + timeout
593 test.assert_in_range(counter, 0, 100, "number of packets ignored")
594 time_left = deadline - time.time()
596 raise CaptureTimeoutError("Packet did not arrive within timeout")
597 p = test.pg0.wait_for_packet(timeout=time_left)
598 test.logger.debug(ppp("BFD: Got packet:", p))
599 if pcap_time_min is not None and p.time < pcap_time_min:
600 test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
601 "pcap time min %s):" %
602 (p.time, pcap_time_min), p))
607 raise Exception(ppp("Unexpected or invalid BFD packet:", p))
609 raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
612 test.test_session.verify_bfd(p)
616 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
617 class BFD4TestCase(VppTestCase):
618 """Bidirectional Forwarding Detection (BFD)"""
621 vpp_clock_offset = None
627 super(BFD4TestCase, cls).setUpClass()
629 cls.create_pg_interfaces([0])
630 cls.create_loopback_interfaces([0])
631 cls.loopback0 = cls.lo_interfaces[0]
632 cls.loopback0.config_ip4()
633 cls.loopback0.admin_up()
635 cls.pg0.configure_ipv4_neighbors()
637 cls.pg0.resolve_arp()
640 super(BFD4TestCase, cls).tearDownClass()
644 super(BFD4TestCase, self).setUp()
645 self.factory = AuthKeyFactory()
646 self.vapi.want_bfd_events()
647 self.pg0.enable_capture()
649 self.vpp_session = VppBFDUDPSession(self, self.pg0,
651 self.vpp_session.add_vpp_config()
652 self.vpp_session.admin_up()
653 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
655 self.vapi.want_bfd_events(enable_disable=0)
659 if not self.vpp_dead:
660 self.vapi.want_bfd_events(enable_disable=0)
661 self.vapi.collect_events() # clear the event queue
662 super(BFD4TestCase, self).tearDown()
664 def test_session_up(self):
665 """ bring BFD session up """
668 def test_session_up_by_ip(self):
669 """ bring BFD session up - first frame looked up by address pair """
670 self.logger.info("BFD: Sending Slow control frame")
671 self.test_session.update(my_discriminator=randint(0, 40000000))
672 self.test_session.send_packet()
673 self.pg0.enable_capture()
674 p = self.pg0.wait_for_packet(1)
675 self.assert_equal(p[BFD].your_discriminator,
676 self.test_session.my_discriminator,
677 "BFD - your discriminator")
678 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
679 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
681 self.logger.info("BFD: Waiting for event")
682 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
683 verify_event(self, e, expected_state=BFDState.init)
684 self.logger.info("BFD: Sending Up")
685 self.test_session.send_packet()
686 self.logger.info("BFD: Waiting for event")
687 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
688 verify_event(self, e, expected_state=BFDState.up)
689 self.logger.info("BFD: Session is Up")
690 self.test_session.update(state=BFDState.up)
691 self.test_session.send_packet()
692 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
694 def test_session_down(self):
695 """ bring BFD session down """
697 bfd_session_down(self)
699 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
700 def test_hold_up(self):
701 """ hold BFD session up """
703 for dummy in range(self.test_session.detect_mult * 2):
704 wait_for_bfd_packet(self)
705 self.test_session.send_packet()
706 self.assert_equal(len(self.vapi.collect_events()), 0,
707 "number of bfd events")
709 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
710 def test_slow_timer(self):
711 """ verify slow periodic control frames while session down """
713 self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
714 prev_packet = wait_for_bfd_packet(self, 2)
715 for dummy in range(packet_count):
716 next_packet = wait_for_bfd_packet(self, 2)
717 time_diff = next_packet.time - prev_packet.time
718 # spec says the range should be <0.75, 1>, allow extra 0.05 margin
719 # to work around timing issues
720 self.assert_in_range(
721 time_diff, 0.70, 1.05, "time between slow packets")
722 prev_packet = next_packet
724 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
725 def test_zero_remote_min_rx(self):
726 """ no packets when zero remote required min rx interval """
728 self.test_session.update(required_min_rx=0)
729 self.test_session.send_packet()
730 for dummy in range(self.test_session.detect_mult):
731 self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
732 "sleep before transmitting bfd packet")
733 self.test_session.send_packet()
735 p = wait_for_bfd_packet(self, timeout=0)
736 self.logger.error(ppp("Received unexpected packet:", p))
737 except CaptureTimeoutError:
740 len(self.vapi.collect_events()), 0, "number of bfd events")
741 self.test_session.update(required_min_rx=300000)
742 for dummy in range(3):
743 self.test_session.send_packet()
745 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
747 len(self.vapi.collect_events()), 0, "number of bfd events")
749 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
750 def test_conn_down(self):
751 """ verify session goes down after inactivity """
753 detection_time = self.test_session.detect_mult *\
754 self.vpp_session.required_min_rx / USEC_IN_SEC
755 self.sleep(detection_time, "waiting for BFD session time-out")
756 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
757 verify_event(self, e, expected_state=BFDState.down)
759 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
760 def test_large_required_min_rx(self):
761 """ large remote required min rx interval """
763 p = wait_for_bfd_packet(self)
765 self.test_session.update(required_min_rx=interval)
766 self.test_session.send_packet()
767 time_mark = time.time()
769 # busy wait here, trying to collect a packet or event, vpp is not
770 # allowed to send packets and the session will timeout first - so the
771 # Up->Down event must arrive before any packets do
772 while time.time() < time_mark + interval / USEC_IN_SEC:
774 p = wait_for_bfd_packet(self, timeout=0)
775 # if vpp managed to send a packet before we did the session
776 # session update, then that's fine, ignore it
777 if p.time < time_mark - self.vpp_clock_offset:
779 self.logger.error(ppp("Received unexpected packet:", p))
781 except CaptureTimeoutError:
783 events = self.vapi.collect_events()
785 verify_event(self, events[0], BFDState.down)
787 self.assert_equal(count, 0, "number of packets received")
789 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
790 def test_immediate_remote_min_rx_reduction(self):
791 """ immediately honor remote required min rx reduction """
792 self.vpp_session.remove_vpp_config()
793 self.vpp_session = VppBFDUDPSession(
794 self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
795 self.pg0.enable_capture()
796 self.vpp_session.add_vpp_config()
797 self.test_session.update(desired_min_tx=1000000,
798 required_min_rx=1000000)
800 reference_packet = wait_for_bfd_packet(self)
801 time_mark = time.time()
803 self.test_session.update(required_min_rx=interval)
804 self.test_session.send_packet()
805 extra_time = time.time() - time_mark
806 p = wait_for_bfd_packet(self)
807 # first packet is allowed to be late by time we spent doing the update
808 # calculated in extra_time
809 self.assert_in_range(p.time - reference_packet.time,
810 .95 * 0.75 * interval / USEC_IN_SEC,
811 1.05 * interval / USEC_IN_SEC + extra_time,
812 "time between BFD packets")
814 for dummy in range(3):
815 p = wait_for_bfd_packet(self)
816 diff = p.time - reference_packet.time
817 self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
818 1.05 * interval / USEC_IN_SEC,
819 "time between BFD packets")
822 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
823 def test_modify_req_min_rx_double(self):
824 """ modify session - double required min rx """
826 p = wait_for_bfd_packet(self)
827 self.test_session.update(desired_min_tx=10000,
828 required_min_rx=10000)
829 self.test_session.send_packet()
830 # double required min rx
831 self.vpp_session.modify_parameters(
832 required_min_rx=2 * self.vpp_session.required_min_rx)
833 p = wait_for_bfd_packet(
834 self, pcap_time_min=time.time() - self.vpp_clock_offset)
835 # poll bit needs to be set
836 self.assertIn("P", p.sprintf("%BFD.flags%"),
837 "Poll bit not set in BFD packet")
838 # finish poll sequence with final packet
839 final = self.test_session.create_packet()
840 final[BFD].flags = "F"
841 timeout = self.test_session.detect_mult * \
842 max(self.test_session.desired_min_tx,
843 self.vpp_session.required_min_rx) / USEC_IN_SEC
844 self.test_session.send_packet(final)
845 time_mark = time.time()
846 e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
847 verify_event(self, e, expected_state=BFDState.down)
848 time_to_event = time.time() - time_mark
849 self.assert_in_range(time_to_event, .9 * timeout,
850 1.1 * timeout, "session timeout")
852 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
853 def test_modify_req_min_rx_halve(self):
854 """ modify session - halve required min rx """
855 self.vpp_session.modify_parameters(
856 required_min_rx=2 * self.vpp_session.required_min_rx)
858 p = wait_for_bfd_packet(self)
859 self.test_session.update(desired_min_tx=10000,
860 required_min_rx=10000)
861 self.test_session.send_packet()
862 p = wait_for_bfd_packet(
863 self, pcap_time_min=time.time() - self.vpp_clock_offset)
864 # halve required min rx
865 old_required_min_rx = self.vpp_session.required_min_rx
866 self.vpp_session.modify_parameters(
867 required_min_rx=0.5 * self.vpp_session.required_min_rx)
868 # now we wait 0.8*3*old-req-min-rx and the session should still be up
869 self.sleep(0.8 * self.vpp_session.detect_mult *
870 old_required_min_rx / USEC_IN_SEC,
871 "wait before finishing poll sequence")
872 self.assert_equal(len(self.vapi.collect_events()), 0,
873 "number of bfd events")
874 p = wait_for_bfd_packet(self)
875 # poll bit needs to be set
876 self.assertIn("P", p.sprintf("%BFD.flags%"),
877 "Poll bit not set in BFD packet")
878 # finish poll sequence with final packet
879 final = self.test_session.create_packet()
880 final[BFD].flags = "F"
881 self.test_session.send_packet(final)
882 # now the session should time out under new conditions
883 detection_time = self.test_session.detect_mult *\
884 self.vpp_session.required_min_rx / USEC_IN_SEC
886 e = self.vapi.wait_for_event(
887 2 * detection_time, "bfd_udp_session_details")
889 self.assert_in_range(after - before,
890 0.9 * detection_time,
891 1.1 * detection_time,
892 "time before bfd session goes down")
893 verify_event(self, e, expected_state=BFDState.down)
895 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
896 def test_modify_detect_mult(self):
897 """ modify detect multiplier """
899 p = wait_for_bfd_packet(self)
900 self.vpp_session.modify_parameters(detect_mult=1)
901 p = wait_for_bfd_packet(
902 self, pcap_time_min=time.time() - self.vpp_clock_offset)
903 self.assert_equal(self.vpp_session.detect_mult,
906 # poll bit must not be set
907 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
908 "Poll bit not set in BFD packet")
909 self.vpp_session.modify_parameters(detect_mult=10)
910 p = wait_for_bfd_packet(
911 self, pcap_time_min=time.time() - self.vpp_clock_offset)
912 self.assert_equal(self.vpp_session.detect_mult,
915 # poll bit must not be set
916 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
917 "Poll bit not set in BFD packet")
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_queued_poll(self):
    """ test poll sequence queueing

    Modifies required-min-rx twice in a row and checks that the second
    poll sequence only starts after the first one was terminated with a
    Final packet, and not sooner than the first sequence lasted.
    """
    # poll sequences are only observed on an established session
    bfd_session_up(self)
    p = wait_for_bfd_packet(self)
    self.vpp_session.modify_parameters(
        required_min_rx=2 * self.vpp_session.required_min_rx)
    p = wait_for_bfd_packet(self)
    poll_sequence_start = time.time()
    poll_sequence_length_min = 0.5
    send_final_after = time.time() + poll_sequence_length_min
    # poll bit needs to be set
    self.assertIn("P", p.sprintf("%BFD.flags%"),
                  "Poll bit not set in BFD packet")
    self.assert_equal(p[BFD].required_min_rx_interval,
                      self.vpp_session.required_min_rx,
                      "BFD required min rx interval")
    self.vpp_session.modify_parameters(
        required_min_rx=2 * self.vpp_session.required_min_rx)
    # 2nd poll sequence should be queued now
    # don't send the reply back yet, wait for some time to emulate
    # longer round-trip time
    # number of packets seen during the 1st poll sequence; bounds how
    # long we later wait for the 2nd sequence to start
    packet_count = 0
    while time.time() < send_final_after:
        self.test_session.send_packet()
        p = wait_for_bfd_packet(self)
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")
        self.assert_equal(p[BFD].required_min_rx_interval,
                          self.vpp_session.required_min_rx,
                          "BFD required min rx interval")
        packet_count += 1
        # poll bit must be set
        self.assertIn("P", p.sprintf("%BFD.flags%"),
                      "Poll bit not set in BFD packet")
    final = self.test_session.create_packet()
    final[BFD].flags = "F"
    self.test_session.send_packet(final)
    # finish 1st with final
    poll_sequence_length = time.time() - poll_sequence_start
    # vpp must wait for some time before starting new poll sequence
    poll_no_2_started = False
    for dummy in range(2 * packet_count):
        p = wait_for_bfd_packet(self)
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")
        if "P" in p.sprintf("%BFD.flags%"):
            poll_no_2_started = True
            if time.time() < poll_sequence_start + poll_sequence_length:
                raise Exception("VPP started 2nd poll sequence too soon")
            final = self.test_session.create_packet()
            final[BFD].flags = "F"
            self.test_session.send_packet(final)
            break
        else:
            # no poll yet - keep the session alive with a plain packet
            self.test_session.send_packet()
    self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
    # finish 2nd with final
    final = self.test_session.create_packet()
    final[BFD].flags = "F"
    self.test_session.send_packet(final)
    p = wait_for_bfd_packet(self)
    # poll bit must not be set
    self.assertNotIn("P", p.sprintf("%BFD.flags%"),
                     "Poll bit set in BFD packet")
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_poll_response(self):
    """ test correct response to control frame with poll bit set """
    # bring the session up first, as the sibling poll/demand tests do
    bfd_session_up(self)
    poll = self.test_session.create_packet()
    poll[BFD].flags = "P"
    self.test_session.send_packet(poll)
    # only consider packets captured after the poll was sent
    final = wait_for_bfd_packet(
        self, pcap_time_min=time.time() - self.vpp_clock_offset)
    self.assertIn("F", final.sprintf("%BFD.flags%"),
                  "Final bit not set in BFD packet")
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_no_periodic_if_remote_demand(self):
    """ no periodic frames outside poll sequence if remote demand set

    After the peer signals demand mode (D flag), keeps sending packets
    for 2 * detect_mult intervals and expects neither BFD packets nor
    BFD events from VPP.
    """
    bfd_session_up(self)
    demand = self.test_session.create_packet()
    demand[BFD].flags = "D"
    self.test_session.send_packet(demand)
    # send slightly faster (90%) than the slower of the two intervals;
    # the intervals are kept in microseconds, sleep() wants seconds
    transmit_time = 0.9 \
        * max(self.vpp_session.required_min_rx,
              self.test_session.desired_min_tx) \
        / USEC_IN_SEC
    count = 0
    for dummy in range(self.test_session.detect_mult * 2):
        time.sleep(transmit_time)
        self.test_session.send_packet(demand)
        try:
            p = wait_for_bfd_packet(self, timeout=0)
            self.logger.error(ppp("Received unexpected packet:", p))
            count += 1
        except CaptureTimeoutError:
            # expected - vpp must stay quiet
            pass
    events = self.vapi.collect_events()
    for e in events:
        self.logger.error("Received unexpected event: %s", e)
    self.assert_equal(count, 0, "number of packets received")
    self.assert_equal(len(events), 0, "number of events received")
def test_echo_looped_back(self):
    """ echo packets looped back

    Sends UDP packets to the BFD echo port with source == destination
    IP and verifies that each one comes back with swapped MACs, the
    same IPs, the same ports and an unchanged payload.
    """
    # don't need a session in this case..
    self.vpp_session.remove_vpp_config()
    self.pg0.enable_capture()
    echo_packet_count = 10
    # random source port low enough to increment a few times..
    udp_sport_tx = randint(1, 50000)
    udp_sport_rx = udp_sport_tx
    echo_packet = (Ether(src=self.pg0.remote_mac,
                         dst=self.pg0.local_mac) /
                   IP(src=self.pg0.remote_ip4,
                      dst=self.pg0.remote_ip4) /
                   UDP(dport=BFD.udp_dport_echo) /
                   Raw("this should be looped back"))
    for dummy in range(echo_packet_count):
        self.sleep(.01, "delay between echo packets")
        # the source port doubles as a per-packet identifier
        echo_packet[UDP].sport = udp_sport_tx
        udp_sport_tx += 1
        self.logger.debug(ppp("Sending packet:", echo_packet))
        self.pg0.add_stream(echo_packet)
        self.pg_start()
    for dummy in range(echo_packet_count):
        p = self.pg0.wait_for_packet(1)
        self.logger.debug(ppp("Got packet:", p))
        ether = p[Ether]
        self.assert_equal(self.pg0.remote_mac,
                          ether.dst, "Destination MAC")
        self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
        ip = p[IP]
        self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
        self.assert_equal(self.pg0.remote_ip4, ip.src, "Source IP")
        udp = p[UDP]
        self.assert_equal(udp.dport, BFD.udp_dport_echo,
                          "UDP destination port")
        self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
        udp_sport_rx += 1
        # need to compare the hex payload here, otherwise BFD_vpp_echo
        # gets in the way
        self.assertEqual(str(p[UDP].payload),
                         str(echo_packet[UDP].payload),
                         "Received packet is not the echo packet sent")
    # both counters advanced once per packet, so they must agree
    self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
                      "ECHO packet identifier for test purposes)")
def test_echo(self):
    """ echo function

    Checks that no echo packets are sent while no echo source is
    configured, then configures one and loops the echo packets back,
    verifying their addressing and that no BFD events are raised.
    """
    bfd_session_up(self)
    self.test_session.update(required_min_echo_rx=150000)
    self.test_session.send_packet()
    detection_time = self.test_session.detect_mult *\
        self.vpp_session.required_min_rx / USEC_IN_SEC
    # echo shouldn't work without echo source set
    for dummy in range(10):
        sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
        self.sleep(sleep, "delay before sending bfd packet")
        self.test_session.send_packet()
    p = wait_for_bfd_packet(
        self, pcap_time_min=time.time() - self.vpp_clock_offset)
    self.assert_equal(p[BFD].required_min_rx_interval,
                      self.vpp_session.required_min_rx,
                      "BFD required min rx interval")
    self.test_session.send_packet()
    self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
    echo_seen = False
    # should be turned on - loopback echo packets
    for dummy in range(3):
        loop_until = time.time() + 0.75 * detection_time
        while time.time() < loop_until:
            p = self.pg0.wait_for_packet(1)
            self.logger.debug(ppp("Got packet:", p))
            if p[UDP].dport == BFD.udp_dport_echo:
                self.assert_equal(
                    p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
                self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
                                    "BFD ECHO src IP equal to loopback IP")
                self.logger.debug(ppp("Looping back packet:", p))
                self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
                                  "ECHO packet destination MAC address")
                # send the echo packet straight back into vpp
                p[Ether].dst = self.pg0.local_mac
                self.pg0.add_stream(p)
                self.pg_start()
                echo_seen = True
            elif p.haslayer(BFD):
                if echo_seen:
                    # NOTE(review): 1000000 usec (1s) is assumed to be
                    # the floor vpp advertises while echo is active -
                    # confirm against the vpp BFD implementation
                    self.assertGreaterEqual(
                        p[BFD].required_min_rx_interval,
                        1000000)
                if "P" in p.sprintf("%BFD.flags%"):
                    final = self.test_session.create_packet()
                    final[BFD].flags = "F"
                    self.test_session.send_packet(final)
            else:
                raise Exception(ppp("Received unknown packet:", p))

            self.assert_equal(len(self.vapi.collect_events()), 0,
                              "number of bfd events")
        self.test_session.send_packet()
    self.assertTrue(echo_seen, "No echo packets received")
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_echo_fail(self):
    """ session goes down if echo function fails

    Enables the echo function but never loops the echo packets back;
    expects a single session-down event and a Down packet carrying the
    echo-function-failed diagnostic code.
    """
    bfd_session_up(self)
    self.test_session.update(required_min_echo_rx=150000)
    self.test_session.send_packet()
    detection_time = self.test_session.detect_mult *\
        self.vpp_session.required_min_rx / USEC_IN_SEC
    self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
    # echo function should be used now, but we will drop the echo packets
    verified_diag = False
    for dummy in range(3):
        loop_until = time.time() + 0.75 * detection_time
        while time.time() < loop_until:
            p = self.pg0.wait_for_packet(1)
            self.logger.debug(ppp("Got packet:", p))
            if p[UDP].dport == BFD.udp_dport_echo:
                # dropped on purpose - this is the simulated failure
                pass
            elif p.haslayer(BFD):
                if "P" in p.sprintf("%BFD.flags%"):
                    # NOTE(review): 1000000 usec (1s) is assumed to be
                    # the floor vpp advertises while echo is active -
                    # confirm against the vpp BFD implementation
                    self.assertGreaterEqual(
                        p[BFD].required_min_rx_interval,
                        1000000)
                    final = self.test_session.create_packet()
                    final[BFD].flags = "F"
                    self.test_session.send_packet(final)
                if p[BFD].state == BFDState.down:
                    self.assert_equal(p[BFD].diag,
                                      BFDDiagCode.echo_function_failed,
                                      BFDDiagCode)
                    verified_diag = True
            else:
                raise Exception(ppp("Received unknown packet:", p))
        self.test_session.send_packet()
    events = self.vapi.collect_events()
    self.assert_equal(len(events), 1, "number of bfd events")
    self.assert_equal(events[0].state, BFDState.down, BFDState)
    self.assertTrue(verified_diag, "Incorrect diagnostics code received")
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_echo_stop(self):
    """ echo function stops if peer sets required min echo rx zero """
    bfd_session_up(self)
    self.test_session.update(required_min_echo_rx=150000)
    self.test_session.send_packet()
    self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
    # wait for first echo packet
    while True:
        p = self.pg0.wait_for_packet(1)
        self.logger.debug(ppp("Got packet:", p))
        if p[UDP].dport == BFD.udp_dport_echo:
            self.logger.debug(ppp("Looping back packet:", p))
            p[Ether].dst = self.pg0.local_mac
            self.pg0.add_stream(p)
            self.pg_start()
            break
        elif p.haslayer(BFD):
            # ignore BFD control packets while waiting for the echo
            pass
        else:
            raise Exception(ppp("Received unknown packet:", p))
    # advertise that we no longer accept echo packets
    self.test_session.update(required_min_echo_rx=0)
    self.test_session.send_packet()
    # echo packets shouldn't arrive anymore
    for dummy in range(5):
        wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.test_session.send_packet()
        events = self.vapi.collect_events()
        self.assert_equal(len(events), 0, "number of bfd events")
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_echo_source_removed(self):
    """ echo function stops if echo source is removed """
    bfd_session_up(self)
    self.test_session.update(required_min_echo_rx=150000)
    self.test_session.send_packet()
    self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
    # wait for first echo packet
    while True:
        p = self.pg0.wait_for_packet(1)
        self.logger.debug(ppp("Got packet:", p))
        if p[UDP].dport == BFD.udp_dport_echo:
            self.logger.debug(ppp("Looping back packet:", p))
            p[Ether].dst = self.pg0.local_mac
            self.pg0.add_stream(p)
            self.pg_start()
            break
        elif p.haslayer(BFD):
            # ignore BFD control packets while waiting for the echo
            pass
        else:
            raise Exception(ppp("Received unknown packet:", p))
    # without an echo source vpp has nothing to send echo packets from
    self.vapi.bfd_udp_del_echo_source()
    self.test_session.send_packet()
    # echo packets shouldn't arrive anymore
    for dummy in range(5):
        wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.test_session.send_packet()
        events = self.vapi.collect_events()
        self.assert_equal(len(events), 0, "number of bfd events")
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_stale_echo(self):
    """ stale echo packets don't keep a session up

    Loops the very first echo packet back over and over instead of the
    current one and expects the session to go down with the
    echo-function-failed diagnostic, not earlier than the echo
    detection time measured from the first echo packet.
    """
    bfd_session_up(self)
    self.test_session.update(required_min_echo_rx=150000)
    self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
    self.test_session.send_packet()
    # should be turned on - loopback echo packets
    echo_packet = None   # first echo packet seen, replayed from then on
    timeout_at = None    # earliest acceptable time for the session drop
    timeout_ok = False
    for dummy in range(10 * self.vpp_session.detect_mult):
        p = self.pg0.wait_for_packet(1)
        if p[UDP].dport == BFD.udp_dport_echo:
            if echo_packet is None:
                self.logger.debug(ppp("Got first echo packet:", p))
                echo_packet = p
                timeout_at = time.time() + self.vpp_session.detect_mult * \
                    self.test_session.required_min_echo_rx / USEC_IN_SEC
            else:
                self.logger.debug(ppp("Got followup echo packet:", p))
            # log the packet that is actually sent back
            self.logger.debug(
                ppp("Looping back first echo packet:", echo_packet))
            echo_packet[Ether].dst = self.pg0.local_mac
            self.pg0.add_stream(echo_packet)
            self.pg_start()
        elif p.haslayer(BFD):
            self.logger.debug(ppp("Got packet:", p))
            if "P" in p.sprintf("%BFD.flags%"):
                final = self.test_session.create_packet()
                final[BFD].flags = "F"
                self.test_session.send_packet(final)
            if p[BFD].state == BFDState.down:
                self.assertIsNotNone(
                    timeout_at,
                    "Session went down before first echo packet received")
                now = time.time()
                self.assertGreaterEqual(
                    now, timeout_at,
                    "Session timeout at %s, but is expected at %s" %
                    (now, timeout_at))
                self.assert_equal(p[BFD].diag,
                                  BFDDiagCode.echo_function_failed,
                                  BFDDiagCode)
                events = self.vapi.collect_events()
                self.assert_equal(len(events), 1, "number of bfd events")
                self.assert_equal(events[0].state, BFDState.down, BFDState)
                timeout_ok = True
                break
        else:
            raise Exception(ppp("Received unknown packet:", p))
        self.test_session.send_packet()
    self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_invalid_echo_checksum(self):
    """ echo packets with invalid checksum don't keep a session up

    Loops every echo packet back with a randomised checksum and expects
    the session to go down with the echo-function-failed diagnostic,
    not earlier than the echo detection time measured from the first
    echo packet.
    """
    bfd_session_up(self)
    self.test_session.update(required_min_echo_rx=150000)
    self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
    self.test_session.send_packet()
    # should be turned on - loopback echo packets
    timeout_at = None    # earliest acceptable time for the session drop
    timeout_ok = False
    for dummy in range(10 * self.vpp_session.detect_mult):
        p = self.pg0.wait_for_packet(1)
        if p[UDP].dport == BFD.udp_dport_echo:
            self.logger.debug(ppp("Got echo packet:", p))
            if timeout_at is None:
                timeout_at = time.time() + self.vpp_session.detect_mult * \
                    self.test_session.required_min_echo_rx / USEC_IN_SEC
            # corrupt the checksum before sending the packet back
            p[BFD_vpp_echo].checksum = getrandbits(64)
            p[Ether].dst = self.pg0.local_mac
            self.logger.debug(ppp("Looping back modified echo packet:", p))
            self.pg0.add_stream(p)
            self.pg_start()
        elif p.haslayer(BFD):
            self.logger.debug(ppp("Got packet:", p))
            if "P" in p.sprintf("%BFD.flags%"):
                final = self.test_session.create_packet()
                final[BFD].flags = "F"
                self.test_session.send_packet(final)
            if p[BFD].state == BFDState.down:
                self.assertIsNotNone(
                    timeout_at,
                    "Session went down before first echo packet received")
                now = time.time()
                self.assertGreaterEqual(
                    now, timeout_at,
                    "Session timeout at %s, but is expected at %s" %
                    (now, timeout_at))
                self.assert_equal(p[BFD].diag,
                                  BFDDiagCode.echo_function_failed,
                                  BFDDiagCode)
                events = self.vapi.collect_events()
                self.assert_equal(len(events), 1, "number of bfd events")
                self.assert_equal(events[0].state, BFDState.down, BFDState)
                timeout_ok = True
                break
        else:
            raise Exception(ppp("Received unknown packet:", p))
        self.test_session.send_packet()
    self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_admin_up_down(self):
    """ put session admin-up and admin-down

    Puts an established session admin-down, checks that the peer cannot
    bring it up, then puts it admin-up again and walks it through
    down -> init -> up, checking both packets and API events.
    """
    event_name = "bfd_udp_session_details"

    def expect_event(state):
        # wait for the next session event and verify the reported state
        event = self.vapi.wait_for_event(1, event_name)
        verify_event(self, event, expected_state=state)

    def expect_fresh_packet(state):
        # only packets captured from now on are of interest
        pkt = wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.assert_equal(pkt[BFD].state, state, BFDState)

    def expect_admin_down_packets():
        for _ in range(2):
            pkt = wait_for_bfd_packet(self)
            self.assert_equal(pkt[BFD].state, BFDState.admin_down,
                              BFDState)

    bfd_session_up(self)
    self.vpp_session.admin_down()
    self.pg0.enable_capture()
    expect_event(BFDState.admin_down)
    expect_admin_down_packets()
    # try to bring session up - shouldn't be possible
    self.test_session.update(state=BFDState.init)
    self.test_session.send_packet()
    expect_admin_down_packets()
    # back to admin-up: the session restarts from the down state
    self.vpp_session.admin_up()
    self.test_session.update(state=BFDState.down)
    expect_event(BFDState.down)
    expect_fresh_packet(BFDState.down)
    self.test_session.send_packet()
    expect_fresh_packet(BFDState.init)
    expect_event(BFDState.init)
    self.test_session.update(state=BFDState.up)
    self.test_session.send_packet()
    expect_fresh_packet(BFDState.up)
    expect_event(BFDState.up)
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_config_change_remote_demand(self):
    """ configuration change while peer in demand mode

    A parameter change must trigger a poll sequence even though the
    peer is in demand mode; once the sequence is terminated VPP is
    expected to stay quiet again.
    """
    bfd_session_up(self)
    demand = self.test_session.create_packet()
    demand[BFD].flags = "D"
    self.test_session.send_packet(demand)
    self.vpp_session.modify_parameters(
        required_min_rx=2 * self.vpp_session.required_min_rx)
    p = wait_for_bfd_packet(
        self, pcap_time_min=time.time() - self.vpp_clock_offset)
    # poll bit must be set
    self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
    # terminate poll sequence
    final = self.test_session.create_packet()
    final[BFD].flags = "D+F"
    self.test_session.send_packet(final)
    # vpp should be quiet now again
    # intervals are kept in microseconds, sleep() wants seconds
    transmit_time = 0.9 \
        * max(self.vpp_session.required_min_rx,
              self.test_session.desired_min_tx) \
        / USEC_IN_SEC
    count = 0
    for dummy in range(self.test_session.detect_mult * 2):
        time.sleep(transmit_time)
        self.test_session.send_packet(demand)
        try:
            p = wait_for_bfd_packet(self, timeout=0)
            self.logger.error(ppp("Received unexpected packet:", p))
            count += 1
        except CaptureTimeoutError:
            # expected - vpp must stay quiet
            pass
    events = self.vapi.collect_events()
    for e in events:
        self.logger.error("Received unexpected event: %s", e)
    self.assert_equal(count, 0, "number of packets received")
    self.assert_equal(len(events), 0, "number of events received")
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
class BFD6TestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (IPv6) """

    # class-level placeholders, filled in by setUpClass()/setUp();
    # vpp_clock_offset is presumably set by bfd_session_up() - verify
    pg0 = None
    vpp_clock_offset = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """ create pg0 and loopback0 with IPv6 configured """
        super(BFD6TestCase, cls).setUpClass()
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip6()
            cls.pg0.configure_ipv6_neighbors()
            cls.pg0.admin_up()
            cls.pg0.resolve_ndp()
            cls.create_loopback_interfaces([0])
            cls.loopback0 = cls.lo_interfaces[0]
            cls.loopback0.config_ip6()
            cls.loopback0.admin_up()
        except Exception:
            # don't leave a half-configured class behind
            super(BFD6TestCase, cls).tearDownClass()
            raise

    def setUp(self):
        """ create an admin-up IPv6 BFD session and its test peer """
        super(BFD6TestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()
        try:
            self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                                self.pg0.remote_ip6,
                                                af=AF_INET6)
            self.vpp_session.add_vpp_config()
            self.vpp_session.admin_up()
            self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
            self.logger.debug(self.vapi.cli("show adj nbr"))
        except Exception:
            # undo the event registration before propagating the error
            self.vapi.want_bfd_events(enable_disable=0)
            raise

    def tearDown(self):
        """ unregister from BFD events and drain the event queue """
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
            self.vapi.collect_events()  # clear the event queue
        super(BFD6TestCase, self).tearDown()

    def test_session_up(self):
        """ bring BFD session up """
        bfd_session_up(self)

    def test_session_up_by_ip(self):
        """ bring BFD session up - first frame looked up by address pair """
        self.logger.info("BFD: Sending Slow control frame")
        self.test_session.update(my_discriminator=randint(0, 40000000))
        self.test_session.send_packet()
        self.pg0.enable_capture()
        p = self.pg0.wait_for_packet(1)
        self.assert_equal(p[BFD].your_discriminator,
                          self.test_session.my_discriminator,
                          "BFD - your discriminator")
        self.assert_equal(p[BFD].state, BFDState.init, BFDState)
        self.test_session.update(your_discriminator=p[BFD].my_discriminator,
                                 state=BFDState.up)
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        verify_event(self, e, expected_state=BFDState.init)
        self.logger.info("BFD: Sending Up")
        self.test_session.send_packet()
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        verify_event(self, e, expected_state=BFDState.up)
        self.logger.info("BFD: Session is Up")
        self.test_session.update(state=BFDState.up)
        self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_hold_up(self):
        """ hold BFD session up """
        bfd_session_up(self)
        for dummy in range(self.test_session.detect_mult * 2):
            wait_for_bfd_packet(self)
            self.test_session.send_packet()
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_echo_looped_back(self):
        """ echo packets looped back """
        # don't need a session in this case..
        self.vpp_session.remove_vpp_config()
        self.pg0.enable_capture()
        echo_packet_count = 10
        # random source port low enough to increment a few times..
        udp_sport_tx = randint(1, 50000)
        udp_sport_rx = udp_sport_tx
        echo_packet = (Ether(src=self.pg0.remote_mac,
                             dst=self.pg0.local_mac) /
                       IPv6(src=self.pg0.remote_ip6,
                            dst=self.pg0.remote_ip6) /
                       UDP(dport=BFD.udp_dport_echo) /
                       Raw("this should be looped back"))
        for dummy in range(echo_packet_count):
            self.sleep(.01, "delay between echo packets")
            # the source port doubles as a per-packet identifier
            echo_packet[UDP].sport = udp_sport_tx
            udp_sport_tx += 1
            self.logger.debug(ppp("Sending packet:", echo_packet))
            self.pg0.add_stream(echo_packet)
            self.pg_start()
        for dummy in range(echo_packet_count):
            p = self.pg0.wait_for_packet(1)
            self.logger.debug(ppp("Got packet:", p))
            ether = p[Ether]
            self.assert_equal(self.pg0.remote_mac,
                              ether.dst, "Destination MAC")
            self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
            ip = p[IPv6]
            self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
            self.assert_equal(self.pg0.remote_ip6, ip.src, "Source IP")
            udp = p[UDP]
            self.assert_equal(udp.dport, BFD.udp_dport_echo,
                              "UDP destination port")
            self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
            udp_sport_rx += 1
            # need to compare the hex payload here, otherwise BFD_vpp_echo
            # gets in the way
            self.assertEqual(str(p[UDP].payload),
                             str(echo_packet[UDP].payload),
                             "Received packet is not the echo packet sent")
        # both counters advanced once per packet, so they must agree
        self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
                          "ECHO packet identifier for test purposes)")

    def test_echo(self):
        """ echo function """
        bfd_session_up(self)
        self.test_session.update(required_min_echo_rx=150000)
        self.test_session.send_packet()
        detection_time = self.test_session.detect_mult *\
            self.vpp_session.required_min_rx / USEC_IN_SEC
        # echo shouldn't work without echo source set
        for dummy in range(10):
            sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
            self.sleep(sleep, "delay before sending bfd packet")
            self.test_session.send_packet()
        p = wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.assert_equal(p[BFD].required_min_rx_interval,
                          self.vpp_session.required_min_rx,
                          "BFD required min rx interval")
        self.test_session.send_packet()
        self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
        echo_seen = False
        # should be turned on - loopback echo packets
        for dummy in range(3):
            loop_until = time.time() + 0.75 * detection_time
            while time.time() < loop_until:
                p = self.pg0.wait_for_packet(1)
                self.logger.debug(ppp("Got packet:", p))
                if p[UDP].dport == BFD.udp_dport_echo:
                    self.assert_equal(
                        p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
                    self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
                                        "BFD ECHO src IP equal to loopback IP")
                    self.logger.debug(ppp("Looping back packet:", p))
                    self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
                                      "ECHO packet destination MAC address")
                    # send the echo packet straight back into vpp
                    p[Ether].dst = self.pg0.local_mac
                    self.pg0.add_stream(p)
                    self.pg_start()
                    echo_seen = True
                elif p.haslayer(BFD):
                    if echo_seen:
                        # NOTE(review): 1000000 usec (1s) is assumed to
                        # be the floor vpp advertises while echo is
                        # active - confirm against the implementation
                        self.assertGreaterEqual(
                            p[BFD].required_min_rx_interval,
                            1000000)
                    if "P" in p.sprintf("%BFD.flags%"):
                        final = self.test_session.create_packet()
                        final[BFD].flags = "F"
                        self.test_session.send_packet(final)
                else:
                    raise Exception(ppp("Received unknown packet:", p))

                self.assert_equal(len(self.vapi.collect_events()), 0,
                                  "number of bfd events")
            self.test_session.send_packet()
        self.assertTrue(echo_seen, "No echo packets received")
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
class BFDFIBTestCase(VppTestCase):
    """ BFD-FIB interactions (IPv6) """

    # class-level placeholders, assigned in test_session_with_fib()
    vpp_session = None
    test_session = None

    def setUp(self):
        """ create pg0, bring it up with IPv6 and register for events """
        super(BFDFIBTestCase, self).setUp()
        self.create_pg_interfaces(range(1))

        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            i.configure_ipv6_neighbors()

    def tearDown(self):
        """ unregister from BFD events """
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)

        super(BFDFIBTestCase, self).tearDown()

    @staticmethod
    def pkt_is_not_data_traffic(p):
        """ not data traffic implies BFD or the usual IPv6 ND/RA"""
        return bool(p.haslayer(BFD) or is_ipv6_misc(p))

    def test_session_with_fib(self):
        """ BFD-FIB interactions """

        # packets to match against both of the routes
        p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
              IPv6(src="3001::1", dst="2001::1") /
              UDP(sport=1234, dport=1234) /
              Raw('\xa5' * 100)),
             (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
              IPv6(src="3001::1", dst="2002::1") /
              UDP(sport=1234, dport=1234) /
              Raw('\xa5' * 100))]

        # A recursive and a non-recursive route via a next-hop that
        # will have a BFD session
        ip_2001_s_64 = VppIpRoute(self, "2001::", 64,
                                  [VppRoutePath(self.pg0.remote_ip6,
                                                self.pg0.sw_if_index,
                                                is_ip6=1)],
                                  is_ip6=1)
        # 0xffffffff: no interface given, i.e. the recursive route
        ip_2002_s_64 = VppIpRoute(self, "2002::", 64,
                                  [VppRoutePath(self.pg0.remote_ip6,
                                                0xffffffff,
                                                is_ip6=1)],
                                  is_ip6=1)
        ip_2001_s_64.add_vpp_config()
        ip_2002_s_64.add_vpp_config()

        # bring the session up now the routes are present
        self.vpp_session = VppBFDUDPSession(self,
                                            self.pg0,
                                            self.pg0.remote_ip6,
                                            af=AF_INET6)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET6)

        # session is up - traffic passes
        bfd_session_up(self)

        self.pg0.add_stream(p)
        self.pg_start()
        for packet in p:
            captured = self.pg0.wait_for_packet(
                1,
                filter_out_fn=self.pkt_is_not_data_traffic)
            self.assertEqual(captured[IPv6].dst,
                             packet[IPv6].dst)

        # session is down - traffic is dropped
        bfd_session_down(self)

        self.pg0.add_stream(p)
        self.pg_start()
        with self.assertRaises(CaptureTimeoutError):
            self.pg0.wait_for_packet(1, self.pkt_is_not_data_traffic)

        # session is up again - traffic passes
        bfd_session_up(self)

        self.pg0.add_stream(p)
        self.pg_start()
        for packet in p:
            captured = self.pg0.wait_for_packet(
                1,
                filter_out_fn=self.pkt_is_not_data_traffic)
            self.assertEqual(captured[IPv6].dst,
                             packet[IPv6].dst)
1703 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1704 class BFDSHA1TestCase(VppTestCase):
1705 """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1708 vpp_clock_offset = None
@classmethod
def setUpClass(cls):
    """ create pg0 with IPv4 configured and ARP resolved """
    super(BFDSHA1TestCase, cls).setUpClass()
    try:
        cls.create_pg_interfaces([0])
        cls.pg0.config_ip4()
        cls.pg0.admin_up()
        cls.pg0.resolve_arp()
    except Exception:
        # don't leave a half-configured class behind
        super(BFDSHA1TestCase, cls).tearDownClass()
        raise
def setUp(self):
    """ create the key factory, register for BFD events, start capture

    Unlike the non-authenticated test cases no session is created here;
    each test builds its own session around a freshly generated key.
    """
    super(BFDSHA1TestCase, self).setUp()
    self.factory = AuthKeyFactory()
    self.vapi.want_bfd_events()
    self.pg0.enable_capture()
def tearDown(self):
    """ unregister from BFD events and drain the event queue """
    if not self.vpp_dead:
        self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
    super(BFDSHA1TestCase, self).tearDown()
def test_session_up(self):
    """ bring BFD session up """
    key = self.factory.create_random_key(self)
    key.add_vpp_config()
    # both ends of the session authenticate with the same key
    self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                        self.pg0.remote_ip4,
                                        sha1_key=key)
    self.vpp_session.add_vpp_config()
    self.vpp_session.admin_up()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_hold_up(self):
    """ hold BFD session up """
    key = self.factory.create_random_key(self)
    key.add_vpp_config()
    # both ends of the session authenticate with the same key
    self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                        self.pg0.remote_ip4,
                                        sha1_key=key)
    self.vpp_session.add_vpp_config()
    self.vpp_session.admin_up()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)
    # answer every packet for two detection periods
    for dummy in range(self.test_session.detect_mult * 2):
        wait_for_bfd_packet(self)
        self.test_session.send_packet()
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_hold_up_meticulous(self):
    """ hold BFD session up - meticulous auth

    Starts the test peer four short of the 32-bit sequence number limit
    and bumps the number before each of 30 replies, so the sequence
    number wraps while the session is being held up.
    """
    auth_key = self.factory.create_random_key(
        self, BFDAuthType.meticulous_keyed_sha1)
    auth_key.add_vpp_config()
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
    self.vpp_session.add_vpp_config()
    self.vpp_session.admin_up()
    # specify sequence number so that it wraps
    first_seq_number = 0xFFFFFFFF - 4
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=auth_key,
        bfd_key_id=self.vpp_session.bfd_key_id,
        our_seq_number=first_seq_number)
    bfd_session_up(self)
    for _ in range(30):
        wait_for_bfd_packet(self)
        self.test_session.inc_seq_num()
        self.test_session.send_packet()
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_send_bad_seq_number(self):
    """ session is not kept alive by msgs with bad sequence numbers"""
    auth_key = self.factory.create_random_key(
        self, BFDAuthType.meticulous_keyed_sha1)
    auth_key.add_vpp_config()
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=auth_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)
    detection_time = self.test_session.detect_mult *\
        self.vpp_session.required_min_rx / USEC_IN_SEC
    # keep sending for two detection periods without ever calling
    # inc_seq_num(), at 70% of vpp's required min rx interval
    deadline = time.time() + 2 * detection_time
    while time.time() < deadline:
        self.test_session.send_packet()
        pause = 0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC
        self.sleep(pause, "time between bfd packets")
    events = self.vapi.collect_events()
    # session should be down now, because the sequence numbers weren't
    # incremented
    self.assert_equal(len(events), 1, "number of bfd events")
    verify_event(self, events[0], expected_state=BFDState.down)
def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
                                   legitimate_test_session,
                                   rogue_test_session,
                                   rogue_bfd_values=None):
    """ execute a rogue session interaction scenario

    1. create vpp session, add config
    2. bring the legitimate session up
    3. copy the bfd values from legitimate session to rogue session
    4. apply rogue_bfd_values to rogue session
    5. set rogue session state to down
    6. send message to take the session down from the rogue session
    7. assert that the legitimate session is unaffected

    :param vpp_bfd_udp_session: vpp-side session, not yet configured
    :param legitimate_test_session: test peer used to bring the session up
    :param rogue_test_session: test peer used to send the bogus packet
    :param rogue_bfd_values: optional dict of keyword arguments passed
        to rogue_test_session.update() on top of the copied values
    """

    self.vpp_session = vpp_bfd_udp_session
    self.vpp_session.add_vpp_config()
    self.test_session = legitimate_test_session
    # bring vpp session up
    bfd_session_up(self)
    # send packet from rogue session
    rogue_test_session.update(
        my_discriminator=self.test_session.my_discriminator,
        your_discriminator=self.test_session.your_discriminator,
        desired_min_tx=self.test_session.desired_min_tx,
        required_min_rx=self.test_session.required_min_rx,
        detect_mult=self.test_session.detect_mult,
        diag=self.test_session.diag,
        state=self.test_session.state,
        auth_type=self.test_session.auth_type)
    if rogue_bfd_values:
        rogue_test_session.update(**rogue_bfd_values)
    rogue_test_session.update(state=BFDState.down)
    rogue_test_session.send_packet()
    # vpp must keep transmitting and must still report the session up
    wait_for_bfd_packet(self)
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_mismatch_auth(self):
    """ session is not brought down by unauthenticated msg """
    key = self.factory.create_random_key(self)
    key.add_vpp_config()
    vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
    legitimate_test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=key,
        bfd_key_id=vpp_session.bfd_key_id)
    # the rogue peer is created without any key
    rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
    self.execute_rogue_session_scenario(vpp_session,
                                        legitimate_test_session,
                                        rogue_test_session)
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_mismatch_bfd_key_id(self):
    """ session is not brought down by msg with non-existent key-id """
    key = self.factory.create_random_key(self)
    key.add_vpp_config()
    vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
    # pick a different random bfd key id
    # NOTE(review): range 0-255 assumes an 8-bit BFD key id - confirm
    x = randint(0, 255)
    while x == vpp_session.bfd_key_id:
        x = randint(0, 255)
    legitimate_test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=key,
        bfd_key_id=vpp_session.bfd_key_id)
    rogue_test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
    self.execute_rogue_session_scenario(vpp_session,
                                        legitimate_test_session,
                                        rogue_test_session)
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_mismatched_auth_type(self):
    """ session is not brought down by msg with wrong auth type """
    auth_key = self.factory.create_random_key(self)
    auth_key.add_vpp_config()
    vpp_side = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
    # both peers are configured identically (same key, same key id) ...
    legit_peer = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=auth_key, bfd_key_id=vpp_side.bfd_key_id)
    rogue_peer = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=auth_key, bfd_key_id=vpp_side.bfd_key_id)
    # ... the only thing the rogue peer gets wrong is the auth type
    forged_fields = {'auth_type': BFDAuthType.keyed_md5}
    self.execute_rogue_session_scenario(
        vpp_side, legit_peer, rogue_peer, forged_fields)
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_restart(self):
    """ simulate remote peer restart and resynchronization """
    auth_key = self.factory.create_random_key(
        self, BFDAuthType.meticulous_keyed_sha1)
    auth_key.add_vpp_config()
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=auth_key,
        bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
    bfd_session_up(self)

    # stay silent for twice the detection time, so that vpp gives up on us
    detection_time = (self.test_session.detect_mult *
                      self.vpp_session.required_min_rx / USEC_IN_SEC)
    self.sleep(2 * detection_time, "simulating peer restart")

    # exactly one event is expected - the session going down
    bfd_events = self.vapi.collect_events()
    self.assert_equal(len(bfd_events), 1, "number of bfd events")
    verify_event(self, bfd_events[0], expected_state=BFDState.down)

    # the "restarted" peer begins from scratch: down state and a fresh
    # sequence number, with no memory of what vpp sent before
    self.test_session.update(state=BFDState.down)
    self.test_session.our_seq_number = 0
    self.test_session.vpp_seq_number = None

    # now throw away any pending packets
    self.pg0.enable_capture()
    bfd_session_up(self)
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
class BFDAuthOnOffTestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (changing auth) """

    # pg0 is created in setUpClass; the two sessions are assigned by each
    # test before helpers which take the test case (e.g. bfd_session_up)
    # are called
    pg0 = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        super(BFDAuthOnOffTestCase, cls).setUpClass()
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip4()
            cls.pg0.admin_up()
            cls.pg0.resolve_arp()

        except Exception:
            # undo whatever the parent setUpClass did before giving up
            super(BFDAuthOnOffTestCase, cls).tearDownClass()
            raise

    def setUp(self):
        super(BFDAuthOnOffTestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

    def tearDown(self):
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFDAuthOnOffTestCase, self).tearDown()

    def _exchange_packets(self, verify_state=True, bump_seq_num=False):
        """ receive and answer detect_mult * 2 packets from vpp

        :param verify_state: assert that each received packet reports
                             the up state
        :param bump_seq_num: call inc_seq_num() on the test session
                             before each packet we send
        """
        for _ in range(self.test_session.detect_mult * 2):
            p = wait_for_bfd_packet(self)
            if verify_state:
                self.assert_equal(p[BFD].state, BFDState.up, BFDState)
            if bump_seq_num:
                self.test_session.inc_seq_num()
            self.test_session.send_packet()

    def _assert_session_undisturbed(self):
        """ assert the vpp session is up and no bfd event was raised """
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")

    def test_auth_on_immediate(self):
        """ turn auth on without disturbing session state (immediate) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        self._exchange_packets()
        # switch both ends to authenticated mode at once
        self.vpp_session.activate_auth(key)
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key
        self._exchange_packets()
        self._assert_session_undisturbed()

    def test_auth_off_immediate(self):
        """ turn auth off without disturbing session state (immediate) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets(bump_seq_num=True)
        # drop authentication on both ends at once
        self.vpp_session.deactivate_auth()
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self._exchange_packets(bump_seq_num=True)
        self._assert_session_undisturbed()

    def test_auth_change_key_immediate(self):
        """ change auth key without disturbing session state (immediate) """
        key1 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2 = self.factory.create_random_key(self)
        key2.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key1)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key1,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets()
        # swap key1 for key2 on both ends at once
        self.vpp_session.activate_auth(key2)
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key2
        self._exchange_packets()
        self._assert_session_undisturbed()

    def test_auth_on_delayed(self):
        """ turn auth on without disturbing session state (delayed) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        # the packet state is deliberately not checked in this first round
        self._exchange_packets(verify_state=False)
        self.vpp_session.activate_auth(key, delayed=True)
        # we keep talking unauthenticated - the session has to stay up
        self._exchange_packets()
        # now start authenticating on our side as well
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key
        self.test_session.send_packet()
        self._exchange_packets()
        self._assert_session_undisturbed()

    def test_auth_off_delayed(self):
        """ turn auth off without disturbing session state (delayed) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets()
        self.vpp_session.deactivate_auth(delayed=True)
        # we keep authenticating - the session has to stay up
        self._exchange_packets()
        # now stop authenticating on our side as well
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self.test_session.send_packet()
        self._exchange_packets()
        self._assert_session_undisturbed()

    def test_auth_change_key_delayed(self):
        """ change auth key without disturbing session state (delayed) """
        key1 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2 = self.factory.create_random_key(self)
        key2.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key1)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key1,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets()
        self.vpp_session.activate_auth(key2, delayed=True)
        # we keep using key1 - the session has to stay up
        self._exchange_packets()
        # now switch to key2 on our side as well
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key2
        self.test_session.send_packet()
        self._exchange_packets()
        self._assert_session_undisturbed()
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
class BFDCLITestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (CLI) """

    # created in setUpClass
    pg0 = None

    @classmethod
    def setUpClass(cls):
        super(BFDCLITestCase, cls).setUpClass()

        try:
            cls.create_pg_interfaces((0,))
            cls.pg0.config_ip4()
            cls.pg0.config_ip6()
            cls.pg0.resolve_arp()
            cls.pg0.resolve_ndp()

        except Exception:
            # undo whatever the parent setUpClass did before giving up
            super(BFDCLITestCase, cls).tearDownClass()
            raise

    def setUp(self):
        super(BFDCLITestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.pg0.enable_capture()

    def tearDown(self):
        try:
            self.vapi.want_bfd_events(enable_disable=0)
        except UnexpectedApiReturnValueError:
            # some tests aren't subscribed, so this is not an issue
            pass
        self.vapi.collect_events()  # clear the event queue
        super(BFDCLITestCase, self).tearDown()

    def cli_verify_no_response(self, cli):
        """ execute a CLI, asserting that the response is empty """
        self.assert_equal(self.vapi.cli(cli),
                          "",
                          "CLI command response")

    def cli_verify_response(self, cli, expected):
        """ execute a CLI, asserting that the response matches expectation """
        # surrounding whitespace in the response is not significant
        self.assert_equal(self.vapi.cli(cli).strip(),
                          expected,
                          "CLI command response")

    @staticmethod
    def _secret_as_hex(key):
        """ hex-encode the secret of `key` for a `bfd key set` command """
        return "".join("{:02x}".format(ord(c)) for c in key.key)

    def test_show(self):
        """ show commands """
        k1 = self.factory.create_random_key(self)
        k1.add_vpp_config()
        k2 = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        k2.add_vpp_config()
        s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        s1.add_vpp_config()
        s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
                              sha1_key=k2)
        s2.add_vpp_config()
        # no assertions here - the commands only must not fail
        self.logger.info(self.vapi.ppcli("show bfd keys"))
        self.logger.info(self.vapi.ppcli("show bfd sessions"))
        self.logger.info(self.vapi.ppcli("show bfd"))

    def test_set_del_sha1_key(self):
        """ set/delete SHA1 auth key """
        k = self.factory.create_random_key(self)
        self.registry.register(k, self.logger)
        self.cli_verify_no_response(
            "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
            (k.conf_key_id, self._secret_as_hex(k)))
        self.assertTrue(k.query_vpp_config())
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
        self.vpp_session.add_vpp_config()
        self.test_session = \
            BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
                           bfd_key_id=self.vpp_session.bfd_key_id)
        self.vapi.want_bfd_events()
        bfd_session_up(self)
        bfd_session_down(self)
        # try to replace the secret for the key - should fail because the key
        # is in use
        k2 = self.factory.create_random_key(self)
        self.cli_verify_response(
            "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
            (k.conf_key_id, self._secret_as_hex(k2)),
            "bfd key set: `bfd_auth_set_key' API call failed, "
            "rv=-103:BFD object in use")
        # manipulating the session using old secret should still work
        bfd_session_up(self)
        bfd_session_down(self)
        self.vpp_session.remove_vpp_config()
        self.cli_verify_no_response(
            "bfd key del conf-key-id %s" % k.conf_key_id)
        self.assertFalse(k.query_vpp_config())

    def test_set_del_meticulous_sha1_key(self):
        """ set/delete meticulous SHA1 auth key """
        k = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        self.registry.register(k, self.logger)
        self.cli_verify_no_response(
            "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
            (k.conf_key_id, self._secret_as_hex(k)))
        self.assertTrue(k.query_vpp_config())
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip6, af=AF_INET6,
                                            sha1_key=k)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = \
            BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
                           bfd_key_id=self.vpp_session.bfd_key_id)
        self.vapi.want_bfd_events()
        bfd_session_up(self)
        bfd_session_down(self)
        # try to replace the secret for the key - should fail because the key
        # is in use
        k2 = self.factory.create_random_key(self)
        self.cli_verify_response(
            "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
            (k.conf_key_id, self._secret_as_hex(k2)),
            "bfd key set: `bfd_auth_set_key' API call failed, "
            "rv=-103:BFD object in use")
        # manipulating the session using old secret should still work
        bfd_session_up(self)
        bfd_session_down(self)
        self.vpp_session.remove_vpp_config()
        self.cli_verify_no_response(
            "bfd key del conf-key-id %s" % k.conf_key_id)
        self.assertFalse(k.query_vpp_config())

    def _check_add_mod_del(self, local_addr, peer_addr, af=None, key=None):
        """ create, modify and delete a BFD UDP session using the CLI

        :param local_addr: local address of pg0 used in the CLI commands
        :param peer_addr: peer address used for the session
        :param af: address family passed to VppBFDUDPSession; left out of
                   the constructor call when None
        :param key: auth key (already present in vpp) or None for an
                    unauthenticated session
        """
        session_kwargs = {}
        if af is not None:
            session_kwargs['af'] = af
        if key is not None:
            session_kwargs['sha1_key'] = key
        vpp_session = VppBFDUDPSession(
            self, self.pg0, peer_addr, **session_kwargs)
        self.registry.register(vpp_session, self.logger)
        cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
            "peer-addr %s desired-min-tx %s required-min-rx %s "\
            "detect-mult %s" % (self.pg0.name, local_addr, peer_addr,
                                vpp_session.desired_min_tx,
                                vpp_session.required_min_rx,
                                vpp_session.detect_mult)
        if key is not None:
            cli_add_cmd += " conf-key-id %s bfd-key-id %s" % (
                key.conf_key_id, vpp_session.bfd_key_id)
        self.cli_verify_no_response(cli_add_cmd)
        # 2nd add should fail
        self.cli_verify_response(
            cli_add_cmd,
            "bfd udp session add: `bfd_add_add_session' API call"
            " failed, rv=-101:Duplicate BFD object")
        verify_bfd_session_config(self, vpp_session)
        # mod_session is never added to vpp - it only describes the
        # configuration expected after the "mod" command
        if key is not None:
            session_kwargs['bfd_key_id'] = vpp_session.bfd_key_id
        mod_session = VppBFDUDPSession(
            self, self.pg0, peer_addr,
            required_min_rx=2 * vpp_session.required_min_rx,
            desired_min_tx=3 * vpp_session.desired_min_tx,
            detect_mult=4 * vpp_session.detect_mult,
            **session_kwargs)
        self.cli_verify_no_response(
            "bfd udp session mod interface %s local-addr %s peer-addr %s "
            "desired-min-tx %s required-min-rx %s detect-mult %s" %
            (self.pg0.name, local_addr, peer_addr,
             mod_session.desired_min_tx, mod_session.required_min_rx,
             mod_session.detect_mult))
        verify_bfd_session_config(self, mod_session)
        cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
            "peer-addr %s" % (self.pg0.name, local_addr, peer_addr)
        self.cli_verify_no_response(cli_del_cmd)
        # 2nd del is expected to fail
        self.cli_verify_response(
            cli_del_cmd,
            "bfd udp session del: `bfd_udp_del_session' API call"
            " failed, rv=-102:No such BFD object")
        self.assertFalse(vpp_session.query_vpp_config())

    def test_add_mod_del_bfd_udp(self):
        """ create/modify/delete IPv4 BFD UDP session """
        self._check_add_mod_del(self.pg0.local_ip4, self.pg0.remote_ip4)

    def test_add_mod_del_bfd_udp6(self):
        """ create/modify/delete IPv6 BFD UDP session """
        self._check_add_mod_del(self.pg0.local_ip6, self.pg0.remote_ip6,
                                af=AF_INET6)

    def test_add_mod_del_bfd_udp_auth(self):
        """ create/modify/delete IPv4 BFD UDP session (authenticated) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self._check_add_mod_del(self.pg0.local_ip4, self.pg0.remote_ip4,
                                key=key)

    def test_add_mod_del_bfd_udp6_auth(self):
        """ create/modify/delete IPv6 BFD UDP session (authenticated) """
        key = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        self._check_add_mod_del(self.pg0.local_ip6, self.pg0.remote_ip6,
                                af=AF_INET6, key=key)

    def _check_auth_on_off(self, delayed):
        """ activate and deactivate authentication using the CLI

        Each command is issued twice, the repetition is expected to
        succeed silently as well.

        :param delayed: append "delayed yes" to both commands
        """
        suffix = " delayed yes" if delayed else ""
        key = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        # auth_session is never added to vpp - it only describes the
        # configuration expected once authentication is activated
        auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                        sha1_key=key)
        session.add_vpp_config()
        cli_activate = \
            "bfd udp session auth activate interface %s local-addr %s "\
            "peer-addr %s conf-key-id %s bfd-key-id %s"\
            % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
               key.conf_key_id, auth_session.bfd_key_id) + suffix
        self.cli_verify_no_response(cli_activate)
        verify_bfd_session_config(self, auth_session)
        self.cli_verify_no_response(cli_activate)
        verify_bfd_session_config(self, auth_session)
        cli_deactivate = \
            "bfd udp session auth deactivate interface %s local-addr %s "\
            "peer-addr %s"\
            % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4) + suffix
        self.cli_verify_no_response(cli_deactivate)
        verify_bfd_session_config(self, session)
        self.cli_verify_no_response(cli_deactivate)
        verify_bfd_session_config(self, session)

    def test_auth_on_off(self):
        """ turn authentication on and off """
        self._check_auth_on_off(delayed=False)

    def test_auth_on_off_delayed(self):
        """ turn authentication on and off (delayed) """
        self._check_auth_on_off(delayed=True)

    def test_admin_up_down(self):
        """ put session admin-up and admin-down """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        cli_down = \
            "bfd udp session set-flags admin down interface %s local-addr %s "\
            "peer-addr %s"\
            % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
        cli_up = \
            "bfd udp session set-flags admin up interface %s local-addr %s "\
            "peer-addr %s"\
            % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.cli_verify_no_response(cli_down)
        verify_bfd_session_config(self, session, state=BFDState.admin_down)
        self.cli_verify_no_response(cli_up)
        # no peer is talking to the session, so it comes back as down
        verify_bfd_session_config(self, session, state=BFDState.down)

    def test_set_del_udp_echo_source(self):
        """ set/del udp echo source """
        def verify_echo_source(ip4="none", ip6="none"):
            """ check `show bfd echo-source` for a configured source """
            self.cli_verify_response("show bfd echo-source",
                                     "UDP echo source is: %s\n"
                                     "IPv4 address usable as echo source: %s\n"
                                     "IPv6 address usable as echo source: %s" %
                                     (self.loopback0.name, ip4, ip6))

        self.create_loopback_interfaces([0])
        self.loopback0 = self.lo_interfaces[0]
        self.loopback0.admin_up()
        self.cli_verify_response("show bfd echo-source",
                                 "UDP echo source is not set.")
        cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
        self.cli_verify_no_response(cli_set)
        # the interface has no addresses yet
        verify_echo_source()
        self.loopback0.config_ip4()
        # the expected echo address is the interface address with its
        # lowest bit flipped
        unpacked = unpack("!L", self.loopback0.local_ip4n)
        echo_ip4 = inet_ntop(AF_INET, pack("!L", unpacked[0] ^ 1))
        verify_echo_source(ip4=echo_ip4)
        # same for IPv6 - only the lowest bit of the last word is flipped
        unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
        echo_ip6 = inet_ntop(AF_INET6, pack("!LLLL", unpacked[0], unpacked[1],
                                            unpacked[2], unpacked[3] ^ 1))
        self.loopback0.config_ip6()
        verify_echo_source(ip4=echo_ip4, ip6=echo_ip6)
        cli_del = "bfd udp echo-source del"
        self.cli_verify_no_response(cli_del)
        self.cli_verify_response("show bfd echo-source",
                                 "UDP echo source is not set.")
# run the tests through the VPP test runner when executed as a script
if "__main__" == __name__:
    unittest.main(
        testRunner=VppTestRunner)