4 from __future__ import division
9 from struct import pack, unpack
10 from random import randint, shuffle, getrandbits
11 from socket import AF_INET, AF_INET6, inet_ntop
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether
14 from scapy.layers.inet import UDP, IP
15 from scapy.layers.inet6 import IPv6
16 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
17 BFDDiagCode, BFDState, BFD_vpp_echo
18 from framework import VppTestCase, VppTestRunner, running_extended_tests
19 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
21 from vpp_papi_provider import UnexpectedApiReturnValueError
22 from vpp_ip_route import VppIpRoute, VppRoutePath
27 class AuthKeyFactory(object):
28 """Factory class for creating auth keys with unique conf key ID"""
31 self._conf_key_ids = {}
def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
    """ create a random key under a previously unused conf key id """
    # keep drawing until we hit an id this factory has not handed out yet
    while True:
        candidate_id = randint(0, 0xFFFFFFFF)
        if candidate_id not in self._conf_key_ids:
            break
    self._conf_key_ids[candidate_id] = 1
    # key material: between 1 and 20 random byte values
    key_length = randint(1, 20)
    key_bytes = bytearray([randint(0, 255) for _ in range(key_length)])
    return VppBFDAuthKey(test=test, auth_type=auth_type,
                         conf_key_id=candidate_id, key=str(key_bytes))
44 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
45 class BFDAPITestCase(VppTestCase):
46 """Bidirectional Forwarding Detection (BFD) - API"""
53 super(BFDAPITestCase, cls).setUpClass()
56 cls.create_pg_interfaces(range(2))
57 for i in cls.pg_interfaces:
63 super(BFDAPITestCase, cls).tearDownClass()
67 super(BFDAPITestCase, self).setUp()
68 self.factory = AuthKeyFactory()
def test_add_bfd(self):
    """ add and remove an IPv4 BFD session, twice in a row """
    bfd_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    # the second pass re-creates the very same session after its removal
    for _ in range(2):
        bfd_session.add_vpp_config()
        self.logger.debug("Session state is %s", bfd_session.state)
        bfd_session.remove_vpp_config()
def test_double_add(self):
    """ adding an already configured BFD session fails (negative case) """
    bfd_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    bfd_session.add_vpp_config()
    # the duplicate add must come back with a negative API return value
    with self.vapi.expect_negative_api_retval():
        bfd_session.add_vpp_config()
    # clean up the one session that was really created
    bfd_session.remove_vpp_config()
def test_add_bfd6(self):
    """ add and remove an IPv6 BFD session, twice in a row """
    bfd_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
                                   af=AF_INET6)
    # the second pass re-creates the very same session after its removal
    for _ in range(2):
        bfd_session.add_vpp_config()
        self.logger.debug("Session state is %s", bfd_session.state)
        bfd_session.remove_vpp_config()
101 def test_mod_bfd(self):
102 """ modify BFD session parameters """
103 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
104 desired_min_tx=50000,
105 required_min_rx=10000,
107 session.add_vpp_config()
108 s = session.get_bfd_udp_session_dump_entry()
109 self.assert_equal(session.desired_min_tx,
111 "desired min transmit interval")
112 self.assert_equal(session.required_min_rx,
114 "required min receive interval")
115 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
116 session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
117 required_min_rx=session.required_min_rx * 2,
118 detect_mult=session.detect_mult * 2)
119 s = session.get_bfd_udp_session_dump_entry()
120 self.assert_equal(session.desired_min_tx,
122 "desired min transmit interval")
123 self.assert_equal(session.required_min_rx,
125 "required min receive interval")
126 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
128 def test_add_sha1_keys(self):
129 """ add SHA1 keys """
131 keys = [self.factory.create_random_key(
132 self) for i in range(0, key_count)]
134 self.assertFalse(key.query_vpp_config())
138 self.assertTrue(key.query_vpp_config())
140 indexes = range(key_count)
145 key.remove_vpp_config()
147 for j in range(key_count):
150 self.assertFalse(key.query_vpp_config())
152 self.assertTrue(key.query_vpp_config())
153 # should be removed now
155 self.assertFalse(key.query_vpp_config())
156 # add back and remove again
160 self.assertTrue(key.query_vpp_config())
162 key.remove_vpp_config()
164 self.assertFalse(key.query_vpp_config())
166 def test_add_bfd_sha1(self):
167 """ create a BFD session (SHA1) """
168 key = self.factory.create_random_key(self)
170 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
172 session.add_vpp_config()
173 self.logger.debug("Session state is %s", session.state)
174 session.remove_vpp_config()
175 session.add_vpp_config()
176 self.logger.debug("Session state is %s", session.state)
177 session.remove_vpp_config()
179 def test_double_add_sha1(self):
180 """ create the same BFD session twice (negative case) (SHA1) """
181 key = self.factory.create_random_key(self)
183 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
185 session.add_vpp_config()
186 with self.assertRaises(Exception):
187 session.add_vpp_config()
def test_add_auth_nonexistent_key(self):
    """ BFD session using a SHA1 key unknown to VPP (negative case) """
    # the key object exists on the test side only - it is never configured
    unregistered_key = self.factory.create_random_key(self)
    bfd_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=unregistered_key)
    with self.assertRaises(Exception):
        bfd_session.add_vpp_config()
197 def test_shared_sha1_key(self):
198 """ share single SHA1 key between multiple BFD sessions """
199 key = self.factory.create_random_key(self)
202 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
204 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
205 sha1_key=key, af=AF_INET6),
206 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
208 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
209 sha1_key=key, af=AF_INET6)]
214 e = key.get_bfd_auth_keys_dump_entry()
215 self.assert_equal(e.use_count, len(sessions) - removed,
216 "Use count for shared key")
217 s.remove_vpp_config()
219 e = key.get_bfd_auth_keys_dump_entry()
220 self.assert_equal(e.use_count, len(sessions) - removed,
221 "Use count for shared key")
def test_activate_auth(self):
    """ activate SHA1 authentication """
    key = self.factory.create_random_key(self)
    # configure the key in VPP before any session refers to it - a key
    # which was never added is the negative case covered elsewhere
    key.add_vpp_config()
    session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    session.add_vpp_config()
    # switch the already existing (unauthenticated) session over to the key
    session.activate_auth(key)
def test_deactivate_auth(self):
    """ deactivate SHA1 authentication """
    key = self.factory.create_random_key(self)
    # configure the key in VPP before any session refers to it - a key
    # which was never added is the negative case covered elsewhere
    key.add_vpp_config()
    session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    session.add_vpp_config()
    # authentication must be active first, otherwise there is nothing
    # to deactivate
    session.activate_auth(key)
    session.deactivate_auth()
240 def test_change_key(self):
241 """ change SHA1 key """
242 key1 = self.factory.create_random_key(self)
243 key2 = self.factory.create_random_key(self)
244 while key2.conf_key_id == key1.conf_key_id:
245 key2 = self.factory.create_random_key(self)
246 key1.add_vpp_config()
247 key2.add_vpp_config()
248 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
250 session.add_vpp_config()
251 session.activate_auth(key2)
254 class BFDTestSession(object):
255 """ BFD session as seen from test framework side """
257 def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
258 bfd_key_id=None, our_seq_number=None):
261 self.sha1_key = sha1_key
262 self.bfd_key_id = bfd_key_id
263 self.interface = interface
264 self.udp_sport = randint(49152, 65535)
265 if our_seq_number is None:
266 self.our_seq_number = randint(0, 40000000)
268 self.our_seq_number = our_seq_number
269 self.vpp_seq_number = None
270 self.my_discriminator = 0
271 self.desired_min_tx = 300000
272 self.required_min_rx = 300000
273 self.required_min_echo_rx = None
274 self.detect_mult = detect_mult
275 self.diag = BFDDiagCode.no_diagnostic
276 self.your_discriminator = None
277 self.state = BFDState.down
278 self.auth_type = BFDAuthType.no_auth
def inc_seq_num(self):
    """ increment sequence number, wrapping if needed

    The value 0xFFFFFFFF is followed by 0, any other value by its
    successor.
    """
    if self.our_seq_number == 0xFFFFFFFF:
        self.our_seq_number = 0
    else:
        # the explicit else keeps the wrap case from being incremented
        # a second time (which would yield 1 instead of 0)
        self.our_seq_number += 1
