4 from __future__ import division
9 from struct import pack, unpack
10 from random import randint, shuffle, getrandbits
11 from socket import AF_INET, AF_INET6, inet_ntop
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether
14 from scapy.layers.inet import UDP, IP
15 from scapy.layers.inet6 import IPv6
16 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
17 BFDDiagCode, BFDState, BFD_vpp_echo
18 from framework import VppTestCase, VppTestRunner, running_extended_tests
19 from vpp_pg_interface import CaptureTimeoutError
21 from vpp_papi_provider import UnexpectedApiReturnValueError
26 class AuthKeyFactory(object):
27 """Factory class for creating auth keys with unique conf key ID"""
30 self._conf_key_ids = {}
def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
    """Create an auth key with random material and an unused conf key ID.

    :param test: test case object handed through to VppBFDAuthKey
    :param auth_type: authentication type stored in the created key
    :returns: VppBFDAuthKey whose conf_key_id was not produced by this
              factory before and whose key is 1 to 20 random bytes
    """
    # keep drawing 32-bit IDs until one not yet recorded by this factory
    while True:
        unique_id = randint(0, 0xFFFFFFFF)
        if unique_id not in self._conf_key_ids:
            break
    self._conf_key_ids[unique_id] = 1
    # key length is drawn first, then each byte of the key material
    key_length = randint(1, 20)
    key_material = bytearray(randint(0, 255) for _ in range(key_length))
    return VppBFDAuthKey(test=test, auth_type=auth_type,
                         conf_key_id=unique_id, key=str(key_material))
43 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
44 class BFDAPITestCase(VppTestCase):
45 """Bidirectional Forwarding Detection (BFD) - API"""
52 super(BFDAPITestCase, cls).setUpClass()
55 cls.create_pg_interfaces(range(2))
56 for i in cls.pg_interfaces:
62 super(BFDAPITestCase, cls).tearDownClass()
66 super(BFDAPITestCase, self).setUp()
67 self.factory = AuthKeyFactory()
def test_add_bfd(self):
    """ create a BFD session """
    bfd_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    # two identical rounds: the second one re-adds the session right after
    # it has been removed
    for _ in range(2):
        bfd_session.add_vpp_config()
        self.logger.debug("Session state is %s", bfd_session.state)
        bfd_session.remove_vpp_config()
def test_double_add(self):
    """ create the same BFD session twice (negative case) """
    duplicate = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    duplicate.add_vpp_config()
    # adding the very same session again is done under the
    # negative-retval expectation of the API wrapper
    negative_retval = self.vapi.expect_negative_api_retval()
    with negative_retval:
        duplicate.add_vpp_config()
    # clean up the session created by the first add
    duplicate.remove_vpp_config()
def test_add_bfd6(self):
    """ create IPv6 BFD session """
    peer_address = self.pg0.remote_ip6
    bfd_session = VppBFDUDPSession(self, self.pg0, peer_address,
                                   af=AF_INET6)
    # two identical rounds: the second one re-adds the session right after
    # it has been removed
    for _ in range(2):
        bfd_session.add_vpp_config()
        self.logger.debug("Session state is %s", bfd_session.state)
        bfd_session.remove_vpp_config()
100 def test_mod_bfd(self):
101 """ modify BFD session parameters """
102 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
103 desired_min_tx=50000,
104 required_min_rx=10000,
106 session.add_vpp_config()
107 s = session.get_bfd_udp_session_dump_entry()
108 self.assert_equal(session.desired_min_tx,
110 "desired min transmit interval")
111 self.assert_equal(session.required_min_rx,
113 "required min receive interval")
114 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
115 session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
116 required_min_rx=session.required_min_rx * 2,
117 detect_mult=session.detect_mult * 2)
118 s = session.get_bfd_udp_session_dump_entry()
119 self.assert_equal(session.desired_min_tx,
121 "desired min transmit interval")
122 self.assert_equal(session.required_min_rx,
124 "required min receive interval")
125 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
127 def test_add_sha1_keys(self):
128 """ add SHA1 keys """
130 keys = [self.factory.create_random_key(
131 self) for i in range(0, key_count)]
133 self.assertFalse(key.query_vpp_config())
137 self.assertTrue(key.query_vpp_config())
139 indexes = range(key_count)
144 key.remove_vpp_config()
146 for j in range(key_count):
149 self.assertFalse(key.query_vpp_config())
151 self.assertTrue(key.query_vpp_config())
152 # should be removed now
154 self.assertFalse(key.query_vpp_config())
155 # add back and remove again
159 self.assertTrue(key.query_vpp_config())
161 key.remove_vpp_config()
163 self.assertFalse(key.query_vpp_config())
165 def test_add_bfd_sha1(self):
166 """ create a BFD session (SHA1) """
167 key = self.factory.create_random_key(self)
169 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
171 session.add_vpp_config()
172 self.logger.debug("Session state is %s", session.state)
173 session.remove_vpp_config()
174 session.add_vpp_config()
175 self.logger.debug("Session state is %s", session.state)
176 session.remove_vpp_config()
178 def test_double_add_sha1(self):
179 """ create the same BFD session twice (negative case) (SHA1) """
180 key = self.factory.create_random_key(self)
182 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
184 session.add_vpp_config()
185 with self.assertRaises(Exception):
186 session.add_vpp_config()
def test_add_auth_nonexistent_key(self):
    """ create BFD session using non-existent SHA1 (negative case) """
    # the key object is only created here, add_vpp_config() is never
    # called on it before the session refers to it
    unconfigured_key = self.factory.create_random_key(self)
    session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                               sha1_key=unconfigured_key)
    # configuring the session is expected to raise
    self.assertRaises(Exception, session.add_vpp_config)