287 def update(self, my_discriminator=None, your_discriminator=None,
288 desired_min_tx=None, required_min_rx=None,
289 required_min_echo_rx=None, detect_mult=None,
290 diag=None, state=None, auth_type=None):
291 """ update BFD parameters associated with session """
292 if my_discriminator is not None:
293 self.my_discriminator = my_discriminator
294 if your_discriminator is not None:
295 self.your_discriminator = your_discriminator
296 if required_min_rx is not None:
297 self.required_min_rx = required_min_rx
298 if required_min_echo_rx is not None:
299 self.required_min_echo_rx = required_min_echo_rx
300 if desired_min_tx is not None:
301 self.desired_min_tx = desired_min_tx
302 if detect_mult is not None:
303 self.detect_mult = detect_mult
306 if state is not None:
308 if auth_type is not None:
309 self.auth_type = auth_type
311 def fill_packet_fields(self, packet):
312 """ set packet fields with known values in packet """
314 if self.my_discriminator:
315 self.test.logger.debug("BFD: setting packet.my_discriminator=%s",
316 self.my_discriminator)
317 bfd.my_discriminator = self.my_discriminator
318 if self.your_discriminator:
319 self.test.logger.debug("BFD: setting packet.your_discriminator=%s",
320 self.your_discriminator)
321 bfd.your_discriminator = self.your_discriminator
322 if self.required_min_rx:
323 self.test.logger.debug(
324 "BFD: setting packet.required_min_rx_interval=%s",
325 self.required_min_rx)
326 bfd.required_min_rx_interval = self.required_min_rx
327 if self.required_min_echo_rx:
328 self.test.logger.debug(
329 "BFD: setting packet.required_min_echo_rx=%s",
330 self.required_min_echo_rx)
331 bfd.required_min_echo_rx_interval = self.required_min_echo_rx
332 if self.desired_min_tx:
333 self.test.logger.debug(
334 "BFD: setting packet.desired_min_tx_interval=%s",
336 bfd.desired_min_tx_interval = self.desired_min_tx
338 self.test.logger.debug(
339 "BFD: setting packet.detect_mult=%s", self.detect_mult)
340 bfd.detect_mult = self.detect_mult
342 self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
345 self.test.logger.debug("BFD: setting packet.state=%s", self.state)
346 bfd.state = self.state
348 # this is used by a negative test-case
349 self.test.logger.debug("BFD: setting packet.auth_type=%s",
351 bfd.auth_type = self.auth_type
353 def create_packet(self):
354 """ create a BFD packet, reflecting the current state of session """
357 bfd.auth_type = self.sha1_key.auth_type
358 bfd.auth_len = BFD.sha1_auth_len
359 bfd.auth_key_id = self.bfd_key_id
360 bfd.auth_seq_num = self.our_seq_number
361 bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
364 if self.af == AF_INET6:
365 packet = (Ether(src=self.interface.remote_mac,
366 dst=self.interface.local_mac) /
367 IPv6(src=self.interface.remote_ip6,
368 dst=self.interface.local_ip6,
370 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
373 packet = (Ether(src=self.interface.remote_mac,
374 dst=self.interface.local_mac) /
375 IP(src=self.interface.remote_ip4,
376 dst=self.interface.local_ip4,
378 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
380 self.test.logger.debug("BFD: Creating packet")
381 self.fill_packet_fields(packet)
383 hash_material = str(packet[BFD])[:32] + self.sha1_key.key + \
384 "\0" * (20 - len(self.sha1_key.key))
385 self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
386 hashlib.sha1(hash_material).hexdigest())
387 packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
390 def send_packet(self, packet=None, interface=None):
391 """ send packet on interface, creating the packet if needed """
393 packet = self.create_packet()
394 if interface is None:
395 interface = self.test.pg0
396 self.test.logger.debug(ppp("Sending packet:", packet))
397 interface.add_stream(packet)
400 def verify_sha1_auth(self, packet):
401 """ Verify correctness of authentication in BFD layer. """
403 self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
404 self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
406 self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
407 self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
408 if self.vpp_seq_number is None:
409 self.vpp_seq_number = bfd.auth_seq_num
410 self.test.logger.debug("Received initial sequence number: %s" %
413 recvd_seq_num = bfd.auth_seq_num
414 self.test.logger.debug("Received followup sequence number: %s" %
416 if self.vpp_seq_number < 0xffffffff:
417 if self.sha1_key.auth_type == \
418 BFDAuthType.meticulous_keyed_sha1:
419 self.test.assert_equal(recvd_seq_num,
420 self.vpp_seq_number + 1,
421 "BFD sequence number")
423 self.test.assert_in_range(recvd_seq_num,
425 self.vpp_seq_number + 1,
426 "BFD sequence number")
428 if self.sha1_key.auth_type == \
429 BFDAuthType.meticulous_keyed_sha1:
430 self.test.assert_equal(recvd_seq_num, 0,
431 "BFD sequence number")
433 self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
434 "BFD sequence number not one of "
435 "(%s, 0)" % self.vpp_seq_number)
436 self.vpp_seq_number = recvd_seq_num
437 # last 20 bytes represent the hash - so replace them with the key,
438 # pad the result with zeros and hash the result
439 hash_material = bfd.original[:-20] + self.sha1_key.key + \
440 "\0" * (20 - len(self.sha1_key.key))
441 expected_hash = hashlib.sha1(hash_material).hexdigest()
442 self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
443 expected_hash, "Auth key hash")
445 def verify_bfd(self, packet):
446 """ Verify correctness of BFD layer. """
448 self.test.assert_equal(bfd.version, 1, "BFD version")
449 self.test.assert_equal(bfd.your_discriminator,
450 self.my_discriminator,
451 "BFD - your discriminator")
453 self.verify_sha1_auth(packet)
456 def bfd_session_up(test):
457 """ Bring BFD session up """
458 test.logger.info("BFD: Waiting for slow hello")
459 p = wait_for_bfd_packet(test, 2)
461 if hasattr(test, 'vpp_clock_offset'):
462 old_offset = test.vpp_clock_offset
463 test.vpp_clock_offset = time.time() - p.time
464 test.logger.debug("BFD: Calculated vpp clock offset: %s",
465 test.vpp_clock_offset)
467 test.assertAlmostEqual(
468 old_offset, test.vpp_clock_offset, delta=0.5,
469 msg="vpp clock offset not stable (new: %s, old: %s)" %
470 (test.vpp_clock_offset, old_offset))
471 test.logger.info("BFD: Sending Init")
472 test.test_session.update(my_discriminator=randint(0, 40000000),
473 your_discriminator=p[BFD].my_discriminator,
475 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
476 BFDAuthType.meticulous_keyed_sha1:
477 test.test_session.inc_seq_num()
478 test.test_session.send_packet()
479 test.logger.info("BFD: Waiting for event")
480 e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
481 verify_event(test, e, expected_state=BFDState.up)
482 test.logger.info("BFD: Session is Up")
483 test.test_session.update(state=BFDState.up)
484 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
485 BFDAuthType.meticulous_keyed_sha1:
486 test.test_session.inc_seq_num()
487 test.test_session.send_packet()
488 test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
def bfd_session_down(test):
    """ Take the BFD session from Up to Down and check that VPP follows """
    # precondition: VPP has to consider the session Up
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
    session = test.test_session
    session.update(state=BFDState.down)
    key = session.sha1_key
    # the sequence number is bumped only for meticulous keyed SHA1
    if key and key.auth_type == BFDAuthType.meticulous_keyed_sha1:
        session.inc_seq_num()
    session.send_packet()
    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, event, expected_state=BFDState.down)
    test.logger.info("BFD: Session is Down")
    test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
506 def verify_bfd_session_config(test, session, state=None):
507 dump = session.get_bfd_udp_session_dump_entry()
508 test.assertIsNotNone(dump)
509 # since dump is not none, we have verified that sw_if_index and addresses
510 # are valid (in get_bfd_udp_session_dump_entry)
512 test.assert_equal(dump.state, state, "session state")
513 test.assert_equal(dump.required_min_rx, session.required_min_rx,
514 "required min rx interval")
515 test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
516 "desired min tx interval")
517 test.assert_equal(dump.detect_mult, session.detect_mult,
519 if session.sha1_key is None:
520 test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
522 test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
523 test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
525 test.assert_equal(dump.conf_key_id,
526 session.sha1_key.conf_key_id,
530 def verify_ip(test, packet):
531 """ Verify correctness of IP layer. """
532 if test.vpp_session.af == AF_INET6:
534 local_ip = test.pg0.local_ip6
535 remote_ip = test.pg0.remote_ip6
536 test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
539 local_ip = test.pg0.local_ip4
540 remote_ip = test.pg0.remote_ip4
541 test.assert_equal(ip.ttl, 255, "IPv4 TTL")
542 test.assert_equal(ip.src, local_ip, "IP source address")
543 test.assert_equal(ip.dst, remote_ip, "IP destination address")
546 def verify_udp(test, packet):
547 """ Verify correctness of UDP layer. """
549 test.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
550 test.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
554 def verify_event(test, event, expected_state):
555 """ Verify correctness of event values. """
557 test.logger.debug("BFD: Event: %s" % repr(e))
558 test.assert_equal(e.sw_if_index,
559 test.vpp_session.interface.sw_if_index,
560 "BFD interface index")
562 if test.vpp_session.af == AF_INET6:
564 test.assert_equal(e.is_ipv6, is_ipv6, "is_ipv6")
565 if test.vpp_session.af == AF_INET:
566 test.assert_equal(e.local_addr[:4], test.vpp_session.local_addr_n,
567 "Local IPv4 address")
568 test.assert_equal(e.peer_addr[:4], test.vpp_session.peer_addr_n,
571 test.assert_equal(e.local_addr, test.vpp_session.local_addr_n,
572 "Local IPv6 address")
573 test.assert_equal(e.peer_addr, test.vpp_session.peer_addr_n,
575 test.assert_equal(e.state, expected_state, BFDState)
578 def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None):
579 """ wait for BFD packet and verify its correctness
581 :param timeout: how long to wait
582 :param pcap_time_min: ignore packets with pcap timestamp lower than this
584 :returns: tuple (packet, time spent waiting for packet)
586 test.logger.info("BFD: Waiting for BFD packet")
587 deadline = time.time() + timeout
592 test.assert_in_range(counter, 0, 100, "number of packets ignored")
593 time_left = deadline - time.time()
595 raise CaptureTimeoutError("Packet did not arrive within timeout")
596 p = test.pg0.wait_for_packet(timeout=time_left)
597 test.logger.debug(ppp("BFD: Got packet:", p))
598 if pcap_time_min is not None and p.time < pcap_time_min:
599 test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
600 "pcap time min %s):" %
601 (p.time, pcap_time_min), p))
606 raise Exception(ppp("Unexpected or invalid BFD packet:", p))
608 raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
611 test.test_session.verify_bfd(p)
615 class BFD4TestCase(VppTestCase):
616 """Bidirectional Forwarding Detection (BFD)"""
619 vpp_clock_offset = None
625 super(BFD4TestCase, cls).setUpClass()
627 cls.create_pg_interfaces([0])
628 cls.create_loopback_interfaces([0])
629 cls.loopback0 = cls.lo_interfaces[0]
630 cls.loopback0.config_ip4()
631 cls.loopback0.admin_up()
633 cls.pg0.configure_ipv4_neighbors()
635 cls.pg0.resolve_arp()
638 super(BFD4TestCase, cls).tearDownClass()
642 super(BFD4TestCase, self).setUp()
643 self.factory = AuthKeyFactory()
644 self.vapi.want_bfd_events()
645 self.pg0.enable_capture()
647 self.vpp_session = VppBFDUDPSession(self, self.pg0,
649 self.vpp_session.add_vpp_config()
650 self.vpp_session.admin_up()
651 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
653 self.vapi.want_bfd_events(enable_disable=0)
657 if not self.vpp_dead:
658 self.vapi.want_bfd_events(enable_disable=0)
659 self.vapi.collect_events() # clear the event queue
660 super(BFD4TestCase, self).tearDown()
662 def test_session_up(self):
663 """ bring BFD session up """
666 def test_session_up_by_ip(self):
667 """ bring BFD session up - first frame looked up by address pair """
668 self.logger.info("BFD: Sending Slow control frame")
669 self.test_session.update(my_discriminator=randint(0, 40000000))
670 self.test_session.send_packet()
671 self.pg0.enable_capture()
672 p = self.pg0.wait_for_packet(1)
673 self.assert_equal(p[BFD].your_discriminator,
674 self.test_session.my_discriminator,
675 "BFD - your discriminator")
676 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
677 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
679 self.logger.info("BFD: Waiting for event")
680 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
681 verify_event(self, e, expected_state=BFDState.init)
682 self.logger.info("BFD: Sending Up")
683 self.test_session.send_packet()
684 self.logger.info("BFD: Waiting for event")
685 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
686 verify_event(self, e, expected_state=BFDState.up)
687 self.logger.info("BFD: Session is Up")
688 self.test_session.update(state=BFDState.up)
689 self.test_session.send_packet()
690 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
def test_session_down(self):
    """ bring BFD session down """
    # bfd_session_down() asserts that the session is Up on entry, so the
    # session has to be established before it can be taken down
    bfd_session_up(self)
    bfd_session_down(self)
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_hold_up(self):
    """ hold BFD session up """
    # establish the session first - otherwise there is nothing to hold up
    # and the event check below would pass vacuously
    bfd_session_up(self)
    # answer every packet from VPP for twice the detection multiplier
    for _ in range(self.test_session.detect_mult * 2):
        wait_for_bfd_packet(self)
        self.test_session.send_packet()
    # no event is expected while we keep answering
    self.assert_equal(len(self.vapi.collect_events()), 0,
                      "number of bfd events")
707 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
708 def test_slow_timer(self):
709 """ verify slow periodic control frames while session down """
711 self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
712 prev_packet = wait_for_bfd_packet(self, 2)
713 for dummy in range(packet_count):
714 next_packet = wait_for_bfd_packet(self, 2)
715 time_diff = next_packet.time - prev_packet.time
716 # spec says the range should be <0.75, 1>, allow extra 0.05 margin
717 # to work around timing issues
718 self.assert_in_range(
719 time_diff, 0.70, 1.05, "time between slow packets")
720 prev_packet = next_packet
722 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
723 def test_zero_remote_min_rx(self):
724 """ no packets when zero remote required min rx interval """
726 self.test_session.update(required_min_rx=0)
727 self.test_session.send_packet()
728 for dummy in range(self.test_session.detect_mult):
729 self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
730 "sleep before transmitting bfd packet")
731 self.test_session.send_packet()
733 p = wait_for_bfd_packet(self, timeout=0)
734 self.logger.error(ppp("Received unexpected packet:", p))
735 except CaptureTimeoutError:
738 len(self.vapi.collect_events()), 0, "number of bfd events")
739 self.test_session.update(required_min_rx=300000)
740 for dummy in range(3):
741 self.test_session.send_packet()
743 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
745 len(self.vapi.collect_events()), 0, "number of bfd events")
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_conn_down(self):
    """ verify session goes down after inactivity """
    # the session can only time out to Down if it was Up to begin with
    bfd_session_up(self)
    # stay silent for detect_mult * required min rx (converted to seconds)
    detection_time = self.test_session.detect_mult *\
        self.vpp_session.required_min_rx / USEC_IN_SEC
    self.sleep(detection_time, "waiting for BFD session time-out")
    e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(self, e, expected_state=BFDState.down)
757 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
758 def test_large_required_min_rx(self):
759 """ large remote required min rx interval """
761 p = wait_for_bfd_packet(self)
763 self.test_session.update(required_min_rx=interval)
764 self.test_session.send_packet()
765 time_mark = time.time()
767 # busy wait here, trying to collect a packet or event, vpp is not
768 # allowed to send packets and the session will timeout first - so the
769 # Up->Down event must arrive before any packets do
770 while time.time() < time_mark + interval / USEC_IN_SEC:
772 p = wait_for_bfd_packet(self, timeout=0)
773 # if vpp managed to send a packet before we did the session
774 # session update, then that's fine, ignore it
775 if p.time < time_mark - self.vpp_clock_offset:
777 self.logger.error(ppp("Received unexpected packet:", p))
779 except CaptureTimeoutError:
781 events = self.vapi.collect_events()
783 verify_event(self, events[0], BFDState.down)
785 self.assert_equal(count, 0, "number of packets received")
787 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
788 def test_immediate_remote_min_rx_reduction(self):
789 """ immediately honor remote required min rx reduction """
790 self.vpp_session.remove_vpp_config()
791 self.vpp_session = VppBFDUDPSession(
792 self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
793 self.pg0.enable_capture()
794 self.vpp_session.add_vpp_config()
795 self.test_session.update(desired_min_tx=1000000,
796 required_min_rx=1000000)
798 reference_packet = wait_for_bfd_packet(self)
799 time_mark = time.time()
801 self.test_session.update(required_min_rx=interval)
802 self.test_session.send_packet()
803 extra_time = time.time() - time_mark
804 p = wait_for_bfd_packet(self)
805 # first packet is allowed to be late by time we spent doing the update
806 # calculated in extra_time
807 self.assert_in_range(p.time - reference_packet.time,
808 .95 * 0.75 * interval / USEC_IN_SEC,
809 1.05 * interval / USEC_IN_SEC + extra_time,
810 "time between BFD packets")
812 for dummy in range(3):
813 p = wait_for_bfd_packet(self)
814 diff = p.time - reference_packet.time
815 self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
816 1.05 * interval / USEC_IN_SEC,
817 "time between BFD packets")
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_modify_req_min_rx_double(self):
    """ modify session - double required min rx """
    # bringing the session up also computes self.vpp_clock_offset, which is
    # None until then and is needed for the pcap_time_min arithmetic below
    bfd_session_up(self)
    # consume one packet before announcing our new intervals
    wait_for_bfd_packet(self)
    self.test_session.update(desired_min_tx=10000,
                             required_min_rx=10000)
    self.test_session.send_packet()
    # double required min rx
    self.vpp_session.modify_parameters(
        required_min_rx=2 * self.vpp_session.required_min_rx)
    # ignore packets whose pcap timestamp is older than this moment
    p = wait_for_bfd_packet(
        self, pcap_time_min=time.time() - self.vpp_clock_offset)
    # poll bit needs to be set
    self.assertIn("P", p.sprintf("%BFD.flags%"),
                  "Poll bit not set in BFD packet")
    # finish poll sequence with final packet
    final = self.test_session.create_packet()
    final[BFD].flags = "F"
    # expected time-out in seconds, based on the slower of the two
    # intervals
    timeout = self.test_session.detect_mult * \
        max(self.test_session.desired_min_tx,
            self.vpp_session.required_min_rx) / USEC_IN_SEC
    self.test_session.send_packet(final)
    time_mark = time.time()
    # nothing more is sent from here on, so the session should go Down
    e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
    verify_event(self, e, expected_state=BFDState.down)
    time_to_event = time.time() - time_mark
    # allow 10% tolerance around the computed time-out
    self.assert_in_range(time_to_event, .9 * timeout,
                         1.1 * timeout, "session timeout")
850 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
851 def test_modify_req_min_rx_halve(self):
852 """ modify session - halve required min rx """
853 self.vpp_session.modify_parameters(
854 required_min_rx=2 * self.vpp_session.required_min_rx)
856 p = wait_for_bfd_packet(self)
857 self.test_session.update(desired_min_tx=10000,
858 required_min_rx=10000)
859 self.test_session.send_packet()
860 p = wait_for_bfd_packet(
861 self, pcap_time_min=time.time() - self.vpp_clock_offset)
862 # halve required min rx
863 old_required_min_rx = self.vpp_session.required_min_rx
864 self.vpp_session.modify_parameters(
865 required_min_rx=0.5 * self.vpp_session.required_min_rx)
866 # now we wait 0.8*3*old-req-min-rx and the session should still be up
867 self.sleep(0.8 * self.vpp_session.detect_mult *
868 old_required_min_rx / USEC_IN_SEC,
869 "wait before finishing poll sequence")
870 self.assert_equal(len(self.vapi.collect_events()), 0,
871 "number of bfd events")
872 p = wait_for_bfd_packet(self)
873 # poll bit needs to be set
874 self.assertIn("P", p.sprintf("%BFD.flags%"),
875 "Poll bit not set in BFD packet")
876 # finish poll sequence with final packet
877 final = self.test_session.create_packet()
878 final[BFD].flags = "F"
879 self.test_session.send_packet(final)
880 # now the session should time out under new conditions
881 detection_time = self.test_session.detect_mult *\
882 self.vpp_session.required_min_rx / USEC_IN_SEC
884 e = self.vapi.wait_for_event(
885 2 * detection_time, "bfd_udp_session_details")
887 self.assert_in_range(after - before,
888 0.9 * detection_time,
889 1.1 * detection_time,
890 "time before bfd session goes down")
891 verify_event(self, e, expected_state=BFDState.down)
893 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
894 def test_modify_detect_mult(self):
895 """ modify detect multiplier """
897 p = wait_for_bfd_packet(self)
898 self.vpp_session.modify_parameters(detect_mult=1)
899 p = wait_for_bfd_packet(
900 self, pcap_time_min=time.time() - self.vpp_clock_offset)
901 self.assert_equal(self.vpp_session.detect_mult,
904 # poll bit must not be set
905 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
906 "Poll bit not set in BFD packet")
907 self.vpp_session.modify_parameters(detect_mult=10)
908 p = wait_for_bfd_packet(
909 self, pcap_time_min=time.time() - self.vpp_clock_offset)
910 self.assert_equal(self.vpp_session.detect_mult,
913 # poll bit must not be set
914 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
915 "Poll bit not set in BFD packet")
917 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
918 def test_queued_poll(self):
919 """ test poll sequence queueing """
921 p = wait_for_bfd_packet(self)
922 self.vpp_session.modify_parameters(
923 required_min_rx=2 * self.vpp_session.required_min_rx)
924 p = wait_for_bfd_packet(self)
925 poll_sequence_start = time.time()
926 poll_sequence_length_min = 0.5
927 send_final_after = time.time() + poll_sequence_length_min
928 # poll bit needs to be set
929 self.assertIn("P", p.sprintf("%BFD.flags%"),
930 "Poll bit not set in BFD packet")
931 self.assert_equal(p[BFD].required_min_rx_interval,
932 self.vpp_session.required_min_rx,
933 "BFD required min rx interval")
934 self.vpp_session.modify_parameters(
935 required_min_rx=2 * self.vpp_session.required_min_rx)
936 # 2nd poll sequence should be queued now
937 # don't send the reply back yet, wait for some time to emulate
938 # longer round-trip time
940 while time.time() < send_final_after:
941 self.test_session.send_packet()
942 p = wait_for_bfd_packet(self)
943 self.assert_equal(len(self.vapi.collect_events()), 0,
944 "number of bfd events")
945 self.assert_equal(p[BFD].required_min_rx_interval,
946 self.vpp_session.required_min_rx,
947 "BFD required min rx interval")
949 # poll bit must be set
950 self.assertIn("P", p.sprintf("%BFD.flags%"),
951 "Poll bit not set in BFD packet")
952 final = self.test_session.create_packet()
953 final[BFD].flags = "F"
954 self.test_session.send_packet(final)
955 # finish 1st with final
956 poll_sequence_length = time.time() - poll_sequence_start
957 # vpp must wait for some time before starting new poll sequence
958 poll_no_2_started = False
959 for dummy in range(2 * packet_count):
960 p = wait_for_bfd_packet(self)
961 self.assert_equal(len(self.vapi.collect_events()), 0,
962 "number of bfd events")
963 if "P" in p.sprintf("%BFD.flags%"):
964 poll_no_2_started = True
965 if time.time() < poll_sequence_start + poll_sequence_length:
966 raise Exception("VPP started 2nd poll sequence too soon")
967 final = self.test_session.create_packet()
968 final[BFD].flags = "F"
969 self.test_session.send_packet(final)
972 self.test_session.send_packet()
973 self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
974 # finish 2nd with final
975 final = self.test_session.create_packet()
976 final[BFD].flags = "F"
977 self.test_session.send_packet(final)
978 p = wait_for_bfd_packet(self)
979 # poll bit must not be set
980 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
981 "Poll bit set in BFD packet")
983 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
984 def test_poll_response(self):
985 """ test correct response to control frame with poll bit set """
987 poll = self.test_session.create_packet()
988 poll[BFD].flags = "P"
989 self.test_session.send_packet(poll)
990 final = wait_for_bfd_packet(
991 self, pcap_time_min=time.time() - self.vpp_clock_offset)
992 self.assertIn("F", final.sprintf("%BFD.flags%"))
994 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
995 def test_no_periodic_if_remote_demand(self):
996 """ no periodic frames outside poll sequence if remote demand set """
998 demand = self.test_session.create_packet()
999 demand[BFD].flags = "D"
1000 self.test_session.send_packet(demand)
1001 transmit_time = 0.9 \
1002 * max(self.vpp_session.required_min_rx,
1003 self.test_session.desired_min_tx) \
1006 for dummy in range(self.test_session.detect_mult * 2):
1007 time.sleep(transmit_time)
1008 self.test_session.send_packet(demand)
1010 p = wait_for_bfd_packet(self, timeout=0)
1011 self.logger.error(ppp("Received unexpected packet:", p))
1013 except CaptureTimeoutError:
1015 events = self.vapi.collect_events()
1017 self.logger.error("Received unexpected event: %s", e)
1018 self.assert_equal(count, 0, "number of packets received")
1019 self.assert_equal(len(events), 0, "number of events received")
1021 def test_echo_looped_back(self):
1022 """ echo packets looped back """
1023 # don't need a session in this case..
1024 self.vpp_session.remove_vpp_config()
1025 self.pg0.enable_capture()
1026 echo_packet_count = 10
1027 # random source port low enough to increment a few times..
1028 udp_sport_tx = randint(1, 50000)
1029 udp_sport_rx = udp_sport_tx
1030 echo_packet = (Ether(src=self.pg0.remote_mac,
1031 dst=self.pg0.local_mac) /
1032 IP(src=self.pg0.remote_ip4,
1033 dst=self.pg0.remote_ip4) /
1034 UDP(dport=BFD.udp_dport_echo) /
1035 Raw("this should be looped back"))
1036 for dummy in range(echo_packet_count):
1037 self.sleep(.01, "delay between echo packets")
1038 echo_packet[UDP].sport = udp_sport_tx
1040 self.logger.debug(ppp("Sending packet:", echo_packet))
1041 self.pg0.add_stream(echo_packet)
1043 for dummy in range(echo_packet_count):
1044 p = self.pg0.wait_for_packet(1)
1045 self.logger.debug(ppp("Got packet:", p))
1047 self.assert_equal(self.pg0.remote_mac,
1048 ether.dst, "Destination MAC")
1049 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1051 self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1052 self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1054 self.assert_equal(udp.dport, BFD.udp_dport_echo,
1055 "UDP destination port")
1056 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1058 # need to compare the hex payload here, otherwise BFD_vpp_echo
1060 self.assertEqual(str(p[UDP].payload),
1061 str(echo_packet[UDP].payload),
1062 "Received packet is not the echo packet sent")
1063 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1064 "ECHO packet identifier for test purposes)")
1066 def test_echo(self):
1067 """ echo function """
1068 bfd_session_up(self)
1069 self.test_session.update(required_min_echo_rx=50000)
1070 self.test_session.send_packet()
1071 detection_time = self.test_session.detect_mult *\
1072 self.vpp_session.required_min_rx / USEC_IN_SEC
1073 # echo shouldn't work without echo source set
1074 for dummy in range(3):
1075 sleep = 0.75 * detection_time
1076 self.sleep(sleep, "delay before sending bfd packet")
1077 self.test_session.send_packet()
1078 p = wait_for_bfd_packet(
1079 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1080 self.assert_equal(p[BFD].required_min_rx_interval,
1081 self.vpp_session.required_min_rx,
1082 "BFD required min rx interval")
1083 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1084 # should be turned on - loopback echo packets
1085 for dummy in range(3):
1086 loop_until = time.time() + 0.75 * detection_time
1087 while time.time() < loop_until:
1088 p = self.pg0.wait_for_packet(1)
1089 self.logger.debug(ppp("Got packet:", p))
1090 if p[UDP].dport == BFD.udp_dport_echo:
1092 p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
1093 self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
1094 "BFD ECHO src IP equal to loopback IP")
1095 self.logger.debug(ppp("Looping back packet:", p))
1096 self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1097 "ECHO packet destination MAC address")
1098 p[Ether].dst = self.pg0.local_mac
1099 self.pg0.add_stream(p)
1101 elif p.haslayer(BFD):
1102 self.assertGreaterEqual(p[BFD].required_min_rx_interval,
1104 if "P" in p.sprintf("%BFD.flags%"):
1105 final = self.test_session.create_packet()
1106 final[BFD].flags = "F"
1107 self.test_session.send_packet(final)
1109 raise Exception(ppp("Received unknown packet:", p))
1111 self.assert_equal(len(self.vapi.collect_events()), 0,
1112 "number of bfd events")
1113 self.test_session.send_packet()
1115 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1116 def test_echo_fail(self):
1117 """ session goes down if echo function fails """
1118 bfd_session_up(self)
1119 self.test_session.update(required_min_echo_rx=50000)
1120 self.test_session.send_packet()
1121 detection_time = self.test_session.detect_mult *\
1122 self.vpp_session.required_min_rx / USEC_IN_SEC
1123 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1124 # echo function should be used now, but we will drop the echo packets
1125 verified_diag = False
1126 for dummy in range(3):
1127 loop_until = time.time() + 0.75 * detection_time
1128 while time.time() < loop_until:
1129 p = self.pg0.wait_for_packet(1)
1130 self.logger.debug(ppp("Got packet:", p))
1131 if p[UDP].dport == BFD.udp_dport_echo:
1134 elif p.haslayer(BFD):
1135 if "P" in p.sprintf("%BFD.flags%"):
1136 self.assertGreaterEqual(
1137 p[BFD].required_min_rx_interval,
1139 final = self.test_session.create_packet()
1140 final[BFD].flags = "F"
1141 self.test_session.send_packet(final)
1142 if p[BFD].state == BFDState.down:
1143 self.assert_equal(p[BFD].diag,
1144 BFDDiagCode.echo_function_failed,
1146 verified_diag = True
1148 raise Exception(ppp("Received unknown packet:", p))
1149 self.test_session.send_packet()
1150 events = self.vapi.collect_events()
1151 self.assert_equal(len(events), 1, "number of bfd events")
1152 self.assert_equal(events[0].state, BFDState.down, BFDState)
1153 self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1155 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1156 def test_echo_stop(self):
1157 """ echo function stops if peer sets required min echo rx zero """
1158 bfd_session_up(self)
1159 self.test_session.update(required_min_echo_rx=50000)
1160 self.test_session.send_packet()
1161 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1162 # wait for first echo packet
1164 p = self.pg0.wait_for_packet(1)
1165 self.logger.debug(ppp("Got packet:", p))
1166 if p[UDP].dport == BFD.udp_dport_echo:
1167 self.logger.debug(ppp("Looping back packet:", p))
1168 p[Ether].dst = self.pg0.local_mac
1169 self.pg0.add_stream(p)
1172 elif p.haslayer(BFD):
1176 raise Exception(ppp("Received unknown packet:", p))
1177 self.test_session.update(required_min_echo_rx=0)
1178 self.test_session.send_packet()
1179 # echo packets shouldn't arrive anymore
1180 for dummy in range(5):
1181 wait_for_bfd_packet(
1182 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1183 self.test_session.send_packet()
1184 events = self.vapi.collect_events()
1185 self.assert_equal(len(events), 0, "number of bfd events")
1187 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1188 def test_echo_source_removed(self):
1189 """ echo function stops if echo source is removed """
1190 bfd_session_up(self)
1191 self.test_session.update(required_min_echo_rx=50000)
1192 self.test_session.send_packet()
1193 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1194 # wait for first echo packet
1196 p = self.pg0.wait_for_packet(1)
1197 self.logger.debug(ppp("Got packet:", p))
1198 if p[UDP].dport == BFD.udp_dport_echo:
1199 self.logger.debug(ppp("Looping back packet:", p))
1200 p[Ether].dst = self.pg0.local_mac
1201 self.pg0.add_stream(p)
1204 elif p.haslayer(BFD):
1208 raise Exception(ppp("Received unknown packet:", p))
1209 self.vapi.bfd_udp_del_echo_source()
1210 self.test_session.send_packet()
1211 # echo packets shouldn't arrive anymore
1212 for dummy in range(5):
1213 wait_for_bfd_packet(
1214 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1215 self.test_session.send_packet()
1216 events = self.vapi.collect_events()
1217 self.assert_equal(len(events), 0, "number of bfd events")
1219 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1220 def test_stale_echo(self):
1221 """ stale echo packets don't keep a session up """
1222 bfd_session_up(self)
1223 self.test_session.update(required_min_echo_rx=50000)
1224 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1225 self.test_session.send_packet()
1226 # should be turned on - loopback echo packets
1230 for dummy in range(10 * self.vpp_session.detect_mult):
1231 p = self.pg0.wait_for_packet(1)
1232 if p[UDP].dport == BFD.udp_dport_echo:
1233 if echo_packet is None:
1234 self.logger.debug(ppp("Got first echo packet:", p))
1236 timeout_at = time.time() + self.vpp_session.detect_mult * \
1237 self.test_session.required_min_echo_rx / USEC_IN_SEC
1239 self.logger.debug(ppp("Got followup echo packet:", p))
1240 self.logger.debug(ppp("Looping back first echo packet:", p))
1241 echo_packet[Ether].dst = self.pg0.local_mac
1242 self.pg0.add_stream(echo_packet)
1244 elif p.haslayer(BFD):
1245 self.logger.debug(ppp("Got packet:", p))
1246 if "P" in p.sprintf("%BFD.flags%"):
1247 final = self.test_session.create_packet()
1248 final[BFD].flags = "F"
1249 self.test_session.send_packet(final)
1250 if p[BFD].state == BFDState.down:
1251 self.assertIsNotNone(
1253 "Session went down before first echo packet received")
1255 self.assertGreaterEqual(
1257 "Session timeout at %s, but is expected at %s" %
1259 self.assert_equal(p[BFD].diag,
1260 BFDDiagCode.echo_function_failed,
1262 events = self.vapi.collect_events()
1263 self.assert_equal(len(events), 1, "number of bfd events")
1264 self.assert_equal(events[0].state, BFDState.down, BFDState)
1268 raise Exception(ppp("Received unknown packet:", p))
1269 self.test_session.send_packet()
1270 self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1272 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1273 def test_invalid_echo_checksum(self):
1274 """ echo packets with invalid checksum don't keep a session up """
1275 bfd_session_up(self)
1276 self.test_session.update(required_min_echo_rx=50000)
1277 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1278 self.test_session.send_packet()
1279 # should be turned on - loopback echo packets
1282 for dummy in range(10 * self.vpp_session.detect_mult):
1283 p = self.pg0.wait_for_packet(1)
1284 if p[UDP].dport == BFD.udp_dport_echo:
1285 self.logger.debug(ppp("Got echo packet:", p))
1286 if timeout_at is None:
1287 timeout_at = time.time() + self.vpp_session.detect_mult * \
1288 self.test_session.required_min_echo_rx / USEC_IN_SEC
1289 p[BFD_vpp_echo].checksum = getrandbits(64)
1290 p[Ether].dst = self.pg0.local_mac
1291 self.logger.debug(ppp("Looping back modified echo packet:", p))
1292 self.pg0.add_stream(p)
1294 elif p.haslayer(BFD):
1295 self.logger.debug(ppp("Got packet:", p))
1296 if "P" in p.sprintf("%BFD.flags%"):
1297 final = self.test_session.create_packet()
1298 final[BFD].flags = "F"
1299 self.test_session.send_packet(final)
1300 if p[BFD].state == BFDState.down:
1301 self.assertIsNotNone(
1303 "Session went down before first echo packet received")
1305 self.assertGreaterEqual(
1307 "Session timeout at %s, but is expected at %s" %
1309 self.assert_equal(p[BFD].diag,
1310 BFDDiagCode.echo_function_failed,
1312 events = self.vapi.collect_events()
1313 self.assert_equal(len(events), 1, "number of bfd events")
1314 self.assert_equal(events[0].state, BFDState.down, BFDState)
1318 raise Exception(ppp("Received unknown packet:", p))
1319 self.test_session.send_packet()
1320 self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_admin_up_down(self):
    """ put session admin-up and admin-down

    Brings the session up, puts the VPP session admin-down and checks
    that it keeps reporting admin_down even when the test peer sends a
    packet in state init.  The VPP session is then put admin-up again
    and followed through down -> init -> up, checking both the state in
    the received BFD packets and the state in the reported events.
    """
    event_name = "bfd_udp_session_details"

    def expect_event(state):
        # wait for the next session-details event and verify its state
        event = self.vapi.wait_for_event(1, event_name)
        verify_event(self, event, expected_state=state)

    def expect_packets(state, count):
        # each of the next `count` BFD packets must carry `state`
        for _ in range(count):
            pkt = wait_for_bfd_packet(self)
            self.assert_equal(pkt[BFD].state, state, BFDState)

    def expect_fresh_packet(state):
        # pcap_time_min presumably filters out packets captured before
        # "now" (corrected by the VPP clock offset) - confirm against
        # wait_for_bfd_packet()
        pkt = wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.assert_equal(pkt[BFD].state, state, BFDState)

    bfd_session_up(self)

    # admin-down phase
    self.vpp_session.admin_down()
    self.pg0.enable_capture()
    expect_event(BFDState.admin_down)
    expect_packets(BFDState.admin_down, 2)

    # try to bring session up - shouldn't be possible
    self.test_session.update(state=BFDState.init)
    self.test_session.send_packet()
    expect_packets(BFDState.admin_down, 2)

    # admin-up phase: session starts over from down
    self.vpp_session.admin_up()
    self.test_session.update(state=BFDState.down)
    expect_event(BFDState.down)
    expect_fresh_packet(BFDState.down)

    # our packet (state down) moves the VPP session to init
    self.test_session.send_packet()
    expect_fresh_packet(BFDState.init)
    expect_event(BFDState.init)

    # our packet in state up completes the transition to up
    self.test_session.update(state=BFDState.up)
    self.test_session.send_packet()
    expect_fresh_packet(BFDState.up)
    expect_event(BFDState.up)
1360 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1361 def test_config_change_remote_demand(self):
1362 """ configuration change while peer in demand mode """
1363 bfd_session_up(self)
1364 demand = self.test_session.create_packet()
1365 demand[BFD].flags = "D"
1366 self.test_session.send_packet(demand)
1367 self.vpp_session.modify_parameters(
1368 required_min_rx=2 * self.vpp_session.required_min_rx)
1369 p = wait_for_bfd_packet(
1370 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1371 # poll bit must be set
1372 self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1373 # terminate poll sequence
1374 final = self.test_session.create_packet()
1375 final[BFD].flags = "D+F"
1376 self.test_session.send_packet(final)
1377 # vpp should be quiet now again
1378 transmit_time = 0.9 \
1379 * max(self.vpp_session.required_min_rx,
1380 self.test_session.desired_min_tx) \
1383 for dummy in range(self.test_session.detect_mult * 2):
1384 time.sleep(transmit_time)
1385 self.test_session.send_packet(demand)
1387 p = wait_for_bfd_packet(self, timeout=0)
1388 self.logger.error(ppp("Received unexpected packet:", p))
1390 except CaptureTimeoutError:
1392 events = self.vapi.collect_events()
1394 self.logger.error("Received unexpected event: %s", e)
1395 self.assert_equal(count, 0, "number of packets received")
1396 self.assert_equal(len(events), 0, "number of events received")
1399 class BFD6TestCase(VppTestCase):
1400 """Bidirectional Forwarding Detection (BFD) (IPv6) """
1403 vpp_clock_offset = None
1408 def setUpClass(cls):
1409 super(BFD6TestCase, cls).setUpClass()
1411 cls.create_pg_interfaces([0])
1412 cls.pg0.config_ip6()
1413 cls.pg0.configure_ipv6_neighbors()
1415 cls.pg0.resolve_ndp()
1416 cls.create_loopback_interfaces([0])
1417 cls.loopback0 = cls.lo_interfaces[0]
1418 cls.loopback0.config_ip6()
1419 cls.loopback0.admin_up()
1422 super(BFD6TestCase, cls).tearDownClass()
1426 super(BFD6TestCase, self).setUp()
1427 self.factory = AuthKeyFactory()
1428 self.vapi.want_bfd_events()
1429 self.pg0.enable_capture()
1431 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1432 self.pg0.remote_ip6,
1434 self.vpp_session.add_vpp_config()
1435 self.vpp_session.admin_up()
1436 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1437 self.logger.debug(self.vapi.cli("show adj nbr"))
1439 self.vapi.want_bfd_events(enable_disable=0)
1443 if not self.vpp_dead:
1444 self.vapi.want_bfd_events(enable_disable=0)
1445 self.vapi.collect_events() # clear the event queue
1446 super(BFD6TestCase, self).tearDown()
def test_session_up(self):
    """ bring BFD session up

    The whole exchange is delegated to the bfd_session_up() helper,
    which is handed this test case.
    """
    bfd_session_up(self)
1452 def test_session_up_by_ip(self):
1453 """ bring BFD session up - first frame looked up by address pair """
1454 self.logger.info("BFD: Sending Slow control frame")
1455 self.test_session.update(my_discriminator=randint(0, 40000000))
1456 self.test_session.send_packet()
1457 self.pg0.enable_capture()
1458 p = self.pg0.wait_for_packet(1)
1459 self.assert_equal(p[BFD].your_discriminator,
1460 self.test_session.my_discriminator,
1461 "BFD - your discriminator")
1462 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1463 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
1465 self.logger.info("BFD: Waiting for event")
1466 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1467 verify_event(self, e, expected_state=BFDState.init)
1468 self.logger.info("BFD: Sending Up")
1469 self.test_session.send_packet()
1470 self.logger.info("BFD: Waiting for event")
1471 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1472 verify_event(self, e, expected_state=BFDState.up)
1473 self.logger.info("BFD: Session is Up")
1474 self.test_session.update(state=BFDState.up)
1475 self.test_session.send_packet()
1476 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_hold_up(self):
    """ hold BFD session up

    Once the session is up, waits for a BFD packet and sends one from
    the test session, detect_mult * 2 times over.  Afterwards no BFD
    event may be pending and the VPP session must still report state up.
    """
    bfd_session_up(self)

    exchanges = self.test_session.detect_mult * 2
    for _ in range(exchanges):
        # wait for a BFD packet, then send one from the test session
        wait_for_bfd_packet(self)
        self.test_session.send_packet()

    pending_events = self.vapi.collect_events()
    self.assert_equal(len(pending_events), 0, "number of bfd events")
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1489 def test_echo_looped_back(self):
1490 """ echo packets looped back """
1491 # don't need a session in this case..
1492 self.vpp_session.remove_vpp_config()
1493 self.pg0.enable_capture()
1494 echo_packet_count = 10
1495 # random source port low enough to increment a few times..
1496 udp_sport_tx = randint(1, 50000)
1497 udp_sport_rx = udp_sport_tx
1498 echo_packet = (Ether(src=self.pg0.remote_mac,
1499 dst=self.pg0.local_mac) /
1500 IPv6(src=self.pg0.remote_ip6,
1501 dst=self.pg0.remote_ip6) /
1502 UDP(dport=BFD.udp_dport_echo) /
1503 Raw("this should be looped back"))
1504 for dummy in range(echo_packet_count):
1505 self.sleep(.01, "delay between echo packets")
1506 echo_packet[UDP].sport = udp_sport_tx
1508 self.logger.debug(ppp("Sending packet:", echo_packet))
1509 self.pg0.add_stream(echo_packet)
1511 for dummy in range(echo_packet_count):
1512 p = self.pg0.wait_for_packet(1)
1513 self.logger.debug(ppp("Got packet:", p))
1515 self.assert_equal(self.pg0.remote_mac,
1516 ether.dst, "Destination MAC")
1517 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1519 self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
1520 self.assert_equal(self.pg0.remote_ip6, ip.src, "Destination IP")
1522 self.assert_equal(udp.dport, BFD.udp_dport_echo,
1523 "UDP destination port")
1524 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1526 # need to compare the hex payload here, otherwise BFD_vpp_echo
1528 self.assertEqual(str(p[UDP].payload),
1529 str(echo_packet[UDP].payload),
1530 "Received packet is not the echo packet sent")
1531 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1532 "ECHO packet identifier for test purposes)")
1533 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1534 "ECHO packet identifier for test purposes)")
1536 def test_echo(self):
1537 """ echo function used """
1538 bfd_session_up(self)
1539 self.test_session.update(required_min_echo_rx=50000)
1540 self.test_session.send_packet()
1541 detection_time = self.test_session.detect_mult *\
1542 self.vpp_session.required_min_rx / USEC_IN_SEC
1543 # echo shouldn't work without echo source set
1544 for dummy in range(3):
1545 sleep = 0.75 * detection_time
1546 self.sleep(sleep, "delay before sending bfd packet")
1547 self.test_session.send_packet()
1548 p = wait_for_bfd_packet(
1549 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1550 self.assert_equal(p[BFD].required_min_rx_interval,
1551 self.vpp_session.required_min_rx,
1552 "BFD required min rx interval")
1553 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1554 # should be turned on - loopback echo packets
1555 for dummy in range(3):
1556 loop_until = time.time() + 0.75 * detection_time
1557 while time.time() < loop_until:
1558 p = self.pg0.wait_for_packet(1)
1559 self.logger.debug(ppp("Got packet:", p))
1560 if p[UDP].dport == BFD.udp_dport_echo:
1562 p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
1563 self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
1564 "BFD ECHO src IP equal to loopback IP")
1565 self.logger.debug(ppp("Looping back packet:", p))
1566 self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1567 "ECHO packet destination MAC address")
1568 p[Ether].dst = self.pg0.local_mac
1569 self.pg0.add_stream(p)
1571 elif p.haslayer(BFD):
1572 self.assertGreaterEqual(p[BFD].required_min_rx_interval,
1574 if "P" in p.sprintf("%BFD.flags%"):
1575 final = self.test_session.create_packet()
1576 final[BFD].flags = "F"
1577 self.test_session.send_packet(final)
1579 raise Exception(ppp("Received unknown packet:", p))
1581 self.assert_equal(len(self.vapi.collect_events()), 0,
1582 "number of bfd events")
1583 self.test_session.send_packet()
1586 class BFDFIBTestCase(VppTestCase):
1587 """ BFD-FIB interactions (IPv6) """
1593 super(BFDFIBTestCase, self).setUp()
1594 self.create_pg_interfaces(range(1))
1596 self.vapi.want_bfd_events()
1597 self.pg0.enable_capture()
1599 for i in self.pg_interfaces:
1602 i.configure_ipv6_neighbors()
1605 if not self.vpp_dead:
1606 self.vapi.want_bfd_events(enable_disable=0)
1608 super(BFDFIBTestCase, self).tearDown()
1611 def pkt_is_not_data_traffic(p):
1612 """ not data traffic implies BFD or the usual IPv6 ND/RA"""
1613 if p.haslayer(BFD) or is_ipv6_misc(p):
1617 def test_session_with_fib(self):
1618 """ BFD-FIB interactions """
1620 # packets to match against both of the routes
1621 p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1622 IPv6(src="3001::1", dst="2001::1") /
1623 UDP(sport=1234, dport=1234) /
1625 (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1626 IPv6(src="3001::1", dst="2002::1") /
1627 UDP(sport=1234, dport=1234) /
1630 # A recursive and a non-recursive route via a next-hop that
1631 # will have a BFD session
1632 ip_2001_s_64 = VppIpRoute(self, "2001::", 64,
1633 [VppRoutePath(self.pg0.remote_ip6,
1634 self.pg0.sw_if_index,
1637 ip_2002_s_64 = VppIpRoute(self, "2002::", 64,
1638 [VppRoutePath(self.pg0.remote_ip6,
1642 ip_2001_s_64.add_vpp_config()
1643 ip_2002_s_64.add_vpp_config()
1645 # bring the session up now the routes are present
1646 self.vpp_session = VppBFDUDPSession(self,
1648 self.pg0.remote_ip6,
1650 self.vpp_session.add_vpp_config()
1651 self.vpp_session.admin_up()
1652 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1654 # session is up - traffic passes
1655 bfd_session_up(self)
1657 self.pg0.add_stream(p)
1660 captured = self.pg0.wait_for_packet(
1662 filter_out_fn=self.pkt_is_not_data_traffic)
1663 self.assertEqual(captured[IPv6].dst,
# session is down - traffic is dropped
1667 bfd_session_down(self)
1669 self.pg0.add_stream(p)
1671 with self.assertRaises(CaptureTimeoutError):
1672 self.pg0.wait_for_packet(1, self.pkt_is_not_data_traffic)
1674 # session is up - traffic passes
1675 bfd_session_up(self)
1677 self.pg0.add_stream(p)
1680 captured = self.pg0.wait_for_packet(
1682 filter_out_fn=self.pkt_is_not_data_traffic)
1683 self.assertEqual(captured[IPv6].dst,
1687 class BFDSHA1TestCase(VppTestCase):
1688 """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1691 vpp_clock_offset = None
1696 def setUpClass(cls):
1697 super(BFDSHA1TestCase, cls).setUpClass()
1699 cls.create_pg_interfaces([0])
1700 cls.pg0.config_ip4()
1702 cls.pg0.resolve_arp()
1705 super(BFDSHA1TestCase, cls).tearDownClass()
1709 super(BFDSHA1TestCase, self).setUp()
1710 self.factory = AuthKeyFactory()
1711 self.vapi.want_bfd_events()
1712 self.pg0.enable_capture()
1715 if not self.vpp_dead:
1716 self.vapi.want_bfd_events(enable_disable=0)
1717 self.vapi.collect_events() # clear the event queue
1718 super(BFDSHA1TestCase, self).tearDown()
1720 def test_session_up(self):
1721 """ bring BFD session up """
1722 key = self.factory.create_random_key(self)
1723 key.add_vpp_config()
1724 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1725 self.pg0.remote_ip4,
1727 self.vpp_session.add_vpp_config()
1728 self.vpp_session.admin_up()
1729 self.test_session = BFDTestSession(
1730 self, self.pg0, AF_INET, sha1_key=key,
1731 bfd_key_id=self.vpp_session.bfd_key_id)
1732 bfd_session_up(self)
1734 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1735 def test_hold_up(self):
1736 """ hold BFD session up """
1737 key = self.factory.create_random_key(self)
1738 key.add_vpp_config()
1739 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1740 self.pg0.remote_ip4,
1742 self.vpp_session.add_vpp_config()
1743 self.vpp_session.admin_up()
1744 self.test_session = BFDTestSession(
1745 self, self.pg0, AF_INET, sha1_key=key,
1746 bfd_key_id=self.vpp_session.bfd_key_id)
1747 bfd_session_up(self)
1748 for dummy in range(self.test_session.detect_mult * 2):
1749 wait_for_bfd_packet(self)
1750 self.test_session.send_packet()
1751 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_hold_up_meticulous(self):
    """ hold BFD session up - meticulous auth

    Configures a meticulous keyed SHA1 key on both the VPP session and
    the test session, starts the test session's sequence number just
    below 0xFFFFFFFF and then runs 30 exchanges, calling inc_seq_num()
    before every packet sent, so that the sequence number wraps.  The
    VPP session must still report state up at the end.
    """
    auth_key = self.factory.create_random_key(
        self, BFDAuthType.meticulous_keyed_sha1)
    auth_key.add_vpp_config()

    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
    self.vpp_session.add_vpp_config()
    self.vpp_session.admin_up()

    # specify sequence number so that it wraps
    first_seq_number = 0xFFFFFFFF - 4
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=auth_key,
        bfd_key_id=self.vpp_session.bfd_key_id,
        our_seq_number=first_seq_number)
    bfd_session_up(self)

    exchanges = 30
    for _ in range(exchanges):
        wait_for_bfd_packet(self)
        self.test_session.inc_seq_num()
        self.test_session.send_packet()

    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_send_bad_seq_number(self):
    """ session is not kept alive by msgs with bad sequence numbers

    With a meticulous keyed SHA1 key configured, the test session keeps
    sending packets for twice the detection time without ever calling
    inc_seq_num().  Exactly one BFD event, reporting state down, is
    expected afterwards.
    """
    auth_key = self.factory.create_random_key(
        self, BFDAuthType.meticulous_keyed_sha1)
    auth_key.add_vpp_config()

    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=auth_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)

    # detection time in seconds (intervals are divided by USEC_IN_SEC)
    detection_time = self.test_session.detect_mult *\
        self.vpp_session.required_min_rx / USEC_IN_SEC
    deadline = time.time() + 2 * detection_time
    while time.time() < deadline:
        self.test_session.send_packet()
        gap = 0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC
        self.sleep(gap, "time between bfd packets")

    reported = self.vapi.collect_events()
    # session should be down now, because the sequence numbers weren't
    # advanced between the packets sent above
    self.assert_equal(len(reported), 1, "number of bfd events")
    verify_event(self, reported[0], expected_state=BFDState.down)
1801 def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
1802 legitimate_test_session,
1804 rogue_bfd_values=None):
1805 """ execute a rogue session interaction scenario
1807 1. create vpp session, add config
1808 2. bring the legitimate session up
1809 3. copy the bfd values from legitimate session to rogue session
1810 4. apply rogue_bfd_values to rogue session
1811 5. set rogue session state to down
1812 6. send message to take the session down from the rogue session
1813 7. assert that the legitimate session is unaffected
1816 self.vpp_session = vpp_bfd_udp_session
1817 self.vpp_session.add_vpp_config()
1818 self.test_session = legitimate_test_session
1819 # bring vpp session up
1820 bfd_session_up(self)
1821 # send packet from rogue session
1822 rogue_test_session.update(
1823 my_discriminator=self.test_session.my_discriminator,
1824 your_discriminator=self.test_session.your_discriminator,
1825 desired_min_tx=self.test_session.desired_min_tx,
1826 required_min_rx=self.test_session.required_min_rx,
1827 detect_mult=self.test_session.detect_mult,
1828 diag=self.test_session.diag,
1829 state=self.test_session.state,
1830 auth_type=self.test_session.auth_type)
1831 if rogue_bfd_values:
1832 rogue_test_session.update(**rogue_bfd_values)
1833 rogue_test_session.update(state=BFDState.down)
1834 rogue_test_session.send_packet()
1835 wait_for_bfd_packet(self)
1836 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1838 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1839 def test_mismatch_auth(self):
1840 """ session is not brought down by unauthenticated msg """
1841 key = self.factory.create_random_key(self)
1842 key.add_vpp_config()
1843 vpp_session = VppBFDUDPSession(
1844 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1845 legitimate_test_session = BFDTestSession(
1846 self, self.pg0, AF_INET, sha1_key=key,
1847 bfd_key_id=vpp_session.bfd_key_id)
1848 rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
1849 self.execute_rogue_session_scenario(vpp_session,
1850 legitimate_test_session,
1853 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1854 def test_mismatch_bfd_key_id(self):
1855 """ session is not brought down by msg with non-existent key-id """
1856 key = self.factory.create_random_key(self)
1857 key.add_vpp_config()
1858 vpp_session = VppBFDUDPSession(
1859 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1860 # pick a different random bfd key id
1862 while x == vpp_session.bfd_key_id:
1864 legitimate_test_session = BFDTestSession(
1865 self, self.pg0, AF_INET, sha1_key=key,
1866 bfd_key_id=vpp_session.bfd_key_id)
1867 rogue_test_session = BFDTestSession(
1868 self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
1869 self.execute_rogue_session_scenario(vpp_session,
1870 legitimate_test_session,
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_mismatched_auth_type(self):
    """ session is not brought down by msg with wrong auth type

    Builds a legitimate and a rogue test session from identical
    arguments and runs execute_rogue_session_scenario() with the rogue
    session's auth_type overridden to keyed MD5.
    """
    auth_key = self.factory.create_random_key(self)
    auth_key.add_vpp_config()
    vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)

    def make_test_session():
        # both peers share the key and the VPP session's bfd key id;
        # they only differ by the override applied in the scenario
        return BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=auth_key,
            bfd_key_id=vpp_session.bfd_key_id)

    legitimate = make_test_session()
    rogue = make_test_session()
    rogue_overrides = {'auth_type': BFDAuthType.keyed_md5}
    self.execute_rogue_session_scenario(
        vpp_session, legitimate, rogue, rogue_overrides)
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_restart(self):
    """ simulate remote peer restart and resynchronization """
    meticulous_key = self.factory.create_random_key(
        self, BFDAuthType.meticulous_keyed_sha1)
    meticulous_key.add_vpp_config()
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=meticulous_key)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=meticulous_key,
        bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
    bfd_session_up(self)

    # stay silent for twice the detection time so that VPP gives up on us
    detect_mult = self.test_session.detect_mult
    min_rx = self.vpp_session.required_min_rx
    detection_time = detect_mult * min_rx / USEC_IN_SEC
    self.sleep(2 * detection_time, "simulating peer restart")

    # exactly one event is expected and it has to report the down state
    bfd_events = self.vapi.collect_events()
    self.assert_equal(len(bfd_events), 1, "number of bfd events")
    verify_event(self, bfd_events[0], expected_state=BFDState.down)
    self.test_session.update(state=BFDState.down)

    # the "restarted" peer begins again with fresh sequence numbers
    self.test_session.our_seq_number = 0
    self.test_session.vpp_seq_number = None

    # drop whatever was captured meanwhile, then bring the session up again
    self.pg0.enable_capture()
    bfd_session_up(self)
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
class BFDAuthOnOffTestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (changing auth) """

    # placeholders for attributes assigned in setUpClass / by the tests
    # NOTE(review): reconstructed declarations - confirm against upstream
    pg0 = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        super(BFDAuthOnOffTestCase, cls).setUpClass()
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip4()
            # NOTE(review): admin_up() reconstructed - confirm
            cls.pg0.admin_up()
            cls.pg0.resolve_arp()

        except Exception:
            # tear the class down again before propagating the failure
            super(BFDAuthOnOffTestCase, cls).tearDownClass()
            raise

    def setUp(self):
        super(BFDAuthOnOffTestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

    def tearDown(self):
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFDAuthOnOffTestCase, self).tearDown()

    def _keep_session_alive(self, verify_up=True, inc_seq_num=False):
        """ answer 2 * detect_mult packets received from VPP

        :param verify_up: assert that each received packet reports the
                          up state
        :param inc_seq_num: call inc_seq_num() on the test session before
                            each reply
        """
        for _ in range(self.test_session.detect_mult * 2):
            p = wait_for_bfd_packet(self)
            if verify_up:
                self.assert_equal(p[BFD].state, BFDState.up, BFDState)
            if inc_seq_num:
                self.test_session.inc_seq_num()
            self.test_session.send_packet()

    def _verify_session_undisturbed(self):
        """ assert the VPP session is up and no BFD event was generated """
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")

    def test_auth_on_immediate(self):
        """ turn auth on without disturbing session state (immediate) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        self._keep_session_alive()
        # switch both sides to the key at the same moment
        self.vpp_session.activate_auth(key)
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key
        self._keep_session_alive()
        self._verify_session_undisturbed()

    def test_auth_off_immediate(self):
        """ turn auth off without disturbing session state (immediate) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._keep_session_alive(inc_seq_num=True)
        # drop authentication on both sides at the same moment
        self.vpp_session.deactivate_auth()
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self._keep_session_alive(inc_seq_num=True)
        self._verify_session_undisturbed()

    def test_auth_change_key_immediate(self):
        """ change auth key without disturbing session state (immediate) """
        key1 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2 = self.factory.create_random_key(self)
        key2.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key1)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key1,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._keep_session_alive()
        # swap key1 for key2 on both sides at the same moment
        self.vpp_session.activate_auth(key2)
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key2
        self._keep_session_alive()
        self._verify_session_undisturbed()

    def test_auth_on_delayed(self):
        """ turn auth on without disturbing session state (delayed) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        # the state of these first packets is deliberately not checked
        self._keep_session_alive(verify_up=False)
        self.vpp_session.activate_auth(key, delayed=True)
        # peer keeps sending unauthenticated packets for a while
        self._keep_session_alive()
        # peer starts using the key
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key
        self.test_session.send_packet()
        self._keep_session_alive()
        self._verify_session_undisturbed()

    def test_auth_off_delayed(self):
        """ turn auth off without disturbing session state (delayed) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._keep_session_alive()
        self.vpp_session.deactivate_auth(delayed=True)
        # peer keeps sending authenticated packets for a while
        self._keep_session_alive()
        # peer stops using the key
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self.test_session.send_packet()
        self._keep_session_alive()
        self._verify_session_undisturbed()

    def test_auth_change_key_delayed(self):
        """ change auth key without disturbing session state (delayed) """
        key1 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2 = self.factory.create_random_key(self)
        key2.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key1)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key1,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._keep_session_alive()
        self.vpp_session.activate_auth(key2, delayed=True)
        # peer keeps using key1 for a while
        self._keep_session_alive()
        # peer switches over to key2
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key2
        self.test_session.send_packet()
        self._keep_session_alive()
        self._verify_session_undisturbed()
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
class BFDCLITestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (CLI) """
    # placeholder for the interface created in setUpClass
    # NOTE(review): reconstructed declaration - confirm against upstream
    pg0 = None

    @classmethod
    def setUpClass(cls):
        super(BFDCLITestCase, cls).setUpClass()

        try:
            cls.create_pg_interfaces((0,))
            cls.pg0.config_ip4()
            cls.pg0.config_ip6()
            cls.pg0.resolve_arp()
            cls.pg0.resolve_ndp()

        except Exception:
            # tear the class down again before propagating the failure
            super(BFDCLITestCase, cls).tearDownClass()
            raise

    def setUp(self):
        super(BFDCLITestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.pg0.enable_capture()

    def tearDown(self):
        try:
            self.vapi.want_bfd_events(enable_disable=0)
        except UnexpectedApiReturnValueError:
            # some tests aren't subscribed, so this is not an issue
            pass
        self.vapi.collect_events()  # clear the event queue
        super(BFDCLITestCase, self).tearDown()

    def cli_verify_no_response(self, cli):
        """ execute a CLI, asserting that the response is empty """
        self.assert_equal(self.vapi.cli(cli),
                          "",
                          "CLI command response")

    def cli_verify_response(self, cli, expected):
        """ execute a CLI, asserting that the response matches expectation """
        self.assert_equal(self.vapi.cli(cli).strip(),
                          expected,
                          "CLI command response")

    def test_show(self):
        """ show commands """
        k1 = self.factory.create_random_key(self)
        k1.add_vpp_config()
        k2 = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        k2.add_vpp_config()
        s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        s1.add_vpp_config()
        s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
                              sha1_key=k2)
        s2.add_vpp_config()
        # output is only logged, nothing is asserted about its content
        self.logger.info(self.vapi.ppcli("show bfd keys"))
        self.logger.info(self.vapi.ppcli("show bfd sessions"))
        self.logger.info(self.vapi.ppcli("show bfd"))

    def test_set_del_sha1_key(self):
        """ set/delete SHA1 auth key """
        k = self.factory.create_random_key(self)
        self.registry.register(k, self.logger)
        # the secret is passed to the CLI as a hex string
        self.cli_verify_no_response(
            "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
            (k.conf_key_id,
             "".join("{:02x}".format(ord(c)) for c in k.key)))
        self.assertTrue(k.query_vpp_config())
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
        self.vpp_session.add_vpp_config()
        self.test_session = \
            BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
                           bfd_key_id=self.vpp_session.bfd_key_id)
        self.vapi.want_bfd_events()
        bfd_session_up(self)
        bfd_session_down(self)
        # try to replace the secret for the key - should fail because the key
        # is in-use
        k2 = self.factory.create_random_key(self)
        self.cli_verify_response(
            "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
            (k.conf_key_id,
             "".join("{:02x}".format(ord(c)) for c in k2.key)),
            "bfd key set: `bfd_auth_set_key' API call failed, "
            "rv=-103:BFD object in use")
        # manipulating the session using old secret should still work
        bfd_session_up(self)
        bfd_session_down(self)
        self.vpp_session.remove_vpp_config()
        self.cli_verify_no_response(
            "bfd key del conf-key-id %s" % k.conf_key_id)
        self.assertFalse(k.query_vpp_config())

    def test_set_del_meticulous_sha1_key(self):
        """ set/delete meticulous SHA1 auth key """
        k = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        self.registry.register(k, self.logger)
        # the secret is passed to the CLI as a hex string
        self.cli_verify_no_response(
            "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
            (k.conf_key_id,
             "".join("{:02x}".format(ord(c)) for c in k.key)))
        self.assertTrue(k.query_vpp_config())
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip6, af=AF_INET6,
                                            sha1_key=k)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = \
            BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
                           bfd_key_id=self.vpp_session.bfd_key_id)
        self.vapi.want_bfd_events()
        bfd_session_up(self)
        bfd_session_down(self)
        # try to replace the secret for the key - should fail because the key
        # is in-use
        k2 = self.factory.create_random_key(self)
        self.cli_verify_response(
            "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
            (k.conf_key_id,
             "".join("{:02x}".format(ord(c)) for c in k2.key)),
            "bfd key set: `bfd_auth_set_key' API call failed, "
            "rv=-103:BFD object in use")
        # manipulating the session using old secret should still work
        bfd_session_up(self)
        bfd_session_down(self)
        self.vpp_session.remove_vpp_config()
        self.cli_verify_no_response(
            "bfd key del conf-key-id %s" % k.conf_key_id)
        self.assertFalse(k.query_vpp_config())

    def test_add_mod_del_bfd_udp(self):
        """ create/modify/delete IPv4 BFD UDP session """
        # never added through the API; only describes the expected config
        vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4)
        self.registry.register(vpp_session, self.logger)
        cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
            "peer-addr %s desired-min-tx %s required-min-rx %s "\
            "detect-mult %s" % (self.pg0.name, self.pg0.local_ip4,
                                self.pg0.remote_ip4,
                                vpp_session.desired_min_tx,
                                vpp_session.required_min_rx,
                                vpp_session.detect_mult)
        self.cli_verify_no_response(cli_add_cmd)
        # 2nd add should fail
        self.cli_verify_response(
            cli_add_cmd,
            "bfd udp session add: `bfd_add_add_session' API call"
            " failed, rv=-101:Duplicate BFD object")
        verify_bfd_session_config(self, vpp_session)
        mod_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4,
            required_min_rx=2 * vpp_session.required_min_rx,
            desired_min_tx=3 * vpp_session.desired_min_tx,
            detect_mult=4 * vpp_session.detect_mult)
        self.cli_verify_no_response(
            "bfd udp session mod interface %s local-addr %s peer-addr %s "
            "desired-min-tx %s required-min-rx %s detect-mult %s" %
            (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
             mod_session.desired_min_tx, mod_session.required_min_rx,
             mod_session.detect_mult))
        verify_bfd_session_config(self, mod_session)
        cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
            "peer-addr %s" % (self.pg0.name,
                              self.pg0.local_ip4, self.pg0.remote_ip4)
        self.cli_verify_no_response(cli_del_cmd)
        # 2nd del is expected to fail
        self.cli_verify_response(
            cli_del_cmd, "bfd udp session del: `bfd_udp_del_session' API call"
            " failed, rv=-102:No such BFD object")
        self.assertFalse(vpp_session.query_vpp_config())

    def test_add_mod_del_bfd_udp6(self):
        """ create/modify/delete IPv6 BFD UDP session """
        # never added through the API; only describes the expected config
        vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
        self.registry.register(vpp_session, self.logger)
        cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
            "peer-addr %s desired-min-tx %s required-min-rx %s "\
            "detect-mult %s" % (self.pg0.name, self.pg0.local_ip6,
                                self.pg0.remote_ip6,
                                vpp_session.desired_min_tx,
                                vpp_session.required_min_rx,
                                vpp_session.detect_mult)
        self.cli_verify_no_response(cli_add_cmd)
        # 2nd add should fail
        self.cli_verify_response(
            cli_add_cmd,
            "bfd udp session add: `bfd_add_add_session' API call"
            " failed, rv=-101:Duplicate BFD object")
        verify_bfd_session_config(self, vpp_session)
        mod_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
            required_min_rx=2 * vpp_session.required_min_rx,
            desired_min_tx=3 * vpp_session.desired_min_tx,
            detect_mult=4 * vpp_session.detect_mult)
        self.cli_verify_no_response(
            "bfd udp session mod interface %s local-addr %s peer-addr %s "
            "desired-min-tx %s required-min-rx %s detect-mult %s" %
            (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
             mod_session.desired_min_tx,
             mod_session.required_min_rx, mod_session.detect_mult))
        verify_bfd_session_config(self, mod_session)
        cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
            "peer-addr %s" % (self.pg0.name,
                              self.pg0.local_ip6, self.pg0.remote_ip6)
        self.cli_verify_no_response(cli_del_cmd)
        # 2nd del is expected to fail
        self.cli_verify_response(
            cli_del_cmd,
            "bfd udp session del: `bfd_udp_del_session' API call"
            " failed, rv=-102:No such BFD object")
        self.assertFalse(vpp_session.query_vpp_config())

    def test_add_mod_del_bfd_udp_auth(self):
        """ create/modify/delete IPv4 BFD UDP session (authenticated) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        # never added through the API; only describes the expected config
        vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
        self.registry.register(vpp_session, self.logger)
        cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
            "peer-addr %s desired-min-tx %s required-min-rx %s "\
            "detect-mult %s conf-key-id %s bfd-key-id %s"\
            % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
               vpp_session.desired_min_tx, vpp_session.required_min_rx,
               vpp_session.detect_mult, key.conf_key_id,
               vpp_session.bfd_key_id)
        self.cli_verify_no_response(cli_add_cmd)
        # 2nd add should fail
        self.cli_verify_response(
            cli_add_cmd,
            "bfd udp session add: `bfd_add_add_session' API call"
            " failed, rv=-101:Duplicate BFD object")
        verify_bfd_session_config(self, vpp_session)
        mod_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key,
            bfd_key_id=vpp_session.bfd_key_id,
            required_min_rx=2 * vpp_session.required_min_rx,
            desired_min_tx=3 * vpp_session.desired_min_tx,
            detect_mult=4 * vpp_session.detect_mult)
        self.cli_verify_no_response(
            "bfd udp session mod interface %s local-addr %s peer-addr %s "
            "desired-min-tx %s required-min-rx %s detect-mult %s" %
            (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
             mod_session.desired_min_tx,
             mod_session.required_min_rx, mod_session.detect_mult))
        verify_bfd_session_config(self, mod_session)
        cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
            "peer-addr %s" % (self.pg0.name,
                              self.pg0.local_ip4, self.pg0.remote_ip4)
        self.cli_verify_no_response(cli_del_cmd)
        # 2nd del is expected to fail
        self.cli_verify_response(
            cli_del_cmd,
            "bfd udp session del: `bfd_udp_del_session' API call"
            " failed, rv=-102:No such BFD object")
        self.assertFalse(vpp_session.query_vpp_config())

    def test_add_mod_del_bfd_udp6_auth(self):
        """ create/modify/delete IPv6 BFD UDP session (authenticated) """
        key = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        # never added through the API; only describes the expected config
        vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key)
        self.registry.register(vpp_session, self.logger)
        cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
            "peer-addr %s desired-min-tx %s required-min-rx %s "\
            "detect-mult %s conf-key-id %s bfd-key-id %s" \
            % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
               vpp_session.desired_min_tx, vpp_session.required_min_rx,
               vpp_session.detect_mult, key.conf_key_id,
               vpp_session.bfd_key_id)
        self.cli_verify_no_response(cli_add_cmd)
        # 2nd add should fail
        self.cli_verify_response(
            cli_add_cmd,
            "bfd udp session add: `bfd_add_add_session' API call"
            " failed, rv=-101:Duplicate BFD object")
        verify_bfd_session_config(self, vpp_session)
        mod_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key,
            bfd_key_id=vpp_session.bfd_key_id,
            required_min_rx=2 * vpp_session.required_min_rx,
            desired_min_tx=3 * vpp_session.desired_min_tx,
            detect_mult=4 * vpp_session.detect_mult)
        self.cli_verify_no_response(
            "bfd udp session mod interface %s local-addr %s peer-addr %s "
            "desired-min-tx %s required-min-rx %s detect-mult %s" %
            (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
             mod_session.desired_min_tx,
             mod_session.required_min_rx, mod_session.detect_mult))
        verify_bfd_session_config(self, mod_session)
        cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
            "peer-addr %s" % (self.pg0.name,
                              self.pg0.local_ip6, self.pg0.remote_ip6)
        self.cli_verify_no_response(cli_del_cmd)
        # 2nd del is expected to fail
        self.cli_verify_response(
            cli_del_cmd,
            "bfd udp session del: `bfd_udp_del_session' API call"
            " failed, rv=-102:No such BFD object")
        self.assertFalse(vpp_session.query_vpp_config())

    def test_auth_on_off(self):
        """ turn authentication on and off """
        key = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        # never added to VPP; describes the config expected once auth is on
        auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                        sha1_key=key)
        session.add_vpp_config()
        cli_activate = \
            "bfd udp session auth activate interface %s local-addr %s "\
            "peer-addr %s conf-key-id %s bfd-key-id %s"\
            % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
               key.conf_key_id, auth_session.bfd_key_id)
        # each command is issued twice; the repeat must succeed as well
        self.cli_verify_no_response(cli_activate)
        verify_bfd_session_config(self, auth_session)
        self.cli_verify_no_response(cli_activate)
        verify_bfd_session_config(self, auth_session)
        # NOTE(review): "peer-addr" continuation reconstructed - confirm
        cli_deactivate = \
            "bfd udp session auth deactivate interface %s local-addr %s "\
            "peer-addr %s "\
            % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.cli_verify_no_response(cli_deactivate)
        verify_bfd_session_config(self, session)
        self.cli_verify_no_response(cli_deactivate)
        verify_bfd_session_config(self, session)

    def test_auth_on_off_delayed(self):
        """ turn authentication on and off (delayed) """
        key = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        # never added to VPP; describes the config expected once auth is on
        auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                        sha1_key=key)
        session.add_vpp_config()
        cli_activate = \
            "bfd udp session auth activate interface %s local-addr %s "\
            "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"\
            % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
               key.conf_key_id, auth_session.bfd_key_id)
        # each command is issued twice; the repeat must succeed as well
        self.cli_verify_no_response(cli_activate)
        verify_bfd_session_config(self, auth_session)
        self.cli_verify_no_response(cli_activate)
        verify_bfd_session_config(self, auth_session)
        cli_deactivate = \
            "bfd udp session auth deactivate interface %s local-addr %s "\
            "peer-addr %s delayed yes"\
            % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.cli_verify_no_response(cli_deactivate)
        verify_bfd_session_config(self, session)
        self.cli_verify_no_response(cli_deactivate)
        verify_bfd_session_config(self, session)

    def test_admin_up_down(self):
        """ put session admin-up and admin-down """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        # NOTE(review): "peer-addr" continuations reconstructed - confirm
        cli_down = \
            "bfd udp session set-flags admin down interface %s local-addr %s "\
            "peer-addr %s "\
            % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
        cli_up = \
            "bfd udp session set-flags admin up interface %s local-addr %s "\
            "peer-addr %s "\
            % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.cli_verify_no_response(cli_down)
        verify_bfd_session_config(self, session, state=BFDState.admin_down)
        self.cli_verify_no_response(cli_up)
        verify_bfd_session_config(self, session, state=BFDState.down)

    def test_set_del_udp_echo_source(self):
        """ set/del udp echo source """
        self.create_loopback_interfaces([0])
        self.loopback0 = self.lo_interfaces[0]
        self.loopback0.admin_up()
        self.cli_verify_response("show bfd echo-source",
                                 "UDP echo source is not set.")
        cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
        self.cli_verify_no_response(cli_set)
        # no address configured on the loopback yet
        self.cli_verify_response("show bfd echo-source",
                                 "UDP echo source is: %s\n"
                                 "IPv4 address usable as echo source: none\n"
                                 "IPv6 address usable as echo source: none" %
                                 self.loopback0.name)
        self.loopback0.config_ip4()
        # expected echo address: interface address with the lowest bit flipped
        unpacked = unpack("!L", self.loopback0.local_ip4n)
        echo_ip4 = inet_ntop(AF_INET, pack("!L", unpacked[0] ^ 1))
        self.cli_verify_response("show bfd echo-source",
                                 "UDP echo source is: %s\n"
                                 "IPv4 address usable as echo source: %s\n"
                                 "IPv6 address usable as echo source: none" %
                                 (self.loopback0.name, echo_ip4))
        # same lowest-bit flip, applied to the last 32-bit word for IPv6
        unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
        echo_ip6 = inet_ntop(AF_INET6, pack("!LLLL", unpacked[0], unpacked[1],
                                            unpacked[2], unpacked[3] ^ 1))
        self.loopback0.config_ip6()
        self.cli_verify_response("show bfd echo-source",
                                 "UDP echo source is: %s\n"
                                 "IPv4 address usable as echo source: %s\n"
                                 "IPv6 address usable as echo source: %s" %
                                 (self.loopback0.name, echo_ip4, echo_ip6))
        cli_del = "bfd udp echo-source del"
        self.cli_verify_no_response(cli_del)
        self.cli_verify_response("show bfd echo-source",
                                 "UDP echo source is not set.")
# run the tests through the VPP test runner when executed as a script
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)