196 def test_shared_sha1_key(self):
197 """ share single SHA1 key between multiple BFD sessions """
198 key = self.factory.create_random_key(self)
201 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
203 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
204 sha1_key=key, af=AF_INET6),
205 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
207 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
208 sha1_key=key, af=AF_INET6)]
213 e = key.get_bfd_auth_keys_dump_entry()
214 self.assert_equal(e.use_count, len(sessions) - removed,
215 "Use count for shared key")
216 s.remove_vpp_config()
218 e = key.get_bfd_auth_keys_dump_entry()
219 self.assert_equal(e.use_count, len(sessions) - removed,
220 "Use count for shared key")
222 def test_activate_auth(self):
223 """ activate SHA1 authentication """
224 key = self.factory.create_random_key(self)
226 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
227 session.add_vpp_config()
228 session.activate_auth(key)
230 def test_deactivate_auth(self):
231 """ deactivate SHA1 authentication """
232 key = self.factory.create_random_key(self)
234 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
235 session.add_vpp_config()
236 session.activate_auth(key)
237 session.deactivate_auth()
239 def test_change_key(self):
240 """ change SHA1 key """
241 key1 = self.factory.create_random_key(self)
242 key2 = self.factory.create_random_key(self)
243 while key2.conf_key_id == key1.conf_key_id:
244 key2 = self.factory.create_random_key(self)
245 key1.add_vpp_config()
246 key2.add_vpp_config()
247 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
249 session.add_vpp_config()
250 session.activate_auth(key2)
253 class BFDTestSession(object):
254 """ BFD session as seen from test framework side """
256 def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
257 bfd_key_id=None, our_seq_number=None):
260 self.sha1_key = sha1_key
261 self.bfd_key_id = bfd_key_id
262 self.interface = interface
263 self.udp_sport = randint(49152, 65535)
264 if our_seq_number is None:
265 self.our_seq_number = randint(0, 40000000)
267 self.our_seq_number = our_seq_number
268 self.vpp_seq_number = None
269 self.my_discriminator = 0
270 self.desired_min_tx = 100000
271 self.required_min_rx = 100000
272 self.required_min_echo_rx = None
273 self.detect_mult = detect_mult
274 self.diag = BFDDiagCode.no_diagnostic
275 self.your_discriminator = None
276 self.state = BFDState.down
277 self.auth_type = BFDAuthType.no_auth
279 def inc_seq_num(self):
280 """ increment sequence number, wrapping if needed """
281 if self.our_seq_number == 0xFFFFFFFF:
282 self.our_seq_number = 0
284 self.our_seq_number += 1
286 def update(self, my_discriminator=None, your_discriminator=None,
287 desired_min_tx=None, required_min_rx=None,
288 required_min_echo_rx=None, detect_mult=None,
289 diag=None, state=None, auth_type=None):
290 """ update BFD parameters associated with session """
291 if my_discriminator is not None:
292 self.my_discriminator = my_discriminator
293 if your_discriminator is not None:
294 self.your_discriminator = your_discriminator
295 if required_min_rx is not None:
296 self.required_min_rx = required_min_rx
297 if required_min_echo_rx is not None:
298 self.required_min_echo_rx = required_min_echo_rx
299 if desired_min_tx is not None:
300 self.desired_min_tx = desired_min_tx
301 if detect_mult is not None:
302 self.detect_mult = detect_mult
305 if state is not None:
307 if auth_type is not None:
308 self.auth_type = auth_type
310 def fill_packet_fields(self, packet):
311 """ set packet fields with known values in packet """
313 if self.my_discriminator:
314 self.test.logger.debug("BFD: setting packet.my_discriminator=%s",
315 self.my_discriminator)
316 bfd.my_discriminator = self.my_discriminator
317 if self.your_discriminator:
318 self.test.logger.debug("BFD: setting packet.your_discriminator=%s",
319 self.your_discriminator)
320 bfd.your_discriminator = self.your_discriminator
321 if self.required_min_rx:
322 self.test.logger.debug(
323 "BFD: setting packet.required_min_rx_interval=%s",
324 self.required_min_rx)
325 bfd.required_min_rx_interval = self.required_min_rx
326 if self.required_min_echo_rx:
327 self.test.logger.debug(
328 "BFD: setting packet.required_min_echo_rx=%s",
329 self.required_min_echo_rx)
330 bfd.required_min_echo_rx_interval = self.required_min_echo_rx
331 if self.desired_min_tx:
332 self.test.logger.debug(
333 "BFD: setting packet.desired_min_tx_interval=%s",
335 bfd.desired_min_tx_interval = self.desired_min_tx
337 self.test.logger.debug(
338 "BFD: setting packet.detect_mult=%s", self.detect_mult)
339 bfd.detect_mult = self.detect_mult
341 self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
344 self.test.logger.debug("BFD: setting packet.state=%s", self.state)
345 bfd.state = self.state
347 # this is used by a negative test-case
348 self.test.logger.debug("BFD: setting packet.auth_type=%s",
350 bfd.auth_type = self.auth_type
352 def create_packet(self):
353 """ create a BFD packet, reflecting the current state of session """
356 bfd.auth_type = self.sha1_key.auth_type
357 bfd.auth_len = BFD.sha1_auth_len
358 bfd.auth_key_id = self.bfd_key_id
359 bfd.auth_seq_num = self.our_seq_number
360 bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
363 if self.af == AF_INET6:
364 packet = (Ether(src=self.interface.remote_mac,
365 dst=self.interface.local_mac) /
366 IPv6(src=self.interface.remote_ip6,
367 dst=self.interface.local_ip6,
369 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
372 packet = (Ether(src=self.interface.remote_mac,
373 dst=self.interface.local_mac) /
374 IP(src=self.interface.remote_ip4,
375 dst=self.interface.local_ip4,
377 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
379 self.test.logger.debug("BFD: Creating packet")
380 self.fill_packet_fields(packet)
382 hash_material = str(packet[BFD])[:32] + self.sha1_key.key + \
383 "\0" * (20 - len(self.sha1_key.key))
384 self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
385 hashlib.sha1(hash_material).hexdigest())
386 packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
389 def send_packet(self, packet=None, interface=None):
390 """ send packet on interface, creating the packet if needed """
392 packet = self.create_packet()
393 if interface is None:
394 interface = self.test.pg0
395 self.test.logger.debug(ppp("Sending packet:", packet))
396 interface.add_stream(packet)
399 def verify_sha1_auth(self, packet):
400 """ Verify correctness of authentication in BFD layer. """
402 self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
403 self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
405 self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
406 self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
407 if self.vpp_seq_number is None:
408 self.vpp_seq_number = bfd.auth_seq_num
409 self.test.logger.debug("Received initial sequence number: %s" %
412 recvd_seq_num = bfd.auth_seq_num
413 self.test.logger.debug("Received followup sequence number: %s" %
415 if self.vpp_seq_number < 0xffffffff:
416 if self.sha1_key.auth_type == \
417 BFDAuthType.meticulous_keyed_sha1:
418 self.test.assert_equal(recvd_seq_num,
419 self.vpp_seq_number + 1,
420 "BFD sequence number")
422 self.test.assert_in_range(recvd_seq_num,
424 self.vpp_seq_number + 1,
425 "BFD sequence number")
427 if self.sha1_key.auth_type == \
428 BFDAuthType.meticulous_keyed_sha1:
429 self.test.assert_equal(recvd_seq_num, 0,
430 "BFD sequence number")
432 self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
433 "BFD sequence number not one of "
434 "(%s, 0)" % self.vpp_seq_number)
435 self.vpp_seq_number = recvd_seq_num
436 # last 20 bytes represent the hash - so replace them with the key,
437 # pad the result with zeros and hash the result
438 hash_material = bfd.original[:-20] + self.sha1_key.key + \
439 "\0" * (20 - len(self.sha1_key.key))
440 expected_hash = hashlib.sha1(hash_material).hexdigest()
441 self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
442 expected_hash, "Auth key hash")
444 def verify_bfd(self, packet):
445 """ Verify correctness of BFD layer. """
447 self.test.assert_equal(bfd.version, 1, "BFD version")
448 self.test.assert_equal(bfd.your_discriminator,
449 self.my_discriminator,
450 "BFD - your discriminator")
452 self.verify_sha1_auth(packet)
455 def bfd_session_up(test):
456 """ Bring BFD session up """
457 test.logger.info("BFD: Waiting for slow hello")
458 p = wait_for_bfd_packet(test, 2)
460 if hasattr(test, 'vpp_clock_offset'):
461 old_offset = test.vpp_clock_offset
462 test.vpp_clock_offset = time.time() - p.time
463 test.logger.debug("BFD: Calculated vpp clock offset: %s",
464 test.vpp_clock_offset)
466 test.assertAlmostEqual(
467 old_offset, test.vpp_clock_offset, delta=0.5,
468 msg="vpp clock offset not stable (new: %s, old: %s)" %
469 (test.vpp_clock_offset, old_offset))
470 test.logger.info("BFD: Sending Init")
471 test.test_session.update(my_discriminator=randint(0, 40000000),
472 your_discriminator=p[BFD].my_discriminator,
474 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
475 BFDAuthType.meticulous_keyed_sha1:
476 test.test_session.inc_seq_num()
477 test.test_session.send_packet()
478 test.logger.info("BFD: Waiting for event")
479 e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
480 verify_event(test, e, expected_state=BFDState.up)
481 test.logger.info("BFD: Session is Up")
482 test.test_session.update(state=BFDState.up)
483 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
484 BFDAuthType.meticulous_keyed_sha1:
485 test.test_session.inc_seq_num()
486 test.test_session.send_packet()
487 test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
def bfd_session_down(test):
    """ Bring BFD session down

    Expects the VPP session to be Up, sends one packet carrying state
    Down from the test-side session, then waits for the session-details
    event and checks that VPP reports the session as Down.

    :param test: test case providing vpp_session, test_session, vapi
                 and logger
    """
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
    peer = test.test_session
    peer.update(state=BFDState.down)
    # with meticulous keyed SHA1 the sequence number is advanced before
    # the packet is sent
    auth_key = peer.sha1_key
    if auth_key:
        if auth_key.auth_type == BFDAuthType.meticulous_keyed_sha1:
            peer.inc_seq_num()
    peer.send_packet()
    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, event, expected_state=BFDState.down)
    test.logger.info("BFD: Session is Down")
    test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
505 def verify_bfd_session_config(test, session, state=None):
506 dump = session.get_bfd_udp_session_dump_entry()
507 test.assertIsNotNone(dump)
508 # since dump is not none, we have verified that sw_if_index and addresses
509 # are valid (in get_bfd_udp_session_dump_entry)
511 test.assert_equal(dump.state, state, "session state")
512 test.assert_equal(dump.required_min_rx, session.required_min_rx,
513 "required min rx interval")
514 test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
515 "desired min tx interval")
516 test.assert_equal(dump.detect_mult, session.detect_mult,
518 if session.sha1_key is None:
519 test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
521 test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
522 test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
524 test.assert_equal(dump.conf_key_id,
525 session.sha1_key.conf_key_id,
529 def verify_ip(test, packet):
530 """ Verify correctness of IP layer. """
531 if test.vpp_session.af == AF_INET6:
533 local_ip = test.pg0.local_ip6
534 remote_ip = test.pg0.remote_ip6
535 test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
538 local_ip = test.pg0.local_ip4
539 remote_ip = test.pg0.remote_ip4
540 test.assert_equal(ip.ttl, 255, "IPv4 TTL")
541 test.assert_equal(ip.src, local_ip, "IP source address")
542 test.assert_equal(ip.dst, remote_ip, "IP destination address")
545 def verify_udp(test, packet):
546 """ Verify correctness of UDP layer. """
548 test.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
549 test.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
553 def verify_event(test, event, expected_state):
554 """ Verify correctness of event values. """
556 test.logger.debug("BFD: Event: %s" % repr(e))
557 test.assert_equal(e.sw_if_index,
558 test.vpp_session.interface.sw_if_index,
559 "BFD interface index")
561 if test.vpp_session.af == AF_INET6:
563 test.assert_equal(e.is_ipv6, is_ipv6, "is_ipv6")
564 if test.vpp_session.af == AF_INET:
565 test.assert_equal(e.local_addr[:4], test.vpp_session.local_addr_n,
566 "Local IPv4 address")
567 test.assert_equal(e.peer_addr[:4], test.vpp_session.peer_addr_n,
570 test.assert_equal(e.local_addr, test.vpp_session.local_addr_n,
571 "Local IPv6 address")
572 test.assert_equal(e.peer_addr, test.vpp_session.peer_addr_n,
574 test.assert_equal(e.state, expected_state, BFDState)
577 def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None):
578 """ wait for BFD packet and verify its correctness
580 :param timeout: how long to wait
581 :param pcap_time_min: ignore packets with pcap timestamp lower than this
583 :returns: tuple (packet, time spent waiting for packet)
585 test.logger.info("BFD: Waiting for BFD packet")
586 deadline = time.time() + timeout
591 test.assert_in_range(counter, 0, 100, "number of packets ignored")
592 time_left = deadline - time.time()
594 raise CaptureTimeoutError("Packet did not arrive within timeout")
595 p = test.pg0.wait_for_packet(timeout=time_left)
596 test.logger.debug(ppp("BFD: Got packet:", p))
597 if pcap_time_min is not None and p.time < pcap_time_min:
598 test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
599 "pcap time min %s):" %
600 (p.time, pcap_time_min), p))
605 raise Exception(ppp("Unexpected or invalid BFD packet:", p))
607 raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
610 test.test_session.verify_bfd(p)
614 class BFD4TestCase(VppTestCase):
615 """Bidirectional Forwarding Detection (BFD)"""
618 vpp_clock_offset = None
624 super(BFD4TestCase, cls).setUpClass()
626 cls.create_pg_interfaces([0])
627 cls.create_loopback_interfaces([0])
628 cls.loopback0 = cls.lo_interfaces[0]
629 cls.loopback0.config_ip4()
630 cls.loopback0.admin_up()
632 cls.pg0.configure_ipv4_neighbors()
634 cls.pg0.resolve_arp()
637 super(BFD4TestCase, cls).tearDownClass()
641 super(BFD4TestCase, self).setUp()
642 self.factory = AuthKeyFactory()
643 self.vapi.want_bfd_events()
644 self.pg0.enable_capture()
646 self.vpp_session = VppBFDUDPSession(self, self.pg0,
648 self.vpp_session.add_vpp_config()
649 self.vpp_session.admin_up()
650 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
652 self.vapi.want_bfd_events(enable_disable=0)
656 if not self.vpp_dead:
657 self.vapi.want_bfd_events(enable_disable=0)
658 self.vapi.collect_events() # clear the event queue
659 super(BFD4TestCase, self).tearDown()
661 def test_session_up(self):
662 """ bring BFD session up """
665 def test_session_up_by_ip(self):
666 """ bring BFD session up - first frame looked up by address pair """
667 self.logger.info("BFD: Sending Slow control frame")
668 self.test_session.update(my_discriminator=randint(0, 40000000))
669 self.test_session.send_packet()
670 self.pg0.enable_capture()
671 p = self.pg0.wait_for_packet(1)
672 self.assert_equal(p[BFD].your_discriminator,
673 self.test_session.my_discriminator,
674 "BFD - your discriminator")
675 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
676 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
678 self.logger.info("BFD: Waiting for event")
679 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
680 verify_event(self, e, expected_state=BFDState.init)
681 self.logger.info("BFD: Sending Up")
682 self.test_session.send_packet()
683 self.logger.info("BFD: Waiting for event")
684 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
685 verify_event(self, e, expected_state=BFDState.up)
686 self.logger.info("BFD: Session is Up")
687 self.test_session.update(state=BFDState.up)
688 self.test_session.send_packet()
689 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
691 def test_session_down(self):
692 """ bring BFD session down """
694 bfd_session_down(self)
696 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
697 def test_hold_up(self):
698 """ hold BFD session up """
700 for dummy in range(self.test_session.detect_mult * 2):
701 wait_for_bfd_packet(self)
702 self.test_session.send_packet()
703 self.assert_equal(len(self.vapi.collect_events()), 0,
704 "number of bfd events")
706 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
707 def test_slow_timer(self):
708 """ verify slow periodic control frames while session down """
710 self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
711 prev_packet = wait_for_bfd_packet(self, 2)
712 for dummy in range(packet_count):
713 next_packet = wait_for_bfd_packet(self, 2)
714 time_diff = next_packet.time - prev_packet.time
715 # spec says the range should be <0.75, 1>, allow extra 0.05 margin
716 # to work around timing issues
717 self.assert_in_range(
718 time_diff, 0.70, 1.05, "time between slow packets")
719 prev_packet = next_packet
721 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
722 def test_zero_remote_min_rx(self):
723 """ no packets when zero remote required min rx interval """
725 self.test_session.update(required_min_rx=0)
726 self.test_session.send_packet()
727 for dummy in range(self.test_session.detect_mult):
728 self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
729 "sleep before transmitting bfd packet")
730 self.test_session.send_packet()
732 p = wait_for_bfd_packet(self, timeout=0)
733 self.logger.error(ppp("Received unexpected packet:", p))
734 except CaptureTimeoutError:
737 len(self.vapi.collect_events()), 0, "number of bfd events")
738 self.test_session.update(required_min_rx=100000)
739 for dummy in range(3):
740 self.test_session.send_packet()
742 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
744 len(self.vapi.collect_events()), 0, "number of bfd events")
746 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
747 def test_conn_down(self):
748 """ verify session goes down after inactivity """
750 detection_time = self.test_session.detect_mult *\
751 self.vpp_session.required_min_rx / USEC_IN_SEC
752 self.sleep(detection_time, "waiting for BFD session time-out")
753 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
754 verify_event(self, e, expected_state=BFDState.down)
756 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
757 def test_large_required_min_rx(self):
758 """ large remote required min rx interval """
760 p = wait_for_bfd_packet(self)
762 self.test_session.update(required_min_rx=interval)
763 self.test_session.send_packet()
764 time_mark = time.time()
766 # busy wait here, trying to collect a packet or event, vpp is not
767 # allowed to send packets and the session will timeout first - so the
768 # Up->Down event must arrive before any packets do
769 while time.time() < time_mark + interval / USEC_IN_SEC:
771 p = wait_for_bfd_packet(self, timeout=0)
772 # if vpp managed to send a packet before we did the session
773 # session update, then that's fine, ignore it
774 if p.time < time_mark - self.vpp_clock_offset:
776 self.logger.error(ppp("Received unexpected packet:", p))
778 except CaptureTimeoutError:
780 events = self.vapi.collect_events()
782 verify_event(self, events[0], BFDState.down)
784 self.assert_equal(count, 0, "number of packets received")
786 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
787 def test_immediate_remote_min_rx_reduction(self):
788 """ immediately honor remote required min rx reduction """
789 self.vpp_session.remove_vpp_config()
790 self.vpp_session = VppBFDUDPSession(
791 self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
792 self.pg0.enable_capture()
793 self.vpp_session.add_vpp_config()
794 self.test_session.update(desired_min_tx=1000000,
795 required_min_rx=1000000)
797 reference_packet = wait_for_bfd_packet(self)
798 time_mark = time.time()
800 self.test_session.update(required_min_rx=interval)
801 self.test_session.send_packet()
802 extra_time = time.time() - time_mark
803 p = wait_for_bfd_packet(self)
804 # first packet is allowed to be late by time we spent doing the update
805 # calculated in extra_time
806 self.assert_in_range(p.time - reference_packet.time,
807 .95 * 0.75 * interval / USEC_IN_SEC,
808 1.05 * interval / USEC_IN_SEC + extra_time,
809 "time between BFD packets")
811 for dummy in range(3):
812 p = wait_for_bfd_packet(self)
813 diff = p.time - reference_packet.time
814 self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
815 1.05 * interval / USEC_IN_SEC,
816 "time between BFD packets")
819 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
820 def test_modify_req_min_rx_double(self):
821 """ modify session - double required min rx """
823 p = wait_for_bfd_packet(self)
824 self.test_session.update(desired_min_tx=10000,
825 required_min_rx=10000)
826 self.test_session.send_packet()
827 # double required min rx
828 self.vpp_session.modify_parameters(
829 required_min_rx=2 * self.vpp_session.required_min_rx)
830 p = wait_for_bfd_packet(
831 self, pcap_time_min=time.time() - self.vpp_clock_offset)
832 # poll bit needs to be set
833 self.assertIn("P", p.sprintf("%BFD.flags%"),
834 "Poll bit not set in BFD packet")
835 # finish poll sequence with final packet
836 final = self.test_session.create_packet()
837 final[BFD].flags = "F"
838 timeout = self.test_session.detect_mult * \
839 max(self.test_session.desired_min_tx,
840 self.vpp_session.required_min_rx) / USEC_IN_SEC
841 self.test_session.send_packet(final)
842 time_mark = time.time()
843 e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
844 verify_event(self, e, expected_state=BFDState.down)
845 time_to_event = time.time() - time_mark
846 self.assert_in_range(time_to_event, .9 * timeout,
847 1.1 * timeout, "session timeout")
849 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
850 def test_modify_req_min_rx_halve(self):
851 """ modify session - halve required min rx """
852 self.vpp_session.modify_parameters(
853 required_min_rx=2 * self.vpp_session.required_min_rx)
855 p = wait_for_bfd_packet(self)
856 self.test_session.update(desired_min_tx=10000,
857 required_min_rx=10000)
858 self.test_session.send_packet()
859 p = wait_for_bfd_packet(
860 self, pcap_time_min=time.time() - self.vpp_clock_offset)
861 # halve required min rx
862 old_required_min_rx = self.vpp_session.required_min_rx
863 self.vpp_session.modify_parameters(
864 required_min_rx=0.5 * self.vpp_session.required_min_rx)
865 # now we wait 0.8*3*old-req-min-rx and the session should still be up
866 self.sleep(0.8 * self.vpp_session.detect_mult *
867 old_required_min_rx / USEC_IN_SEC)
868 self.assert_equal(len(self.vapi.collect_events()), 0,
869 "number of bfd events")
870 p = wait_for_bfd_packet(self)
871 # poll bit needs to be set
872 self.assertIn("P", p.sprintf("%BFD.flags%"),
873 "Poll bit not set in BFD packet")
874 # finish poll sequence with final packet
875 final = self.test_session.create_packet()
876 final[BFD].flags = "F"
877 self.test_session.send_packet(final)
878 # now the session should time out under new conditions
880 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
882 detection_time = self.test_session.detect_mult *\
883 self.vpp_session.required_min_rx / USEC_IN_SEC
884 self.assert_in_range(after - before,
885 0.9 * detection_time,
886 1.1 * detection_time,
887 "time before bfd session goes down")
888 verify_event(self, e, expected_state=BFDState.down)
890 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
891 def test_modify_detect_mult(self):
892 """ modify detect multiplier """
894 p = wait_for_bfd_packet(self)
895 self.vpp_session.modify_parameters(detect_mult=1)
896 p = wait_for_bfd_packet(
897 self, pcap_time_min=time.time() - self.vpp_clock_offset)
898 self.assert_equal(self.vpp_session.detect_mult,
901 # poll bit must not be set
902 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
903 "Poll bit not set in BFD packet")
904 self.vpp_session.modify_parameters(detect_mult=10)
905 p = wait_for_bfd_packet(
906 self, pcap_time_min=time.time() - self.vpp_clock_offset)
907 self.assert_equal(self.vpp_session.detect_mult,
910 # poll bit must not be set
911 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
912 "Poll bit not set in BFD packet")
914 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
915 def test_queued_poll(self):
916 """ test poll sequence queueing """
918 p = wait_for_bfd_packet(self)
919 self.vpp_session.modify_parameters(
920 required_min_rx=2 * self.vpp_session.required_min_rx)
921 p = wait_for_bfd_packet(self)
922 poll_sequence_start = time.time()
923 poll_sequence_length_min = 0.5
924 send_final_after = time.time() + poll_sequence_length_min
925 # poll bit needs to be set
926 self.assertIn("P", p.sprintf("%BFD.flags%"),
927 "Poll bit not set in BFD packet")
928 self.assert_equal(p[BFD].required_min_rx_interval,
929 self.vpp_session.required_min_rx,
930 "BFD required min rx interval")
931 self.vpp_session.modify_parameters(
932 required_min_rx=2 * self.vpp_session.required_min_rx)
933 # 2nd poll sequence should be queued now
934 # don't send the reply back yet, wait for some time to emulate
935 # longer round-trip time
937 while time.time() < send_final_after:
938 self.test_session.send_packet()
939 p = wait_for_bfd_packet(self)
940 self.assert_equal(len(self.vapi.collect_events()), 0,
941 "number of bfd events")
942 self.assert_equal(p[BFD].required_min_rx_interval,
943 self.vpp_session.required_min_rx,
944 "BFD required min rx interval")
946 # poll bit must be set
947 self.assertIn("P", p.sprintf("%BFD.flags%"),
948 "Poll bit not set in BFD packet")
949 final = self.test_session.create_packet()
950 final[BFD].flags = "F"
951 self.test_session.send_packet(final)
952 # finish 1st with final
953 poll_sequence_length = time.time() - poll_sequence_start
954 # vpp must wait for some time before starting new poll sequence
955 poll_no_2_started = False
956 for dummy in range(2 * packet_count):
957 p = wait_for_bfd_packet(self)
958 self.assert_equal(len(self.vapi.collect_events()), 0,
959 "number of bfd events")
960 if "P" in p.sprintf("%BFD.flags%"):
961 poll_no_2_started = True
962 if time.time() < poll_sequence_start + poll_sequence_length:
963 raise Exception("VPP started 2nd poll sequence too soon")
964 final = self.test_session.create_packet()
965 final[BFD].flags = "F"
966 self.test_session.send_packet(final)
969 self.test_session.send_packet()
970 self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
971 # finish 2nd with final
972 final = self.test_session.create_packet()
973 final[BFD].flags = "F"
974 self.test_session.send_packet(final)
975 p = wait_for_bfd_packet(self)
976 # poll bit must not be set
977 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
978 "Poll bit set in BFD packet")
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_poll_response(self):
    """ test correct response to control frame with poll bit set """
    # establish the session first, like the other session-level tests do
    bfd_session_up(self)
    # control frame with the Poll (P) flag set
    poll = self.test_session.create_packet()
    poll[BFD].flags = "P"
    self.test_session.send_packet(poll)
    # only a frame captured from now on may count as the answer
    # (pcap_time_min is corrected by vpp_clock_offset)
    earliest = time.time() - self.vpp_clock_offset
    final = wait_for_bfd_packet(self, pcap_time_min=earliest)
    # the answer has to carry the Final (F) flag
    self.assertIn("F", final.sprintf("%BFD.flags%"))
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_no_periodic_if_remote_demand(self):
    """ no periodic frames outside poll sequence if remote demand set """
    bfd_session_up(self)
    # announce demand mode (D flag) to vpp
    demand = self.test_session.create_packet()
    demand[BFD].flags = "D"
    self.test_session.send_packet(demand)
    # send our own frames at 90% of the larger interval; the intervals are
    # divided by USEC_IN_SEC to get the seconds time.sleep() expects
    transmit_time = 0.9 \
        * max(self.vpp_session.required_min_rx,
              self.test_session.desired_min_tx) \
        / USEC_IN_SEC
    count = 0
    for dummy in range(self.test_session.detect_mult * 2):
        time.sleep(transmit_time)
        self.test_session.send_packet(demand)
        try:
            # timeout=0 - don't wait, just check whether anything arrived
            p = wait_for_bfd_packet(self, timeout=0)
            self.logger.error(ppp("Received unexpected packet:", p))
            count += 1
        except CaptureTimeoutError:
            # nothing captured - the expected outcome
            pass
    events = self.vapi.collect_events()
    for e in events:
        self.logger.error("Received unexpected event: %s", e)
    self.assert_equal(count, 0, "number of packets received")
    self.assert_equal(len(events), 0, "number of events received")
def test_echo_looped_back(self):
    """ echo packets looped back """
    # don't need a session in this case..
    self.vpp_session.remove_vpp_config()
    self.pg0.enable_capture()
    echo_packet_count = 10
    # random source port low enough to increment a few times..
    udp_sport_tx = randint(1, 50000)
    udp_sport_rx = udp_sport_tx
    # source and destination IP are both the remote address, so the packet
    # is expected to come straight back out of pg0
    echo_packet = (Ether(src=self.pg0.remote_mac,
                         dst=self.pg0.local_mac) /
                   IP(src=self.pg0.remote_ip4,
                      dst=self.pg0.remote_ip4) /
                   UDP(dport=BFD.udp_dport_echo) /
                   Raw("this should be looped back"))
    for dummy in range(echo_packet_count):
        self.sleep(.01, "delay between echo packets")
        # the source port doubles as a per-packet identifier
        echo_packet[UDP].sport = udp_sport_tx
        udp_sport_tx += 1
        self.logger.debug(ppp("Sending packet:", echo_packet))
        self.pg0.add_stream(echo_packet)
        self.pg_start()
    for dummy in range(echo_packet_count):
        p = self.pg0.wait_for_packet(1)
        self.logger.debug(ppp("Got packet:", p))
        ether = p[Ether]
        self.assert_equal(self.pg0.remote_mac,
                          ether.dst, "Destination MAC")
        self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
        ip = p[IP]
        self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
        self.assert_equal(self.pg0.remote_ip4, ip.src, "Source IP")
        udp = p[UDP]
        self.assert_equal(udp.dport, BFD.udp_dport_echo,
                          "UDP destination port")
        # packets have to come back in the order they were sent
        self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
        udp_sport_rx += 1
        # need to compare the hex payload here, otherwise BFD_vpp_echo
        # gets in the way
        self.assertEqual(str(p[UDP].payload),
                         str(echo_packet[UDP].payload),
                         "Received packet is not the echo packet sent")
    # both counters advanced once per packet, so they must agree again
    self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
                      "ECHO packet identifier for test purposes)")
def test_echo(self):
    """ echo function """
    bfd_session_up(self)
    # advertise that we are willing to receive echo packets
    self.test_session.update(required_min_echo_rx=50000)
    self.test_session.send_packet()
    detection_time = self.test_session.detect_mult *\
        self.vpp_session.required_min_rx / USEC_IN_SEC
    # echo shouldn't work without echo source set
    for dummy in range(3):
        sleep = 0.75 * detection_time
        self.sleep(sleep, "delay before sending bfd packet")
        self.test_session.send_packet()
    p = wait_for_bfd_packet(
        self, pcap_time_min=time.time() - self.vpp_clock_offset)
    # vpp still advertises its configured rx interval
    self.assert_equal(p[BFD].required_min_rx_interval,
                      self.vpp_session.required_min_rx,
                      "BFD required min rx interval")
    self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
    # should be turned on - loopback echo packets
    for dummy in range(3):
        loop_until = time.time() + 0.75 * detection_time
        while time.time() < loop_until:
            p = self.pg0.wait_for_packet(1)
            self.logger.debug(ppp("Got packet:", p))
            if p[UDP].dport == BFD.udp_dport_echo:
                self.assert_equal(
                    p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
                self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
                                    "BFD ECHO src IP equal to loopback IP")
                self.logger.debug(ppp("Looping back packet:", p))
                self.pg0.add_stream(p)
                self.pg_start()
            elif p.haslayer(BFD):
                # with echo active the control frames may slow down to one
                # per second (interval is in microseconds)
                self.assertGreaterEqual(p[BFD].required_min_rx_interval,
                                        1000000)
                if "P" in p.sprintf("%BFD.flags%"):
                    # answer the poll so that the new timing takes effect
                    final = self.test_session.create_packet()
                    final[BFD].flags = "F"
                    self.test_session.send_packet(final)
            else:
                raise Exception(ppp("Received unknown packet:", p))

            self.assert_equal(len(self.vapi.collect_events()), 0,
                              "number of bfd events")
        self.test_session.send_packet()
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_echo_fail(self):
    """ session goes down if echo function fails """
    bfd_session_up(self)
    self.test_session.update(required_min_echo_rx=50000)
    self.test_session.send_packet()
    detection_time = self.test_session.detect_mult *\
        self.vpp_session.required_min_rx / USEC_IN_SEC
    self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
    # echo function should be used now, but we will drop the echo packets
    verified_diag = False
    for dummy in range(3):
        loop_until = time.time() + 0.75 * detection_time
        while time.time() < loop_until:
            p = self.pg0.wait_for_packet(1)
            self.logger.debug(ppp("Got packet:", p))
            if p[UDP].dport == BFD.udp_dport_echo:
                # dropped - never looped back to vpp
                pass
            elif p.haslayer(BFD):
                if "P" in p.sprintf("%BFD.flags%"):
                    # poll announcing the slowed-down control frame rate
                    # (interval is in microseconds)
                    self.assertGreaterEqual(
                        p[BFD].required_min_rx_interval,
                        1000000)
                    final = self.test_session.create_packet()
                    final[BFD].flags = "F"
                    self.test_session.send_packet(final)
                if p[BFD].state == BFDState.down:
                    # the reason reported has to be the failed echo
                    self.assert_equal(p[BFD].diag,
                                      BFDDiagCode.echo_function_failed,
                                      BFDDiagCode)
                    verified_diag = True
            else:
                raise Exception(ppp("Received unknown packet:", p))
        self.test_session.send_packet()
    events = self.vapi.collect_events()
    self.assert_equal(len(events), 1, "number of bfd events")
    self.assert_equal(events[0].state, BFDState.down, BFDState)
    self.assertTrue(verified_diag, "Incorrect diagnostics code received")
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_echo_stop(self):
    """ echo function stops if peer sets required min echo rx zero """
    bfd_session_up(self)
    self.test_session.update(required_min_echo_rx=50000)
    self.test_session.send_packet()
    self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
    # wait for first echo packet
    while True:
        p = self.pg0.wait_for_packet(1)
        self.logger.debug(ppp("Got packet:", p))
        if p[UDP].dport == BFD.udp_dport_echo:
            self.logger.debug(ppp("Looping back packet:", p))
            self.pg0.add_stream(p)
            self.pg_start()
            break
        elif p.haslayer(BFD):
            # ignore BFD control frames while waiting for the echo
            pass
        else:
            raise Exception(ppp("Received unknown packet:", p))
    # a zero echo rx interval tells vpp to stop sending echo packets
    self.test_session.update(required_min_echo_rx=0)
    self.test_session.send_packet()
    # echo packets shouldn't arrive anymore
    for dummy in range(5):
        wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.test_session.send_packet()
        events = self.vapi.collect_events()
        self.assert_equal(len(events), 0, "number of bfd events")
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_echo_source_removed(self):
    """ echo function stops if echo source is removed """
    bfd_session_up(self)
    self.test_session.update(required_min_echo_rx=50000)
    self.test_session.send_packet()
    self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
    # wait for first echo packet
    while True:
        p = self.pg0.wait_for_packet(1)
        self.logger.debug(ppp("Got packet:", p))
        if p[UDP].dport == BFD.udp_dport_echo:
            self.logger.debug(ppp("Looping back packet:", p))
            self.pg0.add_stream(p)
            self.pg_start()
            break
        elif p.haslayer(BFD):
            # ignore BFD control frames while waiting for the echo
            pass
        else:
            raise Exception(ppp("Received unknown packet:", p))
    # take the echo source away again
    self.vapi.bfd_udp_del_echo_source()
    self.test_session.send_packet()
    # echo packets shouldn't arrive anymore
    for dummy in range(5):
        wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.test_session.send_packet()
        events = self.vapi.collect_events()
        self.assert_equal(len(events), 0, "number of bfd events")
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_stale_echo(self):
    """ stale echo packets don't keep a session up """
    bfd_session_up(self)
    self.test_session.update(required_min_echo_rx=50000)
    self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
    self.test_session.send_packet()
    # should be turned on - loopback echo packets
    echo_packet = None  # first echo packet seen, replayed over and over
    timeout_at = None   # earliest moment the session may go down
    timeout_ok = False
    for dummy in range(10 * self.vpp_session.detect_mult):
        p = self.pg0.wait_for_packet(1)
        if p[UDP].dport == BFD.udp_dport_echo:
            if echo_packet is None:
                self.logger.debug(ppp("Got first echo packet:", p))
                echo_packet = p
                timeout_at = time.time() + self.vpp_session.detect_mult * \
                    self.test_session.required_min_echo_rx / USEC_IN_SEC
            else:
                self.logger.debug(ppp("Got followup echo packet:", p))
            # always answer with the (by now stale) first echo packet
            self.logger.debug(
                ppp("Looping back first echo packet:", echo_packet))
            self.pg0.add_stream(echo_packet)
            self.pg_start()
        elif p.haslayer(BFD):
            self.logger.debug(ppp("Got packet:", p))
            if "P" in p.sprintf("%BFD.flags%"):
                final = self.test_session.create_packet()
                final[BFD].flags = "F"
                self.test_session.send_packet(final)
            if p[BFD].state == BFDState.down:
                self.assertIsNotNone(
                    timeout_at,
                    "Session went down before first echo packet received")
                now = time.time()
                self.assertGreaterEqual(
                    now, timeout_at,
                    "Session timeout at %s, but is expected at %s" %
                    (now, timeout_at))
                self.assert_equal(p[BFD].diag,
                                  BFDDiagCode.echo_function_failed,
                                  BFDDiagCode)
                events = self.vapi.collect_events()
                self.assert_equal(len(events), 1, "number of bfd events")
                self.assert_equal(events[0].state, BFDState.down, BFDState)
                timeout_ok = True
                break
        else:
            raise Exception(ppp("Received unknown packet:", p))
        self.test_session.send_packet()
    self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_invalid_echo_checksum(self):
    """ echo packets with invalid checksum don't keep a session up """
    bfd_session_up(self)
    self.test_session.update(required_min_echo_rx=50000)
    self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
    self.test_session.send_packet()
    # should be turned on - loopback echo packets
    timeout_at = None   # earliest moment the session may go down
    timeout_ok = False
    for dummy in range(10 * self.vpp_session.detect_mult):
        p = self.pg0.wait_for_packet(1)
        if p[UDP].dport == BFD.udp_dport_echo:
            self.logger.debug(ppp("Got echo packet:", p))
            if timeout_at is None:
                timeout_at = time.time() + self.vpp_session.detect_mult * \
                    self.test_session.required_min_echo_rx / USEC_IN_SEC
            # overwrite the checksum with random bits before returning it
            p[BFD_vpp_echo].checksum = getrandbits(64)
            self.logger.debug(ppp("Looping back modified echo packet:", p))
            self.pg0.add_stream(p)
            self.pg_start()
        elif p.haslayer(BFD):
            self.logger.debug(ppp("Got packet:", p))
            if "P" in p.sprintf("%BFD.flags%"):
                final = self.test_session.create_packet()
                final[BFD].flags = "F"
                self.test_session.send_packet(final)
            if p[BFD].state == BFDState.down:
                self.assertIsNotNone(
                    timeout_at,
                    "Session went down before first echo packet received")
                now = time.time()
                self.assertGreaterEqual(
                    now, timeout_at,
                    "Session timeout at %s, but is expected at %s" %
                    (now, timeout_at))
                self.assert_equal(p[BFD].diag,
                                  BFDDiagCode.echo_function_failed,
                                  BFDDiagCode)
                events = self.vapi.collect_events()
                self.assert_equal(len(events), 1, "number of bfd events")
                self.assert_equal(events[0].state, BFDState.down, BFDState)
                timeout_ok = True
                break
        else:
            raise Exception(ppp("Received unknown packet:", p))
        self.test_session.send_packet()
    self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_admin_up_down(self):
    """ put session admin-up and admin-down """

    def expect_event(state):
        # the next session-details event has to report the given state
        event = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        verify_event(self, event, expected_state=state)

    def expect_fresh_packet(state):
        # only frames captured from now on are taken into account
        pkt = wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.assert_equal(pkt[BFD].state, state, BFDState)

    def expect_admin_down_packets(how_many):
        # vpp has to keep reporting admin-down in consecutive frames
        for _ in range(how_many):
            pkt = wait_for_bfd_packet(self)
            self.assert_equal(pkt[BFD].state, BFDState.admin_down, BFDState)

    bfd_session_up(self)

    # admin-down: announced by an event and in the frames sent by vpp
    self.vpp_session.admin_down()
    self.pg0.enable_capture()
    expect_event(BFDState.admin_down)
    expect_admin_down_packets(2)

    # try to bring session up - shouldn't be possible
    self.test_session.update(state=BFDState.init)
    self.test_session.send_packet()
    expect_admin_down_packets(2)

    # admin-up again: down -> init -> up
    self.vpp_session.admin_up()
    self.test_session.update(state=BFDState.down)
    expect_event(BFDState.down)
    expect_fresh_packet(BFDState.down)

    self.test_session.send_packet()
    expect_fresh_packet(BFDState.init)
    expect_event(BFDState.init)

    self.test_session.update(state=BFDState.up)
    self.test_session.send_packet()
    expect_fresh_packet(BFDState.up)
    expect_event(BFDState.up)
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_config_change_remote_demand(self):
    """ configuration change while peer in demand mode """
    bfd_session_up(self)
    # announce demand mode (D flag) to vpp
    demand = self.test_session.create_packet()
    demand[BFD].flags = "D"
    self.test_session.send_packet(demand)
    # changing a timer has to be announced by vpp with a poll sequence
    self.vpp_session.modify_parameters(
        required_min_rx=2 * self.vpp_session.required_min_rx)
    p = wait_for_bfd_packet(
        self, pcap_time_min=time.time() - self.vpp_clock_offset)
    # poll bit must be set
    self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
    # terminate poll sequence
    final = self.test_session.create_packet()
    final[BFD].flags = "D+F"
    self.test_session.send_packet(final)
    # vpp should be quiet now again
    transmit_time = 0.9 \
        * max(self.vpp_session.required_min_rx,
              self.test_session.desired_min_tx) \
        / USEC_IN_SEC
    count = 0
    for dummy in range(self.test_session.detect_mult * 2):
        time.sleep(transmit_time)
        self.test_session.send_packet(demand)
        try:
            # timeout=0 - don't wait, just check whether anything arrived
            p = wait_for_bfd_packet(self, timeout=0)
            self.logger.error(ppp("Received unexpected packet:", p))
            count += 1
        except CaptureTimeoutError:
            # nothing captured - the expected outcome
            pass
    events = self.vapi.collect_events()
    for e in events:
        self.logger.error("Received unexpected event: %s", e)
    self.assert_equal(count, 0, "number of packets received")
    self.assert_equal(len(events), 0, "number of events received")
class BFD6TestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (IPv6) """

    # placeholders, filled in at runtime
    pg0 = None
    vpp_clock_offset = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        super(BFD6TestCase, cls).setUpClass()
        try:
            # one packet-generator interface with IPv6 plus a loopback
            # that the echo tests use as echo source
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip6()
            cls.pg0.configure_ipv6_neighbors()
            cls.pg0.admin_up()
            cls.pg0.resolve_ndp()
            cls.create_loopback_interfaces([0])
            cls.loopback0 = cls.lo_interfaces[0]
            cls.loopback0.config_ip6()
            cls.loopback0.admin_up()

        except Exception:
            # undo whatever the parent set up before giving up
            super(BFD6TestCase, cls).tearDownClass()
            raise

    def setUp(self):
        super(BFD6TestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()
        try:
            self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                                self.pg0.remote_ip6,
                                                af=AF_INET6)
            self.vpp_session.add_vpp_config()
            self.vpp_session.admin_up()
            self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
            self.logger.debug(self.vapi.cli("show adj nbr"))
        except Exception:
            # don't leave event reporting switched on after a failed setup
            self.vapi.want_bfd_events(enable_disable=0)
            raise

    def tearDown(self):
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
            self.vapi.collect_events()  # clear the event queue
        super(BFD6TestCase, self).tearDown()

    def test_session_up(self):
        """ bring BFD session up """
        bfd_session_up(self)

    def test_session_up_by_ip(self):
        """ bring BFD session up - first frame looked up by address pair """
        self.logger.info("BFD: Sending Slow control frame")
        # your_discriminator is not updated yet at this point, so vpp has
        # to match this frame to the session by its addresses
        self.test_session.update(my_discriminator=randint(0, 40000000))
        self.test_session.send_packet()
        self.pg0.enable_capture()
        p = self.pg0.wait_for_packet(1)
        self.assert_equal(p[BFD].your_discriminator,
                          self.test_session.my_discriminator,
                          "BFD - your discriminator")
        self.assert_equal(p[BFD].state, BFDState.init, BFDState)
        self.test_session.update(your_discriminator=p[BFD].my_discriminator,
                                 state=BFDState.up)
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        verify_event(self, e, expected_state=BFDState.init)
        self.logger.info("BFD: Sending Up")
        self.test_session.send_packet()
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        verify_event(self, e, expected_state=BFDState.up)
        self.logger.info("BFD: Session is Up")
        self.test_session.update(state=BFDState.up)
        self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_hold_up(self):
        """ hold BFD session up """
        bfd_session_up(self)
        # answer every frame from vpp for detect_mult * 2 rounds
        for dummy in range(self.test_session.detect_mult * 2):
            wait_for_bfd_packet(self)
            self.test_session.send_packet()
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_echo_looped_back(self):
        """ echo packets looped back """
        # don't need a session in this case..
        self.vpp_session.remove_vpp_config()
        self.pg0.enable_capture()
        echo_packet_count = 10
        # random source port low enough to increment a few times..
        udp_sport_tx = randint(1, 50000)
        udp_sport_rx = udp_sport_tx
        # source and destination IP are both the remote address, so the
        # packet is expected to come straight back out of pg0
        echo_packet = (Ether(src=self.pg0.remote_mac,
                             dst=self.pg0.local_mac) /
                       IPv6(src=self.pg0.remote_ip6,
                            dst=self.pg0.remote_ip6) /
                       UDP(dport=BFD.udp_dport_echo) /
                       Raw("this should be looped back"))
        for dummy in range(echo_packet_count):
            self.sleep(.01, "delay between echo packets")
            # the source port doubles as a per-packet identifier
            echo_packet[UDP].sport = udp_sport_tx
            udp_sport_tx += 1
            self.logger.debug(ppp("Sending packet:", echo_packet))
            self.pg0.add_stream(echo_packet)
            self.pg_start()
        for dummy in range(echo_packet_count):
            p = self.pg0.wait_for_packet(1)
            self.logger.debug(ppp("Got packet:", p))
            ether = p[Ether]
            self.assert_equal(self.pg0.remote_mac,
                              ether.dst, "Destination MAC")
            self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
            ip = p[IPv6]
            self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
            self.assert_equal(self.pg0.remote_ip6, ip.src, "Source IP")
            udp = p[UDP]
            self.assert_equal(udp.dport, BFD.udp_dport_echo,
                              "UDP destination port")
            # packets have to come back in the order they were sent
            self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
            udp_sport_rx += 1
            # need to compare the hex payload here, otherwise BFD_vpp_echo
            # gets in the way
            self.assertEqual(str(p[UDP].payload),
                             str(echo_packet[UDP].payload),
                             "Received packet is not the echo packet sent")
        # both counters advanced once per packet, so they must agree again
        self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
                          "ECHO packet identifier for test purposes)")

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_echo(self):
        """ echo function used """
        bfd_session_up(self)
        # advertise that we are willing to receive echo packets
        self.test_session.update(required_min_echo_rx=50000)
        self.test_session.send_packet()
        detection_time = self.test_session.detect_mult *\
            self.vpp_session.required_min_rx / USEC_IN_SEC
        # echo shouldn't work without echo source set
        for dummy in range(3):
            sleep = 0.75 * detection_time
            self.sleep(sleep, "delay before sending bfd packet")
            self.test_session.send_packet()
        p = wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        # vpp still advertises its configured rx interval
        self.assert_equal(p[BFD].required_min_rx_interval,
                          self.vpp_session.required_min_rx,
                          "BFD required min rx interval")
        self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
        # should be turned on - loopback echo packets
        for dummy in range(3):
            loop_until = time.time() + 0.75 * detection_time
            while time.time() < loop_until:
                p = self.pg0.wait_for_packet(1)
                self.logger.debug(ppp("Got packet:", p))
                if p[UDP].dport == BFD.udp_dport_echo:
                    self.assert_equal(
                        p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
                    self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
                                        "BFD ECHO src IP equal to loopback IP")
                    self.logger.debug(ppp("Looping back packet:", p))
                    self.pg0.add_stream(p)
                    self.pg_start()
                elif p.haslayer(BFD):
                    # with echo active the control frames may slow down to
                    # one per second (interval is in microseconds)
                    self.assertGreaterEqual(p[BFD].required_min_rx_interval,
                                            1000000)
                    if "P" in p.sprintf("%BFD.flags%"):
                        final = self.test_session.create_packet()
                        final[BFD].flags = "F"
                        self.test_session.send_packet(final)
                else:
                    raise Exception(ppp("Received unknown packet:", p))

                self.assert_equal(len(self.vapi.collect_events()), 0,
                                  "number of bfd events")
            self.test_session.send_packet()
class BFDSHA1TestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """

    # placeholders, filled in at runtime
    pg0 = None
    vpp_clock_offset = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        super(BFDSHA1TestCase, cls).setUpClass()
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip4()
            cls.pg0.admin_up()
            cls.pg0.resolve_arp()

        except Exception:
            # undo whatever the parent set up before giving up
            super(BFDSHA1TestCase, cls).tearDownClass()
            raise

    def setUp(self):
        super(BFDSHA1TestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

    def tearDown(self):
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
            self.vapi.collect_events()  # clear the event queue
        super(BFDSHA1TestCase, self).tearDown()

    def test_session_up(self):
        """ bring BFD session up """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4,
                                            sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        # the test side uses the same key and key id as vpp
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_hold_up(self):
        """ hold BFD session up """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4,
                                            sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        # answer every frame from vpp for detect_mult * 2 rounds
        for dummy in range(self.test_session.detect_mult * 2):
            wait_for_bfd_packet(self)
            self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_hold_up_meticulous(self):
        """ hold BFD session up - meticulous auth """
        key = self.factory.create_random_key(
            self, BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        # specify sequence number so that it wraps
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id,
            our_seq_number=0xFFFFFFFF - 4)
        bfd_session_up(self)
        # 30 rounds, so the 32-bit sequence number passes 0xFFFFFFFF
        for dummy in range(30):
            wait_for_bfd_packet(self)
            self.test_session.inc_seq_num()
            self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_send_bad_seq_number(self):
        """ session is not kept alive by msgs with bad sequence numbers"""
        key = self.factory.create_random_key(
            self, BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        detection_time = self.test_session.detect_mult *\
            self.vpp_session.required_min_rx / USEC_IN_SEC
        send_until = time.time() + 2 * detection_time
        # keep sending in time, but never call inc_seq_num()
        while time.time() < send_until:
            self.test_session.send_packet()
            self.sleep(0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
                       "time between bfd packets")
        e = self.vapi.collect_events()
        # session should be down now, because the sequence numbers weren't
        # incremented
        self.assert_equal(len(e), 1, "number of bfd events")
        verify_event(self, e[0], expected_state=BFDState.down)

    def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
                                       legitimate_test_session,
                                       rogue_test_session,
                                       rogue_bfd_values=None):
        """ execute a rogue session interaction scenario

        1. create vpp session, add config
        2. bring the legitimate session up
        3. copy the bfd values from legitimate session to rogue session
        4. apply rogue_bfd_values to rogue session
        5. set rogue session state to down
        6. send message to take the session down from the rogue session
        7. assert that the legitimate session is unaffected
        """

        self.vpp_session = vpp_bfd_udp_session
        self.vpp_session.add_vpp_config()
        self.test_session = legitimate_test_session
        # bring vpp session up
        bfd_session_up(self)
        # send packet from rogue session
        rogue_test_session.update(
            my_discriminator=self.test_session.my_discriminator,
            your_discriminator=self.test_session.your_discriminator,
            desired_min_tx=self.test_session.desired_min_tx,
            required_min_rx=self.test_session.required_min_rx,
            detect_mult=self.test_session.detect_mult,
            diag=self.test_session.diag,
            state=self.test_session.state,
            auth_type=self.test_session.auth_type)
        if rogue_bfd_values:
            # scenario-specific overrides applied on top of the copy
            rogue_test_session.update(**rogue_bfd_values)
        rogue_test_session.update(state=BFDState.down)
        rogue_test_session.send_packet()
        wait_for_bfd_packet(self)
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_mismatch_auth(self):
        """ session is not brought down by unauthenticated msg """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
        legitimate_test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=vpp_session.bfd_key_id)
        # the rogue session is created without any key
        rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
        self.execute_rogue_session_scenario(vpp_session,
                                            legitimate_test_session,
                                            rogue_test_session)

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_mismatch_bfd_key_id(self):
        """ session is not brought down by msg with non-existent key-id """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
        # pick a different random bfd key id
        x = randint(0, 255)
        while x == vpp_session.bfd_key_id:
            x = randint(0, 255)
        legitimate_test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=vpp_session.bfd_key_id)
        rogue_test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
        self.execute_rogue_session_scenario(vpp_session,
                                            legitimate_test_session,
                                            rogue_test_session)

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_mismatched_auth_type(self):
        """ session is not brought down by msg with wrong auth type """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
        legitimate_test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=vpp_session.bfd_key_id)
        rogue_test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=vpp_session.bfd_key_id)
        # same key and key id, only the auth type is overridden
        self.execute_rogue_session_scenario(
            vpp_session, legitimate_test_session, rogue_test_session,
            {'auth_type': BFDAuthType.keyed_md5})

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_restart(self):
        """ simulate remote peer restart and resynchronization """
        key = self.factory.create_random_key(
            self, BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
        bfd_session_up(self)
        # don't send any packets for 2*detection_time
        detection_time = self.test_session.detect_mult *\
            self.vpp_session.required_min_rx / USEC_IN_SEC
        self.sleep(2 * detection_time, "simulating peer restart")
        events = self.vapi.collect_events()
        self.assert_equal(len(events), 1, "number of bfd events")
        verify_event(self, events[0], expected_state=BFDState.down)
        self.test_session.update(state=BFDState.down)
        # reset sequence number
        self.test_session.our_seq_number = 0
        self.test_session.vpp_seq_number = None
        # now throw away any pending packets
        self.pg0.enable_capture()
        bfd_session_up(self)
1806 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1807 class BFDAuthOnOffTestCase(VppTestCase):
1808 """Bidirectional Forwarding Detection (BFD) (changing auth) """
@classmethod
def setUpClass(cls):
    # class-level fixture: one IPv4 packet-generator interface
    super(BFDAuthOnOffTestCase, cls).setUpClass()
    try:
        cls.create_pg_interfaces([0])
        cls.pg0.config_ip4()
        cls.pg0.admin_up()
        cls.pg0.resolve_arp()

    except Exception:
        # undo whatever the parent set up before giving up
        super(BFDAuthOnOffTestCase, cls).tearDownClass()
        raise
def setUp(self):
    # per-test fixture: fresh key factory, bfd events on, capture on pg0
    super(BFDAuthOnOffTestCase, self).setUp()
    self.factory = AuthKeyFactory()
    self.vapi.want_bfd_events()
    self.pg0.enable_capture()
def tearDown(self):
    # the api is only touched while vpp is still alive
    if not self.vpp_dead:
        self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
    super(BFDAuthOnOffTestCase, self).tearDown()
def test_auth_on_immediate(self):
    """ turn auth on without disturbing session state (immediate) """

    def keep_alive_and_check_up():
        # answer each frame from vpp for detect_mult * 2 rounds; every
        # frame received has to report the up state
        for _ in range(self.test_session.detect_mult * 2):
            pkt = wait_for_bfd_packet(self)
            self.assert_equal(pkt[BFD].state, BFDState.up, BFDState)
            self.test_session.send_packet()

    auth_key = self.factory.create_random_key(self)
    auth_key.add_vpp_config()

    # both ends start without authentication
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(self, self.pg0, AF_INET)
    bfd_session_up(self)
    keep_alive_and_check_up()

    # switch authentication on at vpp and mirror it on the test side
    self.vpp_session.activate_auth(auth_key)
    self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
    self.test_session.sha1_key = auth_key
    keep_alive_and_check_up()

    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    pending_events = self.vapi.collect_events()
    self.assert_equal(len(pending_events), 0, "number of bfd events")
1863 def test_auth_off_immediate(self):
1864 """ turn auth off without disturbing session state (immediate) """
1865 key = self.factory.create_random_key(self)
1866 key.add_vpp_config()
1867 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1868 self.pg0.remote_ip4, sha1_key=key)
1869 self.vpp_session.add_vpp_config()
1870 self.test_session = BFDTestSession(
1871 self, self.pg0, AF_INET, sha1_key=key,
1872 bfd_key_id=self.vpp_session.bfd_key_id)
1873 bfd_session_up(self)
1874 # self.vapi.want_bfd_events(enable_disable=0)
1875 for dummy in range(self.test_session.detect_mult * 2):
1876 p = wait_for_bfd_packet(self)
1877 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1878 self.test_session.inc_seq_num()
1879 self.test_session.send_packet()
1880 self.vpp_session.deactivate_auth()
1881 self.test_session.bfd_key_id = None
1882 self.test_session.sha1_key = None
1883 for dummy in range(self.test_session.detect_mult * 2):
1884 p = wait_for_bfd_packet(self)
1885 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1886 self.test_session.inc_seq_num()
1887 self.test_session.send_packet()
1888 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1889 self.assert_equal(len(self.vapi.collect_events()), 0,
1890 "number of bfd events")
1892 def test_auth_change_key_immediate(self):
1893 """ change auth key without disturbing session state (immediate) """
1894 key1 = self.factory.create_random_key(self)
1895 key1.add_vpp_config()
1896 key2 = self.factory.create_random_key(self)
1897 key2.add_vpp_config()
1898 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1899 self.pg0.remote_ip4, sha1_key=key1)
1900 self.vpp_session.add_vpp_config()
1901 self.test_session = BFDTestSession(
1902 self, self.pg0, AF_INET, sha1_key=key1,
1903 bfd_key_id=self.vpp_session.bfd_key_id)
1904 bfd_session_up(self)
1905 for dummy in range(self.test_session.detect_mult * 2):
1906 p = wait_for_bfd_packet(self)
1907 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1908 self.test_session.send_packet()
1909 self.vpp_session.activate_auth(key2)
1910 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
1911 self.test_session.sha1_key = key2
1912 for dummy in range(self.test_session.detect_mult * 2):
1913 p = wait_for_bfd_packet(self)
1914 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1915 self.test_session.send_packet()
1916 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1917 self.assert_equal(len(self.vapi.collect_events()), 0,
1918 "number of bfd events")
1920 def test_auth_on_delayed(self):
1921 """ turn auth on without disturbing session state (delayed) """
1922 key = self.factory.create_random_key(self)
1923 key.add_vpp_config()
1924 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1925 self.pg0.remote_ip4)
1926 self.vpp_session.add_vpp_config()
1927 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
1928 bfd_session_up(self)
1929 for dummy in range(self.test_session.detect_mult * 2):
1930 wait_for_bfd_packet(self)
1931 self.test_session.send_packet()
1932 self.vpp_session.activate_auth(key, delayed=True)
1933 for dummy in range(self.test_session.detect_mult * 2):
1934 p = wait_for_bfd_packet(self)
1935 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1936 self.test_session.send_packet()
1937 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
1938 self.test_session.sha1_key = key
1939 self.test_session.send_packet()
1940 for dummy in range(self.test_session.detect_mult * 2):
1941 p = wait_for_bfd_packet(self)
1942 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1943 self.test_session.send_packet()
1944 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1945 self.assert_equal(len(self.vapi.collect_events()), 0,
1946 "number of bfd events")
1948 def test_auth_off_delayed(self):
1949 """ turn auth off without disturbing session state (delayed) """
1950 key = self.factory.create_random_key(self)
1951 key.add_vpp_config()
1952 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1953 self.pg0.remote_ip4, sha1_key=key)
1954 self.vpp_session.add_vpp_config()
1955 self.test_session = BFDTestSession(
1956 self, self.pg0, AF_INET, sha1_key=key,
1957 bfd_key_id=self.vpp_session.bfd_key_id)
1958 bfd_session_up(self)
1959 for dummy in range(self.test_session.detect_mult * 2):
1960 p = wait_for_bfd_packet(self)
1961 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1962 self.test_session.send_packet()
1963 self.vpp_session.deactivate_auth(delayed=True)
1964 for dummy in range(self.test_session.detect_mult * 2):
1965 p = wait_for_bfd_packet(self)
1966 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1967 self.test_session.send_packet()
1968 self.test_session.bfd_key_id = None
1969 self.test_session.sha1_key = None
1970 self.test_session.send_packet()
1971 for dummy in range(self.test_session.detect_mult * 2):
1972 p = wait_for_bfd_packet(self)
1973 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1974 self.test_session.send_packet()
1975 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1976 self.assert_equal(len(self.vapi.collect_events()), 0,
1977 "number of bfd events")
1979 def test_auth_change_key_delayed(self):
1980 """ change auth key without disturbing session state (delayed) """
1981 key1 = self.factory.create_random_key(self)
1982 key1.add_vpp_config()
1983 key2 = self.factory.create_random_key(self)
1984 key2.add_vpp_config()
1985 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1986 self.pg0.remote_ip4, sha1_key=key1)
1987 self.vpp_session.add_vpp_config()
1988 self.vpp_session.admin_up()
1989 self.test_session = BFDTestSession(
1990 self, self.pg0, AF_INET, sha1_key=key1,
1991 bfd_key_id=self.vpp_session.bfd_key_id)
1992 bfd_session_up(self)
1993 for dummy in range(self.test_session.detect_mult * 2):
1994 p = wait_for_bfd_packet(self)
1995 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1996 self.test_session.send_packet()
1997 self.vpp_session.activate_auth(key2, delayed=True)
1998 for dummy in range(self.test_session.detect_mult * 2):
1999 p = wait_for_bfd_packet(self)
2000 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2001 self.test_session.send_packet()
2002 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2003 self.test_session.sha1_key = key2
2004 self.test_session.send_packet()
2005 for dummy in range(self.test_session.detect_mult * 2):
2006 p = wait_for_bfd_packet(self)
2007 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2008 self.test_session.send_packet()
2009 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2010 self.assert_equal(len(self.vapi.collect_events()), 0,
2011 "number of bfd events")
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
class BFDCLITestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (CLI) """

    # NOTE: class and test-method docstrings are kept verbatim on purpose -
    # unittest reports the first docstring line as the test description.

    pg0 = None

    @classmethod
    def setUpClass(cls):
        super(BFDCLITestCase, cls).setUpClass()

        try:
            cls.create_pg_interfaces((0,))
            cls.pg0.config_ip4()
            cls.pg0.config_ip6()
            cls.pg0.resolve_arp()
            cls.pg0.resolve_ndp()

        except Exception:
            # undo the partial class set-up, then let the failure propagate
            super(BFDCLITestCase, cls).tearDownClass()
            raise

    def setUp(self):
        super(BFDCLITestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.pg0.enable_capture()

    def tearDown(self):
        try:
            self.vapi.want_bfd_events(enable_disable=0)
        except UnexpectedApiReturnValueError:
            # some tests aren't subscribed, so this is not an issue
            pass
        self.vapi.collect_events()  # clear the event queue
        super(BFDCLITestCase, self).tearDown()

    def cli_verify_no_response(self, cli):
        """ execute a CLI, asserting that the response is empty """
        self.assert_equal(self.vapi.cli(cli),
                          "",
                          "CLI command response")

    def cli_verify_response(self, cli, expected):
        """ execute a CLI, asserting that the response matches expectation """
        self.assert_equal(self.vapi.cli(cli).strip(),
                          expected,
                          "CLI command response")

    @staticmethod
    def _hex_secret(key):
        """ return the secret of an auth key as a lower-case hex string """
        return "".join("{:02x}".format(ord(c)) for c in key.key)

    def _check_key_in_use_then_delete(self, k):
        """ verify an in-use key can't be replaced and is deletable later

        Expects self.vpp_session and self.test_session to be already set up
        with key `k`; removes the VPP session and the key at the end.
        """
        self.vapi.want_bfd_events()
        bfd_session_up(self)
        bfd_session_down(self)
        # try to replace the secret for the key - should fail because the key
        # is in-use
        k2 = self.factory.create_random_key(self)
        self.cli_verify_response(
            "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
            (k.conf_key_id, self._hex_secret(k2)),
            "bfd key set: `bfd_auth_set_key' API call failed, "
            "rv=-103:BFD object in use")
        # manipulating the session using old secret should still work
        bfd_session_up(self)
        bfd_session_down(self)
        self.vpp_session.remove_vpp_config()
        self.cli_verify_no_response(
            "bfd key del conf-key-id %s" % k.conf_key_id)
        self.assertFalse(k.query_vpp_config())

    def _check_add_mod_del(self, local_addr, peer_addr, **session_args):
        """ create, modify and delete a BFD UDP session through the CLI

        :param local_addr: local address of pg0 used in the CLI commands
        :param peer_addr: peer address of the session
        :param session_args: extra keyword arguments handed on to
                             VppBFDUDPSession (af=..., sha1_key=...)
        """
        vpp_session = VppBFDUDPSession(
            self, self.pg0, peer_addr, **session_args)
        self.registry.register(vpp_session, self.logger)
        cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
            "peer-addr %s desired-min-tx %s required-min-rx %s "\
            "detect-mult %s" % (self.pg0.name, local_addr, peer_addr,
                                vpp_session.desired_min_tx,
                                vpp_session.required_min_rx,
                                vpp_session.detect_mult)
        mod_args = dict(session_args)
        key = session_args.get("sha1_key")
        if key is not None:
            cli_add_cmd += " conf-key-id %s bfd-key-id %s" % (
                key.conf_key_id, vpp_session.bfd_key_id)
            # the reference object for the modified session has to carry
            # the same bfd key id as the session created above
            mod_args["bfd_key_id"] = vpp_session.bfd_key_id
        self.cli_verify_no_response(cli_add_cmd)
        # 2nd add should fail
        self.cli_verify_response(
            cli_add_cmd,
            "bfd udp session add: `bfd_add_add_session' API call"
            " failed, rv=-101:Duplicate BFD object")
        verify_bfd_session_config(self, vpp_session)
        mod_session = VppBFDUDPSession(
            self, self.pg0, peer_addr,
            required_min_rx=2 * vpp_session.required_min_rx,
            desired_min_tx=3 * vpp_session.desired_min_tx,
            detect_mult=4 * vpp_session.detect_mult,
            **mod_args)
        self.cli_verify_no_response(
            "bfd udp session mod interface %s local-addr %s peer-addr %s "
            "desired-min-tx %s required-min-rx %s detect-mult %s" %
            (self.pg0.name, local_addr, peer_addr,
             mod_session.desired_min_tx, mod_session.required_min_rx,
             mod_session.detect_mult))
        verify_bfd_session_config(self, mod_session)
        cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
            "peer-addr %s" % (self.pg0.name, local_addr, peer_addr)
        self.cli_verify_no_response(cli_del_cmd)
        # 2nd del is expected to fail
        self.cli_verify_response(
            cli_del_cmd,
            "bfd udp session del: `bfd_udp_del_session' API call"
            " failed, rv=-102:No such BFD object")
        self.assertFalse(vpp_session.query_vpp_config())

    def _check_auth_on_off(self, cli_suffix=""):
        """ activate and deactivate authentication through the CLI

        :param cli_suffix: text appended to both CLI commands
        """
        key = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        # never added to VPP - only describes the expected authenticated
        # configuration and supplies the bfd key id
        auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                        sha1_key=key)
        session.add_vpp_config()
        cli_activate = \
            "bfd udp session auth activate interface %s local-addr %s "\
            "peer-addr %s conf-key-id %s bfd-key-id %s"\
            % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
               key.conf_key_id, auth_session.bfd_key_id) + cli_suffix
        cli_deactivate = \
            "bfd udp session auth deactivate interface %s local-addr %s "\
            "peer-addr %s"\
            % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4) \
            + cli_suffix
        # every command is issued twice - repeating it has to be accepted
        for _ in range(2):
            self.cli_verify_no_response(cli_activate)
            verify_bfd_session_config(self, auth_session)
        for _ in range(2):
            self.cli_verify_no_response(cli_deactivate)
            verify_bfd_session_config(self, session)

    def test_show(self):
        """ show commands """
        k1 = self.factory.create_random_key(self)
        k1.add_vpp_config()
        k2 = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        k2.add_vpp_config()
        s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        s1.add_vpp_config()
        s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
                              sha1_key=k2)
        s2.add_vpp_config()
        # the output is only logged, not verified
        for show_cmd in ("show bfd keys", "show bfd sessions", "show bfd"):
            self.logger.info(self.vapi.ppcli(show_cmd))

    def test_set_del_sha1_key(self):
        """ set/delete SHA1 auth key """
        k = self.factory.create_random_key(self)
        self.registry.register(k, self.logger)
        self.cli_verify_no_response(
            "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
            (k.conf_key_id, self._hex_secret(k)))
        self.assertTrue(k.query_vpp_config())
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
        self.vpp_session.add_vpp_config()
        self.test_session = \
            BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
                           bfd_key_id=self.vpp_session.bfd_key_id)
        self._check_key_in_use_then_delete(k)

    def test_set_del_meticulous_sha1_key(self):
        """ set/delete meticulous SHA1 auth key """
        k = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        self.registry.register(k, self.logger)
        self.cli_verify_no_response(
            "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
            (k.conf_key_id, self._hex_secret(k)))
        self.assertTrue(k.query_vpp_config())
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip6, af=AF_INET6,
                                            sha1_key=k)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = \
            BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
                           bfd_key_id=self.vpp_session.bfd_key_id)
        self._check_key_in_use_then_delete(k)

    def test_add_mod_del_bfd_udp(self):
        """ create/modify/delete IPv4 BFD UDP session """
        self._check_add_mod_del(self.pg0.local_ip4, self.pg0.remote_ip4)

    def test_add_mod_del_bfd_udp6(self):
        """ create/modify/delete IPv6 BFD UDP session """
        self._check_add_mod_del(self.pg0.local_ip6, self.pg0.remote_ip6,
                                af=AF_INET6)

    def test_add_mod_del_bfd_udp_auth(self):
        """ create/modify/delete IPv4 BFD UDP session (authenticated) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self._check_add_mod_del(self.pg0.local_ip4, self.pg0.remote_ip4,
                                sha1_key=key)

    def test_add_mod_del_bfd_udp6_auth(self):
        """ create/modify/delete IPv6 BFD UDP session (authenticated) """
        key = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        self._check_add_mod_del(self.pg0.local_ip6, self.pg0.remote_ip6,
                                af=AF_INET6, sha1_key=key)

    def test_auth_on_off(self):
        """ turn authentication on and off """
        self._check_auth_on_off()

    def test_auth_on_off_delayed(self):
        """ turn authentication on and off (delayed) """
        self._check_auth_on_off(cli_suffix=" delayed yes")

    def test_admin_up_down(self):
        """ put session admin-up and admin-down """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        cli_down = \
            "bfd udp session set-flags admin down interface %s local-addr %s "\
            "peer-addr %s"\
            % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
        cli_up = \
            "bfd udp session set-flags admin up interface %s local-addr %s "\
            "peer-addr %s"\
            % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.cli_verify_no_response(cli_down)
        verify_bfd_session_config(self, session, state=BFDState.admin_down)
        self.cli_verify_no_response(cli_up)
        verify_bfd_session_config(self, session, state=BFDState.down)

    def test_set_del_udp_echo_source(self):
        """ set/del udp echo source """
        show_cmd = "show bfd echo-source"
        self.create_loopback_interfaces([0])
        self.loopback0 = self.lo_interfaces[0]
        self.loopback0.admin_up()
        self.cli_verify_response(show_cmd,
                                 "UDP echo source is not set.")
        cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
        self.cli_verify_no_response(cli_set)
        # no address is configured on the loopback yet
        self.cli_verify_response(show_cmd,
                                 "UDP echo source is: %s\n"
                                 "IPv4 address usable as echo source: none\n"
                                 "IPv6 address usable as echo source: none" %
                                 self.loopback0.name)
        self.loopback0.config_ip4()
        # expected echo address: the interface address with its lowest
        # bit flipped
        unpacked = unpack("!L", self.loopback0.local_ip4n)
        echo_ip4 = inet_ntop(AF_INET, pack("!L", unpacked[0] ^ 1))
        self.cli_verify_response(show_cmd,
                                 "UDP echo source is: %s\n"
                                 "IPv4 address usable as echo source: %s\n"
                                 "IPv6 address usable as echo source: none" %
                                 (self.loopback0.name, echo_ip4))
        # same derivation for IPv6: flip the lowest bit of the last word
        unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
        echo_ip6 = inet_ntop(AF_INET6, pack("!LLLL", unpacked[0], unpacked[1],
                                            unpacked[2], unpacked[3] ^ 1))
        self.loopback0.config_ip6()
        self.cli_verify_response(show_cmd,
                                 "UDP echo source is: %s\n"
                                 "IPv4 address usable as echo source: %s\n"
                                 "IPv6 address usable as echo source: %s" %
                                 (self.loopback0.name, echo_ip4, echo_ip6))
        cli_del = "bfd udp echo-source del"
        self.cli_verify_no_response(cli_del)
        self.cli_verify_response(show_cmd,
                                 "UDP echo source is not set.")
# Script entry point: run this module's tests with the VPP test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)