4 from __future__ import division
9 from struct import pack, unpack
10 from random import randint, shuffle, getrandbits
11 from socket import AF_INET, AF_INET6, inet_ntop
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether
14 from scapy.layers.inet import UDP, IP
15 from scapy.layers.inet6 import IPv6
16 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
17 BFDDiagCode, BFDState, BFD_vpp_echo
18 from framework import VppTestCase, VppTestRunner, running_extended_tests
19 from vpp_pg_interface import CaptureTimeoutError
21 from vpp_papi_provider import UnexpectedApiReturnValueError
26 class AuthKeyFactory(object):
27 """Factory class for creating auth keys with unique conf key ID"""
30 self._conf_key_ids = {}
32 def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
33 """ create a random key with unique conf key id """
34 conf_key_id = randint(0, 0xFFFFFFFF)
35 while conf_key_id in self._conf_key_ids:
36 conf_key_id = randint(0, 0xFFFFFFFF)
37 self._conf_key_ids[conf_key_id] = 1
38 key = str(bytearray([randint(0, 255) for _ in range(randint(1, 20))]))
39 return VppBFDAuthKey(test=test, auth_type=auth_type,
40 conf_key_id=conf_key_id, key=key)
43 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
44 class BFDAPITestCase(VppTestCase):
45 """Bidirectional Forwarding Detection (BFD) - API"""
52 super(BFDAPITestCase, cls).setUpClass()
55 cls.create_pg_interfaces(range(2))
56 for i in cls.pg_interfaces:
62 super(BFDAPITestCase, cls).tearDownClass()
66 super(BFDAPITestCase, self).setUp()
67 self.factory = AuthKeyFactory()
69 def test_add_bfd(self):
70 """ create a BFD session """
71 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
72 session.add_vpp_config()
73 self.logger.debug("Session state is %s", session.state)
74 session.remove_vpp_config()
75 session.add_vpp_config()
76 self.logger.debug("Session state is %s", session.state)
77 session.remove_vpp_config()
79 def test_double_add(self):
80 """ create the same BFD session twice (negative case) """
81 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
82 session.add_vpp_config()
84 with self.vapi.expect_negative_api_retval():
85 session.add_vpp_config()
87 session.remove_vpp_config()
89 def test_add_bfd6(self):
90 """ create IPv6 BFD session """
91 session = VppBFDUDPSession(
92 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
93 session.add_vpp_config()
94 self.logger.debug("Session state is %s", session.state)
95 session.remove_vpp_config()
96 session.add_vpp_config()
97 self.logger.debug("Session state is %s", session.state)
98 session.remove_vpp_config()
100 def test_mod_bfd(self):
101 """ modify BFD session parameters """
102 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
103 desired_min_tx=50000,
104 required_min_rx=10000,
106 session.add_vpp_config()
107 s = session.get_bfd_udp_session_dump_entry()
108 self.assert_equal(session.desired_min_tx,
110 "desired min transmit interval")
111 self.assert_equal(session.required_min_rx,
113 "required min receive interval")
114 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
115 session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
116 required_min_rx=session.required_min_rx * 2,
117 detect_mult=session.detect_mult * 2)
118 s = session.get_bfd_udp_session_dump_entry()
119 self.assert_equal(session.desired_min_tx,
121 "desired min transmit interval")
122 self.assert_equal(session.required_min_rx,
124 "required min receive interval")
125 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
127 def test_add_sha1_keys(self):
128 """ add SHA1 keys """
130 keys = [self.factory.create_random_key(
131 self) for i in range(0, key_count)]
133 self.assertFalse(key.query_vpp_config())
137 self.assertTrue(key.query_vpp_config())
139 indexes = range(key_count)
144 key.remove_vpp_config()
146 for j in range(key_count):
149 self.assertFalse(key.query_vpp_config())
151 self.assertTrue(key.query_vpp_config())
152 # should be removed now
154 self.assertFalse(key.query_vpp_config())
155 # add back and remove again
159 self.assertTrue(key.query_vpp_config())
161 key.remove_vpp_config()
163 self.assertFalse(key.query_vpp_config())
165 def test_add_bfd_sha1(self):
166 """ create a BFD session (SHA1) """
167 key = self.factory.create_random_key(self)
169 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
171 session.add_vpp_config()
172 self.logger.debug("Session state is %s", session.state)
173 session.remove_vpp_config()
174 session.add_vpp_config()
175 self.logger.debug("Session state is %s", session.state)
176 session.remove_vpp_config()
178 def test_double_add_sha1(self):
179 """ create the same BFD session twice (negative case) (SHA1) """
180 key = self.factory.create_random_key(self)
182 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
184 session.add_vpp_config()
185 with self.assertRaises(Exception):
186 session.add_vpp_config()
188 def test_add_auth_nonexistent_key(self):
189 """ create BFD session using non-existent SHA1 (negative case) """
190 session = VppBFDUDPSession(
191 self, self.pg0, self.pg0.remote_ip4,
192 sha1_key=self.factory.create_random_key(self))
193 with self.assertRaises(Exception):
194 session.add_vpp_config()
196 def test_shared_sha1_key(self):
197 """ share single SHA1 key between multiple BFD sessions """
198 key = self.factory.create_random_key(self)
201 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
203 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
204 sha1_key=key, af=AF_INET6),
205 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
207 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
208 sha1_key=key, af=AF_INET6)]
213 e = key.get_bfd_auth_keys_dump_entry()
214 self.assert_equal(e.use_count, len(sessions) - removed,
215 "Use count for shared key")
216 s.remove_vpp_config()
218 e = key.get_bfd_auth_keys_dump_entry()
219 self.assert_equal(e.use_count, len(sessions) - removed,
220 "Use count for shared key")
222 def test_activate_auth(self):
223 """ activate SHA1 authentication """
224 key = self.factory.create_random_key(self)
226 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
227 session.add_vpp_config()
228 session.activate_auth(key)
230 def test_deactivate_auth(self):
231 """ deactivate SHA1 authentication """
232 key = self.factory.create_random_key(self)
234 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
235 session.add_vpp_config()
236 session.activate_auth(key)
237 session.deactivate_auth()
239 def test_change_key(self):
240 """ change SHA1 key """
241 key1 = self.factory.create_random_key(self)
242 key2 = self.factory.create_random_key(self)
243 while key2.conf_key_id == key1.conf_key_id:
244 key2 = self.factory.create_random_key(self)
245 key1.add_vpp_config()
246 key2.add_vpp_config()
247 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
249 session.add_vpp_config()
250 session.activate_auth(key2)
253 class BFDTestSession(object):
254 """ BFD session as seen from test framework side """
256 def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
257 bfd_key_id=None, our_seq_number=None):
260 self.sha1_key = sha1_key
261 self.bfd_key_id = bfd_key_id
262 self.interface = interface
263 self.udp_sport = randint(49152, 65535)
264 if our_seq_number is None:
265 self.our_seq_number = randint(0, 40000000)
267 self.our_seq_number = our_seq_number
268 self.vpp_seq_number = None
269 self.my_discriminator = 0
270 self.desired_min_tx = 300000
271 self.required_min_rx = 300000
272 self.required_min_echo_rx = None
273 self.detect_mult = detect_mult
274 self.diag = BFDDiagCode.no_diagnostic
275 self.your_discriminator = None
276 self.state = BFDState.down
277 self.auth_type = BFDAuthType.no_auth
279 def inc_seq_num(self):
280 """ increment sequence number, wrapping if needed """
281 if self.our_seq_number == 0xFFFFFFFF:
282 self.our_seq_number = 0
284 self.our_seq_number += 1
286 def update(self, my_discriminator=None, your_discriminator=None,
287 desired_min_tx=None, required_min_rx=None,
288 required_min_echo_rx=None, detect_mult=None,
289 diag=None, state=None, auth_type=None):
290 """ update BFD parameters associated with session """
291 if my_discriminator is not None:
292 self.my_discriminator = my_discriminator
293 if your_discriminator is not None:
294 self.your_discriminator = your_discriminator
295 if required_min_rx is not None:
296 self.required_min_rx = required_min_rx
297 if required_min_echo_rx is not None:
298 self.required_min_echo_rx = required_min_echo_rx
299 if desired_min_tx is not None:
300 self.desired_min_tx = desired_min_tx
301 if detect_mult is not None:
302 self.detect_mult = detect_mult
305 if state is not None:
307 if auth_type is not None:
308 self.auth_type = auth_type
310 def fill_packet_fields(self, packet):
311 """ set packet fields with known values in packet """
313 if self.my_discriminator:
314 self.test.logger.debug("BFD: setting packet.my_discriminator=%s",
315 self.my_discriminator)
316 bfd.my_discriminator = self.my_discriminator
317 if self.your_discriminator:
318 self.test.logger.debug("BFD: setting packet.your_discriminator=%s",
319 self.your_discriminator)
320 bfd.your_discriminator = self.your_discriminator
321 if self.required_min_rx:
322 self.test.logger.debug(
323 "BFD: setting packet.required_min_rx_interval=%s",
324 self.required_min_rx)
325 bfd.required_min_rx_interval = self.required_min_rx
326 if self.required_min_echo_rx:
327 self.test.logger.debug(
328 "BFD: setting packet.required_min_echo_rx=%s",
329 self.required_min_echo_rx)
330 bfd.required_min_echo_rx_interval = self.required_min_echo_rx
331 if self.desired_min_tx:
332 self.test.logger.debug(
333 "BFD: setting packet.desired_min_tx_interval=%s",
335 bfd.desired_min_tx_interval = self.desired_min_tx
337 self.test.logger.debug(
338 "BFD: setting packet.detect_mult=%s", self.detect_mult)
339 bfd.detect_mult = self.detect_mult
341 self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
344 self.test.logger.debug("BFD: setting packet.state=%s", self.state)
345 bfd.state = self.state
347 # this is used by a negative test-case
348 self.test.logger.debug("BFD: setting packet.auth_type=%s",
350 bfd.auth_type = self.auth_type
352 def create_packet(self):
353 """ create a BFD packet, reflecting the current state of session """
356 bfd.auth_type = self.sha1_key.auth_type
357 bfd.auth_len = BFD.sha1_auth_len
358 bfd.auth_key_id = self.bfd_key_id
359 bfd.auth_seq_num = self.our_seq_number
360 bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
363 if self.af == AF_INET6:
364 packet = (Ether(src=self.interface.remote_mac,
365 dst=self.interface.local_mac) /
366 IPv6(src=self.interface.remote_ip6,
367 dst=self.interface.local_ip6,
369 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
372 packet = (Ether(src=self.interface.remote_mac,
373 dst=self.interface.local_mac) /
374 IP(src=self.interface.remote_ip4,
375 dst=self.interface.local_ip4,
377 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
379 self.test.logger.debug("BFD: Creating packet")
380 self.fill_packet_fields(packet)
382 hash_material = str(packet[BFD])[:32] + self.sha1_key.key + \
383 "\0" * (20 - len(self.sha1_key.key))
384 self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
385 hashlib.sha1(hash_material).hexdigest())
386 packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
389 def send_packet(self, packet=None, interface=None):
390 """ send packet on interface, creating the packet if needed """
392 packet = self.create_packet()
393 if interface is None:
394 interface = self.test.pg0
395 self.test.logger.debug(ppp("Sending packet:", packet))
396 interface.add_stream(packet)
399 def verify_sha1_auth(self, packet):
400 """ Verify correctness of authentication in BFD layer. """
402 self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
403 self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
405 self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
406 self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
407 if self.vpp_seq_number is None:
408 self.vpp_seq_number = bfd.auth_seq_num
409 self.test.logger.debug("Received initial sequence number: %s" %
412 recvd_seq_num = bfd.auth_seq_num
413 self.test.logger.debug("Received followup sequence number: %s" %
415 if self.vpp_seq_number < 0xffffffff:
416 if self.sha1_key.auth_type == \
417 BFDAuthType.meticulous_keyed_sha1:
418 self.test.assert_equal(recvd_seq_num,
419 self.vpp_seq_number + 1,
420 "BFD sequence number")
422 self.test.assert_in_range(recvd_seq_num,
424 self.vpp_seq_number + 1,
425 "BFD sequence number")
427 if self.sha1_key.auth_type == \
428 BFDAuthType.meticulous_keyed_sha1:
429 self.test.assert_equal(recvd_seq_num, 0,
430 "BFD sequence number")
432 self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
433 "BFD sequence number not one of "
434 "(%s, 0)" % self.vpp_seq_number)
435 self.vpp_seq_number = recvd_seq_num
436 # last 20 bytes represent the hash - so replace them with the key,
437 # pad the result with zeros and hash the result
438 hash_material = bfd.original[:-20] + self.sha1_key.key + \
439 "\0" * (20 - len(self.sha1_key.key))
440 expected_hash = hashlib.sha1(hash_material).hexdigest()
441 self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
442 expected_hash, "Auth key hash")
444 def verify_bfd(self, packet):
445 """ Verify correctness of BFD layer. """
447 self.test.assert_equal(bfd.version, 1, "BFD version")
448 self.test.assert_equal(bfd.your_discriminator,
449 self.my_discriminator,
450 "BFD - your discriminator")
452 self.verify_sha1_auth(packet)
455 def bfd_session_up(test):
456 """ Bring BFD session up """
457 test.logger.info("BFD: Waiting for slow hello")
458 p = wait_for_bfd_packet(test, 2)
460 if hasattr(test, 'vpp_clock_offset'):
461 old_offset = test.vpp_clock_offset
462 test.vpp_clock_offset = time.time() - p.time
463 test.logger.debug("BFD: Calculated vpp clock offset: %s",
464 test.vpp_clock_offset)
466 test.assertAlmostEqual(
467 old_offset, test.vpp_clock_offset, delta=0.5,
468 msg="vpp clock offset not stable (new: %s, old: %s)" %
469 (test.vpp_clock_offset, old_offset))
470 test.logger.info("BFD: Sending Init")
471 test.test_session.update(my_discriminator=randint(0, 40000000),
472 your_discriminator=p[BFD].my_discriminator,
474 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
475 BFDAuthType.meticulous_keyed_sha1:
476 test.test_session.inc_seq_num()
477 test.test_session.send_packet()
478 test.logger.info("BFD: Waiting for event")
479 e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
480 verify_event(test, e, expected_state=BFDState.up)
481 test.logger.info("BFD: Session is Up")
482 test.test_session.update(state=BFDState.up)
483 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
484 BFDAuthType.meticulous_keyed_sha1:
485 test.test_session.inc_seq_num()
486 test.test_session.send_packet()
487 test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
490 def bfd_session_down(test):
491 """ Bring BFD session down """
492 test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
493 test.test_session.update(state=BFDState.down)
494 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
495 BFDAuthType.meticulous_keyed_sha1:
496 test.test_session.inc_seq_num()
497 test.test_session.send_packet()
498 test.logger.info("BFD: Waiting for event")
499 e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
500 verify_event(test, e, expected_state=BFDState.down)
501 test.logger.info("BFD: Session is Down")
502 test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
505 def verify_bfd_session_config(test, session, state=None):
506 dump = session.get_bfd_udp_session_dump_entry()
507 test.assertIsNotNone(dump)
508 # since dump is not none, we have verified that sw_if_index and addresses
509 # are valid (in get_bfd_udp_session_dump_entry)
511 test.assert_equal(dump.state, state, "session state")
512 test.assert_equal(dump.required_min_rx, session.required_min_rx,
513 "required min rx interval")
514 test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
515 "desired min tx interval")
516 test.assert_equal(dump.detect_mult, session.detect_mult,
518 if session.sha1_key is None:
519 test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
521 test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
522 test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
524 test.assert_equal(dump.conf_key_id,
525 session.sha1_key.conf_key_id,
529 def verify_ip(test, packet):
530 """ Verify correctness of IP layer. """
531 if test.vpp_session.af == AF_INET6:
533 local_ip = test.pg0.local_ip6
534 remote_ip = test.pg0.remote_ip6
535 test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
538 local_ip = test.pg0.local_ip4
539 remote_ip = test.pg0.remote_ip4
540 test.assert_equal(ip.ttl, 255, "IPv4 TTL")
541 test.assert_equal(ip.src, local_ip, "IP source address")
542 test.assert_equal(ip.dst, remote_ip, "IP destination address")
545 def verify_udp(test, packet):
546 """ Verify correctness of UDP layer. """
548 test.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
549 test.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
553 def verify_event(test, event, expected_state):
554 """ Verify correctness of event values. """
556 test.logger.debug("BFD: Event: %s" % repr(e))
557 test.assert_equal(e.sw_if_index,
558 test.vpp_session.interface.sw_if_index,
559 "BFD interface index")
561 if test.vpp_session.af == AF_INET6:
563 test.assert_equal(e.is_ipv6, is_ipv6, "is_ipv6")
564 if test.vpp_session.af == AF_INET:
565 test.assert_equal(e.local_addr[:4], test.vpp_session.local_addr_n,
566 "Local IPv4 address")
567 test.assert_equal(e.peer_addr[:4], test.vpp_session.peer_addr_n,
570 test.assert_equal(e.local_addr, test.vpp_session.local_addr_n,
571 "Local IPv6 address")
572 test.assert_equal(e.peer_addr, test.vpp_session.peer_addr_n,
574 test.assert_equal(e.state, expected_state, BFDState)
577 def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None):
578 """ wait for BFD packet and verify its correctness
580 :param timeout: how long to wait
581 :param pcap_time_min: ignore packets with pcap timestamp lower than this
583 :returns: tuple (packet, time spent waiting for packet)
585 test.logger.info("BFD: Waiting for BFD packet")
586 deadline = time.time() + timeout
591 test.assert_in_range(counter, 0, 100, "number of packets ignored")
592 time_left = deadline - time.time()
594 raise CaptureTimeoutError("Packet did not arrive within timeout")
595 p = test.pg0.wait_for_packet(timeout=time_left)
596 test.logger.debug(ppp("BFD: Got packet:", p))
597 if pcap_time_min is not None and p.time < pcap_time_min:
598 test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
599 "pcap time min %s):" %
600 (p.time, pcap_time_min), p))
605 raise Exception(ppp("Unexpected or invalid BFD packet:", p))
607 raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
610 test.test_session.verify_bfd(p)
614 class BFD4TestCase(VppTestCase):
615 """Bidirectional Forwarding Detection (BFD)"""
618 vpp_clock_offset = None
624 super(BFD4TestCase, cls).setUpClass()
626 cls.create_pg_interfaces([0])
627 cls.create_loopback_interfaces([0])
628 cls.loopback0 = cls.lo_interfaces[0]
629 cls.loopback0.config_ip4()
630 cls.loopback0.admin_up()
632 cls.pg0.configure_ipv4_neighbors()
634 cls.pg0.resolve_arp()
637 super(BFD4TestCase, cls).tearDownClass()
641 super(BFD4TestCase, self).setUp()
642 self.factory = AuthKeyFactory()
643 self.vapi.want_bfd_events()
644 self.pg0.enable_capture()
646 self.vpp_session = VppBFDUDPSession(self, self.pg0,
648 self.vpp_session.add_vpp_config()
649 self.vpp_session.admin_up()
650 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
652 self.vapi.want_bfd_events(enable_disable=0)
656 if not self.vpp_dead:
657 self.vapi.want_bfd_events(enable_disable=0)
658 self.vapi.collect_events() # clear the event queue
659 super(BFD4TestCase, self).tearDown()
661 def test_session_up(self):
662 """ bring BFD session up """
665 def test_session_up_by_ip(self):
666 """ bring BFD session up - first frame looked up by address pair """
667 self.logger.info("BFD: Sending Slow control frame")
668 self.test_session.update(my_discriminator=randint(0, 40000000))
669 self.test_session.send_packet()
670 self.pg0.enable_capture()
671 p = self.pg0.wait_for_packet(1)
672 self.assert_equal(p[BFD].your_discriminator,
673 self.test_session.my_discriminator,
674 "BFD - your discriminator")
675 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
676 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
678 self.logger.info("BFD: Waiting for event")
679 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
680 verify_event(self, e, expected_state=BFDState.init)
681 self.logger.info("BFD: Sending Up")
682 self.test_session.send_packet()
683 self.logger.info("BFD: Waiting for event")
684 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
685 verify_event(self, e, expected_state=BFDState.up)
686 self.logger.info("BFD: Session is Up")
687 self.test_session.update(state=BFDState.up)
688 self.test_session.send_packet()
689 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
691 def test_session_down(self):
692 """ bring BFD session down """
694 bfd_session_down(self)
696 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
697 def test_hold_up(self):
698 """ hold BFD session up """
700 for dummy in range(self.test_session.detect_mult * 2):
701 wait_for_bfd_packet(self)
702 self.test_session.send_packet()
703 self.assert_equal(len(self.vapi.collect_events()), 0,
704 "number of bfd events")
706 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
707 def test_slow_timer(self):
708 """ verify slow periodic control frames while session down """
710 self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
711 prev_packet = wait_for_bfd_packet(self, 2)
712 for dummy in range(packet_count):
713 next_packet = wait_for_bfd_packet(self, 2)
714 time_diff = next_packet.time - prev_packet.time
715 # spec says the range should be <0.75, 1>, allow extra 0.05 margin
716 # to work around timing issues
717 self.assert_in_range(
718 time_diff, 0.70, 1.05, "time between slow packets")
719 prev_packet = next_packet
721 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
722 def test_zero_remote_min_rx(self):
723 """ no packets when zero remote required min rx interval """
725 self.test_session.update(required_min_rx=0)
726 self.test_session.send_packet()
727 for dummy in range(self.test_session.detect_mult):
728 self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
729 "sleep before transmitting bfd packet")
730 self.test_session.send_packet()
732 p = wait_for_bfd_packet(self, timeout=0)
733 self.logger.error(ppp("Received unexpected packet:", p))
734 except CaptureTimeoutError:
737 len(self.vapi.collect_events()), 0, "number of bfd events")
738 self.test_session.update(required_min_rx=300000)
739 for dummy in range(3):
740 self.test_session.send_packet()
742 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
744 len(self.vapi.collect_events()), 0, "number of bfd events")
746 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
747 def test_conn_down(self):
748 """ verify session goes down after inactivity """
750 detection_time = self.test_session.detect_mult *\
751 self.vpp_session.required_min_rx / USEC_IN_SEC
752 self.sleep(detection_time, "waiting for BFD session time-out")
753 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
754 verify_event(self, e, expected_state=BFDState.down)
756 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
757 def test_large_required_min_rx(self):
758 """ large remote required min rx interval """
760 p = wait_for_bfd_packet(self)
762 self.test_session.update(required_min_rx=interval)
763 self.test_session.send_packet()
764 time_mark = time.time()
766 # busy wait here, trying to collect a packet or event, vpp is not
767 # allowed to send packets and the session will timeout first - so the
768 # Up->Down event must arrive before any packets do
769 while time.time() < time_mark + interval / USEC_IN_SEC:
771 p = wait_for_bfd_packet(self, timeout=0)
772 # if vpp managed to send a packet before we did the session
773 # session update, then that's fine, ignore it
774 if p.time < time_mark - self.vpp_clock_offset:
776 self.logger.error(ppp("Received unexpected packet:", p))
778 except CaptureTimeoutError:
780 events = self.vapi.collect_events()
782 verify_event(self, events[0], BFDState.down)
784 self.assert_equal(count, 0, "number of packets received")
786 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
787 def test_immediate_remote_min_rx_reduction(self):
788 """ immediately honor remote required min rx reduction """
789 self.vpp_session.remove_vpp_config()
790 self.vpp_session = VppBFDUDPSession(
791 self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
792 self.pg0.enable_capture()
793 self.vpp_session.add_vpp_config()
794 self.test_session.update(desired_min_tx=1000000,
795 required_min_rx=1000000)
797 reference_packet = wait_for_bfd_packet(self)
798 time_mark = time.time()
800 self.test_session.update(required_min_rx=interval)
801 self.test_session.send_packet()
802 extra_time = time.time() - time_mark
803 p = wait_for_bfd_packet(self)
804 # first packet is allowed to be late by time we spent doing the update
805 # calculated in extra_time
806 self.assert_in_range(p.time - reference_packet.time,
807 .95 * 0.75 * interval / USEC_IN_SEC,
808 1.05 * interval / USEC_IN_SEC + extra_time,
809 "time between BFD packets")
811 for dummy in range(3):
812 p = wait_for_bfd_packet(self)
813 diff = p.time - reference_packet.time
814 self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
815 1.05 * interval / USEC_IN_SEC,
816 "time between BFD packets")
819 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
820 def test_modify_req_min_rx_double(self):
821 """ modify session - double required min rx """
823 p = wait_for_bfd_packet(self)
824 self.test_session.update(desired_min_tx=10000,
825 required_min_rx=10000)
826 self.test_session.send_packet()
827 # double required min rx
828 self.vpp_session.modify_parameters(
829 required_min_rx=2 * self.vpp_session.required_min_rx)
830 p = wait_for_bfd_packet(
831 self, pcap_time_min=time.time() - self.vpp_clock_offset)
832 # poll bit needs to be set
833 self.assertIn("P", p.sprintf("%BFD.flags%"),
834 "Poll bit not set in BFD packet")
835 # finish poll sequence with final packet
836 final = self.test_session.create_packet()
837 final[BFD].flags = "F"
838 timeout = self.test_session.detect_mult * \
839 max(self.test_session.desired_min_tx,
840 self.vpp_session.required_min_rx) / USEC_IN_SEC
841 self.test_session.send_packet(final)
842 time_mark = time.time()
843 e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
844 verify_event(self, e, expected_state=BFDState.down)
845 time_to_event = time.time() - time_mark
846 self.assert_in_range(time_to_event, .9 * timeout,
847 1.1 * timeout, "session timeout")
849 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
850 def test_modify_req_min_rx_halve(self):
851 """ modify session - halve required min rx """
852 self.vpp_session.modify_parameters(
853 required_min_rx=2 * self.vpp_session.required_min_rx)
855 p = wait_for_bfd_packet(self)
856 self.test_session.update(desired_min_tx=10000,
857 required_min_rx=10000)
858 self.test_session.send_packet()
859 p = wait_for_bfd_packet(
860 self, pcap_time_min=time.time() - self.vpp_clock_offset)
861 # halve required min rx
862 old_required_min_rx = self.vpp_session.required_min_rx
863 self.vpp_session.modify_parameters(
864 required_min_rx=0.5 * self.vpp_session.required_min_rx)
865 # now we wait 0.8*3*old-req-min-rx and the session should still be up
866 self.sleep(0.8 * self.vpp_session.detect_mult *
867 old_required_min_rx / USEC_IN_SEC,
868 "wait before finishing poll sequence")
869 self.assert_equal(len(self.vapi.collect_events()), 0,
870 "number of bfd events")
871 p = wait_for_bfd_packet(self)
872 # poll bit needs to be set
873 self.assertIn("P", p.sprintf("%BFD.flags%"),
874 "Poll bit not set in BFD packet")
875 # finish poll sequence with final packet
876 final = self.test_session.create_packet()
877 final[BFD].flags = "F"
878 self.test_session.send_packet(final)
879 # now the session should time out under new conditions
880 detection_time = self.test_session.detect_mult *\
881 self.vpp_session.required_min_rx / USEC_IN_SEC
883 e = self.vapi.wait_for_event(
884 2 * detection_time, "bfd_udp_session_details")
886 self.assert_in_range(after - before,
887 0.9 * detection_time,
888 1.1 * detection_time,
889 "time before bfd session goes down")
890 verify_event(self, e, expected_state=BFDState.down)
892 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
893 def test_modify_detect_mult(self):
894 """ modify detect multiplier """
896 p = wait_for_bfd_packet(self)
897 self.vpp_session.modify_parameters(detect_mult=1)
898 p = wait_for_bfd_packet(
899 self, pcap_time_min=time.time() - self.vpp_clock_offset)
900 self.assert_equal(self.vpp_session.detect_mult,
903 # poll bit must not be set
904 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
905 "Poll bit not set in BFD packet")
906 self.vpp_session.modify_parameters(detect_mult=10)
907 p = wait_for_bfd_packet(
908 self, pcap_time_min=time.time() - self.vpp_clock_offset)
909 self.assert_equal(self.vpp_session.detect_mult,
912 # poll bit must not be set
913 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
914 "Poll bit not set in BFD packet")
916 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
917 def test_queued_poll(self):
918 """ test poll sequence queueing """
920 p = wait_for_bfd_packet(self)
921 self.vpp_session.modify_parameters(
922 required_min_rx=2 * self.vpp_session.required_min_rx)
923 p = wait_for_bfd_packet(self)
924 poll_sequence_start = time.time()
925 poll_sequence_length_min = 0.5
926 send_final_after = time.time() + poll_sequence_length_min
927 # poll bit needs to be set
928 self.assertIn("P", p.sprintf("%BFD.flags%"),
929 "Poll bit not set in BFD packet")
930 self.assert_equal(p[BFD].required_min_rx_interval,
931 self.vpp_session.required_min_rx,
932 "BFD required min rx interval")
933 self.vpp_session.modify_parameters(
934 required_min_rx=2 * self.vpp_session.required_min_rx)
935 # 2nd poll sequence should be queued now
936 # don't send the reply back yet, wait for some time to emulate
937 # longer round-trip time
939 while time.time() < send_final_after:
940 self.test_session.send_packet()
941 p = wait_for_bfd_packet(self)
942 self.assert_equal(len(self.vapi.collect_events()), 0,
943 "number of bfd events")
944 self.assert_equal(p[BFD].required_min_rx_interval,
945 self.vpp_session.required_min_rx,
946 "BFD required min rx interval")
948 # poll bit must be set
949 self.assertIn("P", p.sprintf("%BFD.flags%"),
950 "Poll bit not set in BFD packet")
951 final = self.test_session.create_packet()
952 final[BFD].flags = "F"
953 self.test_session.send_packet(final)
954 # finish 1st with final
955 poll_sequence_length = time.time() - poll_sequence_start
956 # vpp must wait for some time before starting new poll sequence
957 poll_no_2_started = False
958 for dummy in range(2 * packet_count):
959 p = wait_for_bfd_packet(self)
960 self.assert_equal(len(self.vapi.collect_events()), 0,
961 "number of bfd events")
962 if "P" in p.sprintf("%BFD.flags%"):
963 poll_no_2_started = True
964 if time.time() < poll_sequence_start + poll_sequence_length:
965 raise Exception("VPP started 2nd poll sequence too soon")
966 final = self.test_session.create_packet()
967 final[BFD].flags = "F"
968 self.test_session.send_packet(final)
971 self.test_session.send_packet()
972 self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
973 # finish 2nd with final
974 final = self.test_session.create_packet()
975 final[BFD].flags = "F"
976 self.test_session.send_packet(final)
977 p = wait_for_bfd_packet(self)
978 # poll bit must not be set
979 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
980 "Poll bit set in BFD packet")
982 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
983 def test_poll_response(self):
984 """ test correct response to control frame with poll bit set """
986 poll = self.test_session.create_packet()
987 poll[BFD].flags = "P"
988 self.test_session.send_packet(poll)
989 final = wait_for_bfd_packet(
990 self, pcap_time_min=time.time() - self.vpp_clock_offset)
991 self.assertIn("F", final.sprintf("%BFD.flags%"))
993 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
994 def test_no_periodic_if_remote_demand(self):
995 """ no periodic frames outside poll sequence if remote demand set """
997 demand = self.test_session.create_packet()
998 demand[BFD].flags = "D"
999 self.test_session.send_packet(demand)
1000 transmit_time = 0.9 \
1001 * max(self.vpp_session.required_min_rx,
1002 self.test_session.desired_min_tx) \
1005 for dummy in range(self.test_session.detect_mult * 2):
1006 time.sleep(transmit_time)
1007 self.test_session.send_packet(demand)
1009 p = wait_for_bfd_packet(self, timeout=0)
1010 self.logger.error(ppp("Received unexpected packet:", p))
1012 except CaptureTimeoutError:
1014 events = self.vapi.collect_events()
1016 self.logger.error("Received unexpected event: %s", e)
1017 self.assert_equal(count, 0, "number of packets received")
1018 self.assert_equal(len(events), 0, "number of events received")
1020 def test_echo_looped_back(self):
1021 """ echo packets looped back """
1022 # don't need a session in this case..
1023 self.vpp_session.remove_vpp_config()
1024 self.pg0.enable_capture()
1025 echo_packet_count = 10
1026 # random source port low enough to increment a few times..
1027 udp_sport_tx = randint(1, 50000)
1028 udp_sport_rx = udp_sport_tx
1029 echo_packet = (Ether(src=self.pg0.remote_mac,
1030 dst=self.pg0.local_mac) /
1031 IP(src=self.pg0.remote_ip4,
1032 dst=self.pg0.remote_ip4) /
1033 UDP(dport=BFD.udp_dport_echo) /
1034 Raw("this should be looped back"))
1035 for dummy in range(echo_packet_count):
1036 self.sleep(.01, "delay between echo packets")
1037 echo_packet[UDP].sport = udp_sport_tx
1039 self.logger.debug(ppp("Sending packet:", echo_packet))
1040 self.pg0.add_stream(echo_packet)
1042 for dummy in range(echo_packet_count):
1043 p = self.pg0.wait_for_packet(1)
1044 self.logger.debug(ppp("Got packet:", p))
1046 self.assert_equal(self.pg0.remote_mac,
1047 ether.dst, "Destination MAC")
1048 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1050 self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1051 self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1053 self.assert_equal(udp.dport, BFD.udp_dport_echo,
1054 "UDP destination port")
1055 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1057 # need to compare the hex payload here, otherwise BFD_vpp_echo
1059 self.assertEqual(str(p[UDP].payload),
1060 str(echo_packet[UDP].payload),
1061 "Received packet is not the echo packet sent")
1062 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1063 "ECHO packet identifier for test purposes)")
1065 def test_echo(self):
1066 """ echo function """
1067 bfd_session_up(self)
1068 self.test_session.update(required_min_echo_rx=50000)
1069 self.test_session.send_packet()
1070 detection_time = self.test_session.detect_mult *\
1071 self.vpp_session.required_min_rx / USEC_IN_SEC
1072 # echo shouldn't work without echo source set
1073 for dummy in range(3):
1074 sleep = 0.75 * detection_time
1075 self.sleep(sleep, "delay before sending bfd packet")
1076 self.test_session.send_packet()
1077 p = wait_for_bfd_packet(
1078 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1079 self.assert_equal(p[BFD].required_min_rx_interval,
1080 self.vpp_session.required_min_rx,
1081 "BFD required min rx interval")
1082 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1083 # should be turned on - loopback echo packets
1084 for dummy in range(3):
1085 loop_until = time.time() + 0.75 * detection_time
1086 while time.time() < loop_until:
1087 p = self.pg0.wait_for_packet(1)
1088 self.logger.debug(ppp("Got packet:", p))
1089 if p[UDP].dport == BFD.udp_dport_echo:
1091 p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
1092 self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
1093 "BFD ECHO src IP equal to loopback IP")
1094 self.logger.debug(ppp("Looping back packet:", p))
1095 self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1096 "ECHO packet destination MAC address")
1097 p[Ether].dst = self.pg0.local_mac
1098 self.pg0.add_stream(p)
1100 elif p.haslayer(BFD):
1101 self.assertGreaterEqual(p[BFD].required_min_rx_interval,
1103 if "P" in p.sprintf("%BFD.flags%"):
1104 final = self.test_session.create_packet()
1105 final[BFD].flags = "F"
1106 self.test_session.send_packet(final)
1108 raise Exception(ppp("Received unknown packet:", p))
1110 self.assert_equal(len(self.vapi.collect_events()), 0,
1111 "number of bfd events")
1112 self.test_session.send_packet()
1114 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1115 def test_echo_fail(self):
1116 """ session goes down if echo function fails """
1117 bfd_session_up(self)
1118 self.test_session.update(required_min_echo_rx=50000)
1119 self.test_session.send_packet()
1120 detection_time = self.test_session.detect_mult *\
1121 self.vpp_session.required_min_rx / USEC_IN_SEC
1122 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1123 # echo function should be used now, but we will drop the echo packets
1124 verified_diag = False
1125 for dummy in range(3):
1126 loop_until = time.time() + 0.75 * detection_time
1127 while time.time() < loop_until:
1128 p = self.pg0.wait_for_packet(1)
1129 self.logger.debug(ppp("Got packet:", p))
1130 if p[UDP].dport == BFD.udp_dport_echo:
1133 elif p.haslayer(BFD):
1134 if "P" in p.sprintf("%BFD.flags%"):
1135 self.assertGreaterEqual(
1136 p[BFD].required_min_rx_interval,
1138 final = self.test_session.create_packet()
1139 final[BFD].flags = "F"
1140 self.test_session.send_packet(final)
1141 if p[BFD].state == BFDState.down:
1142 self.assert_equal(p[BFD].diag,
1143 BFDDiagCode.echo_function_failed,
1145 verified_diag = True
1147 raise Exception(ppp("Received unknown packet:", p))
1148 self.test_session.send_packet()
1149 events = self.vapi.collect_events()
1150 self.assert_equal(len(events), 1, "number of bfd events")
1151 self.assert_equal(events[0].state, BFDState.down, BFDState)
1152 self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1154 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1155 def test_echo_stop(self):
1156 """ echo function stops if peer sets required min echo rx zero """
1157 bfd_session_up(self)
1158 self.test_session.update(required_min_echo_rx=50000)
1159 self.test_session.send_packet()
1160 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1161 # wait for first echo packet
1163 p = self.pg0.wait_for_packet(1)
1164 self.logger.debug(ppp("Got packet:", p))
1165 if p[UDP].dport == BFD.udp_dport_echo:
1166 self.logger.debug(ppp("Looping back packet:", p))
1167 p[Ether].dst = self.pg0.local_mac
1168 self.pg0.add_stream(p)
1171 elif p.haslayer(BFD):
1175 raise Exception(ppp("Received unknown packet:", p))
1176 self.test_session.update(required_min_echo_rx=0)
1177 self.test_session.send_packet()
1178 # echo packets shouldn't arrive anymore
1179 for dummy in range(5):
1180 wait_for_bfd_packet(
1181 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1182 self.test_session.send_packet()
1183 events = self.vapi.collect_events()
1184 self.assert_equal(len(events), 0, "number of bfd events")
1186 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1187 def test_echo_source_removed(self):
1188 """ echo function stops if echo source is removed """
1189 bfd_session_up(self)
1190 self.test_session.update(required_min_echo_rx=50000)
1191 self.test_session.send_packet()
1192 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1193 # wait for first echo packet
1195 p = self.pg0.wait_for_packet(1)
1196 self.logger.debug(ppp("Got packet:", p))
1197 if p[UDP].dport == BFD.udp_dport_echo:
1198 self.logger.debug(ppp("Looping back packet:", p))
1199 p[Ether].dst = self.pg0.local_mac
1200 self.pg0.add_stream(p)
1203 elif p.haslayer(BFD):
1207 raise Exception(ppp("Received unknown packet:", p))
1208 self.vapi.bfd_udp_del_echo_source()
1209 self.test_session.send_packet()
1210 # echo packets shouldn't arrive anymore
1211 for dummy in range(5):
1212 wait_for_bfd_packet(
1213 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1214 self.test_session.send_packet()
1215 events = self.vapi.collect_events()
1216 self.assert_equal(len(events), 0, "number of bfd events")
1218 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1219 def test_stale_echo(self):
1220 """ stale echo packets don't keep a session up """
1221 bfd_session_up(self)
1222 self.test_session.update(required_min_echo_rx=50000)
1223 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1224 self.test_session.send_packet()
1225 # should be turned on - loopback echo packets
1229 for dummy in range(10 * self.vpp_session.detect_mult):
1230 p = self.pg0.wait_for_packet(1)
1231 if p[UDP].dport == BFD.udp_dport_echo:
1232 if echo_packet is None:
1233 self.logger.debug(ppp("Got first echo packet:", p))
1235 timeout_at = time.time() + self.vpp_session.detect_mult * \
1236 self.test_session.required_min_echo_rx / USEC_IN_SEC
1238 self.logger.debug(ppp("Got followup echo packet:", p))
1239 self.logger.debug(ppp("Looping back first echo packet:", p))
1240 echo_packet[Ether].dst = self.pg0.local_mac
1241 self.pg0.add_stream(echo_packet)
1243 elif p.haslayer(BFD):
1244 self.logger.debug(ppp("Got packet:", p))
1245 if "P" in p.sprintf("%BFD.flags%"):
1246 final = self.test_session.create_packet()
1247 final[BFD].flags = "F"
1248 self.test_session.send_packet(final)
1249 if p[BFD].state == BFDState.down:
1250 self.assertIsNotNone(
1252 "Session went down before first echo packet received")
1254 self.assertGreaterEqual(
1256 "Session timeout at %s, but is expected at %s" %
1258 self.assert_equal(p[BFD].diag,
1259 BFDDiagCode.echo_function_failed,
1261 events = self.vapi.collect_events()
1262 self.assert_equal(len(events), 1, "number of bfd events")
1263 self.assert_equal(events[0].state, BFDState.down, BFDState)
1267 raise Exception(ppp("Received unknown packet:", p))
1268 self.test_session.send_packet()
1269 self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1271 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1272 def test_invalid_echo_checksum(self):
1273 """ echo packets with invalid checksum don't keep a session up """
1274 bfd_session_up(self)
1275 self.test_session.update(required_min_echo_rx=50000)
1276 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1277 self.test_session.send_packet()
1278 # should be turned on - loopback echo packets
1281 for dummy in range(10 * self.vpp_session.detect_mult):
1282 p = self.pg0.wait_for_packet(1)
1283 if p[UDP].dport == BFD.udp_dport_echo:
1284 self.logger.debug(ppp("Got echo packet:", p))
1285 if timeout_at is None:
1286 timeout_at = time.time() + self.vpp_session.detect_mult * \
1287 self.test_session.required_min_echo_rx / USEC_IN_SEC
1288 p[BFD_vpp_echo].checksum = getrandbits(64)
1289 p[Ether].dst = self.pg0.local_mac
1290 self.logger.debug(ppp("Looping back modified echo packet:", p))
1291 self.pg0.add_stream(p)
1293 elif p.haslayer(BFD):
1294 self.logger.debug(ppp("Got packet:", p))
1295 if "P" in p.sprintf("%BFD.flags%"):
1296 final = self.test_session.create_packet()
1297 final[BFD].flags = "F"
1298 self.test_session.send_packet(final)
1299 if p[BFD].state == BFDState.down:
1300 self.assertIsNotNone(
1302 "Session went down before first echo packet received")
1304 self.assertGreaterEqual(
1306 "Session timeout at %s, but is expected at %s" %
1308 self.assert_equal(p[BFD].diag,
1309 BFDDiagCode.echo_function_failed,
1311 events = self.vapi.collect_events()
1312 self.assert_equal(len(events), 1, "number of bfd events")
1313 self.assert_equal(events[0].state, BFDState.down, BFDState)
1317 raise Exception(ppp("Received unknown packet:", p))
1318 self.test_session.send_packet()
1319 self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1321 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1322 def test_admin_up_down(self):
1323 """ put session admin-up and admin-down """
1324 bfd_session_up(self)
1325 self.vpp_session.admin_down()
1326 self.pg0.enable_capture()
1327 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1328 verify_event(self, e, expected_state=BFDState.admin_down)
1329 for dummy in range(2):
1330 p = wait_for_bfd_packet(self)
1331 self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1332 # try to bring session up - shouldn't be possible
1333 self.test_session.update(state=BFDState.init)
1334 self.test_session.send_packet()
1335 for dummy in range(2):
1336 p = wait_for_bfd_packet(self)
1337 self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1338 self.vpp_session.admin_up()
1339 self.test_session.update(state=BFDState.down)
1340 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1341 verify_event(self, e, expected_state=BFDState.down)
1342 p = wait_for_bfd_packet(
1343 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1344 self.assert_equal(p[BFD].state, BFDState.down, BFDState)
1345 self.test_session.send_packet()
1346 p = wait_for_bfd_packet(
1347 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1348 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1349 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1350 verify_event(self, e, expected_state=BFDState.init)
1351 self.test_session.update(state=BFDState.up)
1352 self.test_session.send_packet()
1353 p = wait_for_bfd_packet(
1354 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1355 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1356 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1357 verify_event(self, e, expected_state=BFDState.up)
1359 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1360 def test_config_change_remote_demand(self):
1361 """ configuration change while peer in demand mode """
1362 bfd_session_up(self)
1363 demand = self.test_session.create_packet()
1364 demand[BFD].flags = "D"
1365 self.test_session.send_packet(demand)
1366 self.vpp_session.modify_parameters(
1367 required_min_rx=2 * self.vpp_session.required_min_rx)
1368 p = wait_for_bfd_packet(
1369 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1370 # poll bit must be set
1371 self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1372 # terminate poll sequence
1373 final = self.test_session.create_packet()
1374 final[BFD].flags = "D+F"
1375 self.test_session.send_packet(final)
1376 # vpp should be quiet now again
1377 transmit_time = 0.9 \
1378 * max(self.vpp_session.required_min_rx,
1379 self.test_session.desired_min_tx) \
1382 for dummy in range(self.test_session.detect_mult * 2):
1383 time.sleep(transmit_time)
1384 self.test_session.send_packet(demand)
1386 p = wait_for_bfd_packet(self, timeout=0)
1387 self.logger.error(ppp("Received unexpected packet:", p))
1389 except CaptureTimeoutError:
1391 events = self.vapi.collect_events()
1393 self.logger.error("Received unexpected event: %s", e)
1394 self.assert_equal(count, 0, "number of packets received")
1395 self.assert_equal(len(events), 0, "number of events received")
1398 class BFD6TestCase(VppTestCase):
1399 """Bidirectional Forwarding Detection (BFD) (IPv6) """
1402 vpp_clock_offset = None
1407 def setUpClass(cls):
1408 super(BFD6TestCase, cls).setUpClass()
1410 cls.create_pg_interfaces([0])
1411 cls.pg0.config_ip6()
1412 cls.pg0.configure_ipv6_neighbors()
1414 cls.pg0.resolve_ndp()
1415 cls.create_loopback_interfaces([0])
1416 cls.loopback0 = cls.lo_interfaces[0]
1417 cls.loopback0.config_ip6()
1418 cls.loopback0.admin_up()
1421 super(BFD6TestCase, cls).tearDownClass()
1425 super(BFD6TestCase, self).setUp()
1426 self.factory = AuthKeyFactory()
1427 self.vapi.want_bfd_events()
1428 self.pg0.enable_capture()
1430 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1431 self.pg0.remote_ip6,
1433 self.vpp_session.add_vpp_config()
1434 self.vpp_session.admin_up()
1435 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1436 self.logger.debug(self.vapi.cli("show adj nbr"))
1438 self.vapi.want_bfd_events(enable_disable=0)
1442 if not self.vpp_dead:
1443 self.vapi.want_bfd_events(enable_disable=0)
1444 self.vapi.collect_events() # clear the event queue
1445 super(BFD6TestCase, self).tearDown()
1447 def test_session_up(self):
1448 """ bring BFD session up """
1449 bfd_session_up(self)
1451 def test_session_up_by_ip(self):
1452 """ bring BFD session up - first frame looked up by address pair """
1453 self.logger.info("BFD: Sending Slow control frame")
1454 self.test_session.update(my_discriminator=randint(0, 40000000))
1455 self.test_session.send_packet()
1456 self.pg0.enable_capture()
1457 p = self.pg0.wait_for_packet(1)
1458 self.assert_equal(p[BFD].your_discriminator,
1459 self.test_session.my_discriminator,
1460 "BFD - your discriminator")
1461 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1462 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
1464 self.logger.info("BFD: Waiting for event")
1465 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1466 verify_event(self, e, expected_state=BFDState.init)
1467 self.logger.info("BFD: Sending Up")
1468 self.test_session.send_packet()
1469 self.logger.info("BFD: Waiting for event")
1470 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1471 verify_event(self, e, expected_state=BFDState.up)
1472 self.logger.info("BFD: Session is Up")
1473 self.test_session.update(state=BFDState.up)
1474 self.test_session.send_packet()
1475 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1477 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1478 def test_hold_up(self):
1479 """ hold BFD session up """
1480 bfd_session_up(self)
1481 for dummy in range(self.test_session.detect_mult * 2):
1482 wait_for_bfd_packet(self)
1483 self.test_session.send_packet()
1484 self.assert_equal(len(self.vapi.collect_events()), 0,
1485 "number of bfd events")
1486 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1488 def test_echo_looped_back(self):
1489 """ echo packets looped back """
1490 # don't need a session in this case..
1491 self.vpp_session.remove_vpp_config()
1492 self.pg0.enable_capture()
1493 echo_packet_count = 10
1494 # random source port low enough to increment a few times..
1495 udp_sport_tx = randint(1, 50000)
1496 udp_sport_rx = udp_sport_tx
1497 echo_packet = (Ether(src=self.pg0.remote_mac,
1498 dst=self.pg0.local_mac) /
1499 IPv6(src=self.pg0.remote_ip6,
1500 dst=self.pg0.remote_ip6) /
1501 UDP(dport=BFD.udp_dport_echo) /
1502 Raw("this should be looped back"))
1503 for dummy in range(echo_packet_count):
1504 self.sleep(.01, "delay between echo packets")
1505 echo_packet[UDP].sport = udp_sport_tx
1507 self.logger.debug(ppp("Sending packet:", echo_packet))
1508 self.pg0.add_stream(echo_packet)
1510 for dummy in range(echo_packet_count):
1511 p = self.pg0.wait_for_packet(1)
1512 self.logger.debug(ppp("Got packet:", p))
1514 self.assert_equal(self.pg0.remote_mac,
1515 ether.dst, "Destination MAC")
1516 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1518 self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
1519 self.assert_equal(self.pg0.remote_ip6, ip.src, "Destination IP")
1521 self.assert_equal(udp.dport, BFD.udp_dport_echo,
1522 "UDP destination port")
1523 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1525 # need to compare the hex payload here, otherwise BFD_vpp_echo
1527 self.assertEqual(str(p[UDP].payload),
1528 str(echo_packet[UDP].payload),
1529 "Received packet is not the echo packet sent")
1530 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1531 "ECHO packet identifier for test purposes)")
1532 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1533 "ECHO packet identifier for test purposes)")
1535 def test_echo(self):
1536 """ echo function used """
1537 bfd_session_up(self)
1538 self.test_session.update(required_min_echo_rx=50000)
1539 self.test_session.send_packet()
1540 detection_time = self.test_session.detect_mult *\
1541 self.vpp_session.required_min_rx / USEC_IN_SEC
1542 # echo shouldn't work without echo source set
1543 for dummy in range(3):
1544 sleep = 0.75 * detection_time
1545 self.sleep(sleep, "delay before sending bfd packet")
1546 self.test_session.send_packet()
1547 p = wait_for_bfd_packet(
1548 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1549 self.assert_equal(p[BFD].required_min_rx_interval,
1550 self.vpp_session.required_min_rx,
1551 "BFD required min rx interval")
1552 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1553 # should be turned on - loopback echo packets
1554 for dummy in range(3):
1555 loop_until = time.time() + 0.75 * detection_time
1556 while time.time() < loop_until:
1557 p = self.pg0.wait_for_packet(1)
1558 self.logger.debug(ppp("Got packet:", p))
1559 if p[UDP].dport == BFD.udp_dport_echo:
1561 p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
1562 self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
1563 "BFD ECHO src IP equal to loopback IP")
1564 self.logger.debug(ppp("Looping back packet:", p))
1565 self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1566 "ECHO packet destination MAC address")
1567 p[Ether].dst = self.pg0.local_mac
1568 self.pg0.add_stream(p)
1570 elif p.haslayer(BFD):
1571 self.assertGreaterEqual(p[BFD].required_min_rx_interval,
1573 if "P" in p.sprintf("%BFD.flags%"):
1574 final = self.test_session.create_packet()
1575 final[BFD].flags = "F"
1576 self.test_session.send_packet(final)
1578 raise Exception(ppp("Received unknown packet:", p))
1580 self.assert_equal(len(self.vapi.collect_events()), 0,
1581 "number of bfd events")
1582 self.test_session.send_packet()
1585 class BFDSHA1TestCase(VppTestCase):
1586 """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1589 vpp_clock_offset = None
1594 def setUpClass(cls):
1595 super(BFDSHA1TestCase, cls).setUpClass()
1597 cls.create_pg_interfaces([0])
1598 cls.pg0.config_ip4()
1600 cls.pg0.resolve_arp()
1603 super(BFDSHA1TestCase, cls).tearDownClass()
1607 super(BFDSHA1TestCase, self).setUp()
1608 self.factory = AuthKeyFactory()
1609 self.vapi.want_bfd_events()
1610 self.pg0.enable_capture()
1613 if not self.vpp_dead:
1614 self.vapi.want_bfd_events(enable_disable=0)
1615 self.vapi.collect_events() # clear the event queue
1616 super(BFDSHA1TestCase, self).tearDown()
1618 def test_session_up(self):
1619 """ bring BFD session up """
1620 key = self.factory.create_random_key(self)
1621 key.add_vpp_config()
1622 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1623 self.pg0.remote_ip4,
1625 self.vpp_session.add_vpp_config()
1626 self.vpp_session.admin_up()
1627 self.test_session = BFDTestSession(
1628 self, self.pg0, AF_INET, sha1_key=key,
1629 bfd_key_id=self.vpp_session.bfd_key_id)
1630 bfd_session_up(self)
1632 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1633 def test_hold_up(self):
1634 """ hold BFD session up """
1635 key = self.factory.create_random_key(self)
1636 key.add_vpp_config()
1637 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1638 self.pg0.remote_ip4,
1640 self.vpp_session.add_vpp_config()
1641 self.vpp_session.admin_up()
1642 self.test_session = BFDTestSession(
1643 self, self.pg0, AF_INET, sha1_key=key,
1644 bfd_key_id=self.vpp_session.bfd_key_id)
1645 bfd_session_up(self)
1646 for dummy in range(self.test_session.detect_mult * 2):
1647 wait_for_bfd_packet(self)
1648 self.test_session.send_packet()
1649 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1651 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1652 def test_hold_up_meticulous(self):
1653 """ hold BFD session up - meticulous auth """
1654 key = self.factory.create_random_key(
1655 self, BFDAuthType.meticulous_keyed_sha1)
1656 key.add_vpp_config()
1657 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1658 self.pg0.remote_ip4, sha1_key=key)
1659 self.vpp_session.add_vpp_config()
1660 self.vpp_session.admin_up()
1661 # specify sequence number so that it wraps
1662 self.test_session = BFDTestSession(
1663 self, self.pg0, AF_INET, sha1_key=key,
1664 bfd_key_id=self.vpp_session.bfd_key_id,
1665 our_seq_number=0xFFFFFFFF - 4)
1666 bfd_session_up(self)
1667 for dummy in range(30):
1668 wait_for_bfd_packet(self)
1669 self.test_session.inc_seq_num()
1670 self.test_session.send_packet()
1671 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1673 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1674 def test_send_bad_seq_number(self):
1675 """ session is not kept alive by msgs with bad sequence numbers"""
1676 key = self.factory.create_random_key(
1677 self, BFDAuthType.meticulous_keyed_sha1)
1678 key.add_vpp_config()
1679 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1680 self.pg0.remote_ip4, sha1_key=key)
1681 self.vpp_session.add_vpp_config()
1682 self.test_session = BFDTestSession(
1683 self, self.pg0, AF_INET, sha1_key=key,
1684 bfd_key_id=self.vpp_session.bfd_key_id)
1685 bfd_session_up(self)
1686 detection_time = self.test_session.detect_mult *\
1687 self.vpp_session.required_min_rx / USEC_IN_SEC
1688 send_until = time.time() + 2 * detection_time
1689 while time.time() < send_until:
1690 self.test_session.send_packet()
1691 self.sleep(0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
1692 "time between bfd packets")
1693 e = self.vapi.collect_events()
1694 # session should be down now, because the sequence numbers weren't
1696 self.assert_equal(len(e), 1, "number of bfd events")
1697 verify_event(self, e[0], expected_state=BFDState.down)
1699 def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
1700 legitimate_test_session,
1702 rogue_bfd_values=None):
1703 """ execute a rogue session interaction scenario
1705 1. create vpp session, add config
1706 2. bring the legitimate session up
1707 3. copy the bfd values from legitimate session to rogue session
1708 4. apply rogue_bfd_values to rogue session
1709 5. set rogue session state to down
1710 6. send message to take the session down from the rogue session
1711 7. assert that the legitimate session is unaffected
1714 self.vpp_session = vpp_bfd_udp_session
1715 self.vpp_session.add_vpp_config()
1716 self.test_session = legitimate_test_session
1717 # bring vpp session up
1718 bfd_session_up(self)
1719 # send packet from rogue session
1720 rogue_test_session.update(
1721 my_discriminator=self.test_session.my_discriminator,
1722 your_discriminator=self.test_session.your_discriminator,
1723 desired_min_tx=self.test_session.desired_min_tx,
1724 required_min_rx=self.test_session.required_min_rx,
1725 detect_mult=self.test_session.detect_mult,
1726 diag=self.test_session.diag,
1727 state=self.test_session.state,
1728 auth_type=self.test_session.auth_type)
1729 if rogue_bfd_values:
1730 rogue_test_session.update(**rogue_bfd_values)
1731 rogue_test_session.update(state=BFDState.down)
1732 rogue_test_session.send_packet()
1733 wait_for_bfd_packet(self)
1734 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1736 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1737 def test_mismatch_auth(self):
1738 """ session is not brought down by unauthenticated msg """
1739 key = self.factory.create_random_key(self)
1740 key.add_vpp_config()
1741 vpp_session = VppBFDUDPSession(
1742 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1743 legitimate_test_session = BFDTestSession(
1744 self, self.pg0, AF_INET, sha1_key=key,
1745 bfd_key_id=vpp_session.bfd_key_id)
1746 rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
1747 self.execute_rogue_session_scenario(vpp_session,
1748 legitimate_test_session,
1751 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1752 def test_mismatch_bfd_key_id(self):
1753 """ session is not brought down by msg with non-existent key-id """
1754 key = self.factory.create_random_key(self)
1755 key.add_vpp_config()
1756 vpp_session = VppBFDUDPSession(
1757 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1758 # pick a different random bfd key id
1760 while x == vpp_session.bfd_key_id:
1762 legitimate_test_session = BFDTestSession(
1763 self, self.pg0, AF_INET, sha1_key=key,
1764 bfd_key_id=vpp_session.bfd_key_id)
1765 rogue_test_session = BFDTestSession(
1766 self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
1767 self.execute_rogue_session_scenario(vpp_session,
1768 legitimate_test_session,
1771 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1772 def test_mismatched_auth_type(self):
1773 """ session is not brought down by msg with wrong auth type """
1774 key = self.factory.create_random_key(self)
1775 key.add_vpp_config()
1776 vpp_session = VppBFDUDPSession(
1777 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1778 legitimate_test_session = BFDTestSession(
1779 self, self.pg0, AF_INET, sha1_key=key,
1780 bfd_key_id=vpp_session.bfd_key_id)
1781 rogue_test_session = BFDTestSession(
1782 self, self.pg0, AF_INET, sha1_key=key,
1783 bfd_key_id=vpp_session.bfd_key_id)
1784 self.execute_rogue_session_scenario(
1785 vpp_session, legitimate_test_session, rogue_test_session,
1786 {'auth_type': BFDAuthType.keyed_md5})
1788 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1789 def test_restart(self):
1790 """ simulate remote peer restart and resynchronization """
1791 key = self.factory.create_random_key(
1792 self, BFDAuthType.meticulous_keyed_sha1)
1793 key.add_vpp_config()
1794 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1795 self.pg0.remote_ip4, sha1_key=key)
1796 self.vpp_session.add_vpp_config()
1797 self.test_session = BFDTestSession(
1798 self, self.pg0, AF_INET, sha1_key=key,
1799 bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
1800 bfd_session_up(self)
1801 # don't send any packets for 2*detection_time
1802 detection_time = self.test_session.detect_mult *\
1803 self.vpp_session.required_min_rx / USEC_IN_SEC
1804 self.sleep(2 * detection_time, "simulating peer restart")
1805 events = self.vapi.collect_events()
1806 self.assert_equal(len(events), 1, "number of bfd events")
1807 verify_event(self, events[0], expected_state=BFDState.down)
1808 self.test_session.update(state=BFDState.down)
1809 # reset sequence number
1810 self.test_session.our_seq_number = 0
1811 self.test_session.vpp_seq_number = None
1812 # now throw away any pending packets
1813 self.pg0.enable_capture()
1814 bfd_session_up(self)
1817 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1818 class BFDAuthOnOffTestCase(VppTestCase):
1819 """Bidirectional Forwarding Detection (BFD) (changing auth) """
1826 def setUpClass(cls):
1827 super(BFDAuthOnOffTestCase, cls).setUpClass()
1829 cls.create_pg_interfaces([0])
1830 cls.pg0.config_ip4()
1832 cls.pg0.resolve_arp()
1835 super(BFDAuthOnOffTestCase, cls).tearDownClass()
1839 super(BFDAuthOnOffTestCase, self).setUp()
1840 self.factory = AuthKeyFactory()
1841 self.vapi.want_bfd_events()
1842 self.pg0.enable_capture()
1845 if not self.vpp_dead:
1846 self.vapi.want_bfd_events(enable_disable=0)
1847 self.vapi.collect_events() # clear the event queue
1848 super(BFDAuthOnOffTestCase, self).tearDown()
1850 def test_auth_on_immediate(self):
1851 """ turn auth on without disturbing session state (immediate) """
1852 key = self.factory.create_random_key(self)
1853 key.add_vpp_config()
1854 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1855 self.pg0.remote_ip4)
1856 self.vpp_session.add_vpp_config()
1857 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
1858 bfd_session_up(self)
1859 for dummy in range(self.test_session.detect_mult * 2):
1860 p = wait_for_bfd_packet(self)
1861 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1862 self.test_session.send_packet()
1863 self.vpp_session.activate_auth(key)
1864 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
1865 self.test_session.sha1_key = key
1866 for dummy in range(self.test_session.detect_mult * 2):
1867 p = wait_for_bfd_packet(self)
1868 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1869 self.test_session.send_packet()
1870 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1871 self.assert_equal(len(self.vapi.collect_events()), 0,
1872 "number of bfd events")
1874 def test_auth_off_immediate(self):
1875 """ turn auth off without disturbing session state (immediate) """
1876 key = self.factory.create_random_key(self)
1877 key.add_vpp_config()
1878 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1879 self.pg0.remote_ip4, sha1_key=key)
1880 self.vpp_session.add_vpp_config()
1881 self.test_session = BFDTestSession(
1882 self, self.pg0, AF_INET, sha1_key=key,
1883 bfd_key_id=self.vpp_session.bfd_key_id)
1884 bfd_session_up(self)
1885 # self.vapi.want_bfd_events(enable_disable=0)
1886 for dummy in range(self.test_session.detect_mult * 2):
1887 p = wait_for_bfd_packet(self)
1888 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1889 self.test_session.inc_seq_num()
1890 self.test_session.send_packet()
1891 self.vpp_session.deactivate_auth()
1892 self.test_session.bfd_key_id = None
1893 self.test_session.sha1_key = None
1894 for dummy in range(self.test_session.detect_mult * 2):
1895 p = wait_for_bfd_packet(self)
1896 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1897 self.test_session.inc_seq_num()
1898 self.test_session.send_packet()
1899 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1900 self.assert_equal(len(self.vapi.collect_events()), 0,
1901 "number of bfd events")
1903 def test_auth_change_key_immediate(self):
1904 """ change auth key without disturbing session state (immediate) """
1905 key1 = self.factory.create_random_key(self)
1906 key1.add_vpp_config()
1907 key2 = self.factory.create_random_key(self)
1908 key2.add_vpp_config()
1909 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1910 self.pg0.remote_ip4, sha1_key=key1)
1911 self.vpp_session.add_vpp_config()
1912 self.test_session = BFDTestSession(
1913 self, self.pg0, AF_INET, sha1_key=key1,
1914 bfd_key_id=self.vpp_session.bfd_key_id)
1915 bfd_session_up(self)
1916 for dummy in range(self.test_session.detect_mult * 2):
1917 p = wait_for_bfd_packet(self)
1918 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1919 self.test_session.send_packet()
1920 self.vpp_session.activate_auth(key2)
1921 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
1922 self.test_session.sha1_key = key2
1923 for dummy in range(self.test_session.detect_mult * 2):
1924 p = wait_for_bfd_packet(self)
1925 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1926 self.test_session.send_packet()
1927 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1928 self.assert_equal(len(self.vapi.collect_events()), 0,
1929 "number of bfd events")
1931 def test_auth_on_delayed(self):
1932 """ turn auth on without disturbing session state (delayed) """
1933 key = self.factory.create_random_key(self)
1934 key.add_vpp_config()
1935 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1936 self.pg0.remote_ip4)
1937 self.vpp_session.add_vpp_config()
1938 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
1939 bfd_session_up(self)
1940 for dummy in range(self.test_session.detect_mult * 2):
1941 wait_for_bfd_packet(self)
1942 self.test_session.send_packet()
1943 self.vpp_session.activate_auth(key, delayed=True)
1944 for dummy in range(self.test_session.detect_mult * 2):
1945 p = wait_for_bfd_packet(self)
1946 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1947 self.test_session.send_packet()
1948 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
1949 self.test_session.sha1_key = key
1950 self.test_session.send_packet()
1951 for dummy in range(self.test_session.detect_mult * 2):
1952 p = wait_for_bfd_packet(self)
1953 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1954 self.test_session.send_packet()
1955 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1956 self.assert_equal(len(self.vapi.collect_events()), 0,
1957 "number of bfd events")
1959 def test_auth_off_delayed(self):
1960 """ turn auth off without disturbing session state (delayed) """
1961 key = self.factory.create_random_key(self)
1962 key.add_vpp_config()
1963 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1964 self.pg0.remote_ip4, sha1_key=key)
1965 self.vpp_session.add_vpp_config()
1966 self.test_session = BFDTestSession(
1967 self, self.pg0, AF_INET, sha1_key=key,
1968 bfd_key_id=self.vpp_session.bfd_key_id)
1969 bfd_session_up(self)
1970 for dummy in range(self.test_session.detect_mult * 2):
1971 p = wait_for_bfd_packet(self)
1972 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1973 self.test_session.send_packet()
1974 self.vpp_session.deactivate_auth(delayed=True)
1975 for dummy in range(self.test_session.detect_mult * 2):
1976 p = wait_for_bfd_packet(self)
1977 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1978 self.test_session.send_packet()
1979 self.test_session.bfd_key_id = None
1980 self.test_session.sha1_key = None
1981 self.test_session.send_packet()
1982 for dummy in range(self.test_session.detect_mult * 2):
1983 p = wait_for_bfd_packet(self)
1984 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1985 self.test_session.send_packet()
1986 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1987 self.assert_equal(len(self.vapi.collect_events()), 0,
1988 "number of bfd events")
1990 def test_auth_change_key_delayed(self):
1991 """ change auth key without disturbing session state (delayed) """
1992 key1 = self.factory.create_random_key(self)
1993 key1.add_vpp_config()
1994 key2 = self.factory.create_random_key(self)
1995 key2.add_vpp_config()
1996 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1997 self.pg0.remote_ip4, sha1_key=key1)
1998 self.vpp_session.add_vpp_config()
1999 self.vpp_session.admin_up()
2000 self.test_session = BFDTestSession(
2001 self, self.pg0, AF_INET, sha1_key=key1,
2002 bfd_key_id=self.vpp_session.bfd_key_id)
2003 bfd_session_up(self)
2004 for dummy in range(self.test_session.detect_mult * 2):
2005 p = wait_for_bfd_packet(self)
2006 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2007 self.test_session.send_packet()
2008 self.vpp_session.activate_auth(key2, delayed=True)
2009 for dummy in range(self.test_session.detect_mult * 2):
2010 p = wait_for_bfd_packet(self)
2011 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2012 self.test_session.send_packet()
2013 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2014 self.test_session.sha1_key = key2
2015 self.test_session.send_packet()
2016 for dummy in range(self.test_session.detect_mult * 2):
2017 p = wait_for_bfd_packet(self)
2018 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2019 self.test_session.send_packet()
2020 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2021 self.assert_equal(len(self.vapi.collect_events()), 0,
2022 "number of bfd events")
2025 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2026 class BFDCLITestCase(VppTestCase):
2027 """Bidirectional Forwarding Detection (BFD) (CLI) """
2031 def setUpClass(cls):
2032 super(BFDCLITestCase, cls).setUpClass()
2035 cls.create_pg_interfaces((0,))
2036 cls.pg0.config_ip4()
2037 cls.pg0.config_ip6()
2038 cls.pg0.resolve_arp()
2039 cls.pg0.resolve_ndp()
2042 super(BFDCLITestCase, cls).tearDownClass()
2046 super(BFDCLITestCase, self).setUp()
2047 self.factory = AuthKeyFactory()
2048 self.pg0.enable_capture()
2052 self.vapi.want_bfd_events(enable_disable=0)
2053 except UnexpectedApiReturnValueError:
2054 # some tests aren't subscribed, so this is not an issue
2056 self.vapi.collect_events() # clear the event queue
2057 super(BFDCLITestCase, self).tearDown()
2059 def cli_verify_no_response(self, cli):
2060 """ execute a CLI, asserting that the response is empty """
2061 self.assert_equal(self.vapi.cli(cli),
2063 "CLI command response")
2065 def cli_verify_response(self, cli, expected):
2066 """ execute a CLI, asserting that the response matches expectation """
2067 self.assert_equal(self.vapi.cli(cli).strip(),
2069 "CLI command response")
2071 def test_show(self):
2072 """ show commands """
2073 k1 = self.factory.create_random_key(self)
2075 k2 = self.factory.create_random_key(
2076 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2078 s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2080 s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2083 self.logger.info(self.vapi.ppcli("show bfd keys"))
2084 self.logger.info(self.vapi.ppcli("show bfd sessions"))
2085 self.logger.info(self.vapi.ppcli("show bfd"))
2087 def test_set_del_sha1_key(self):
2088 """ set/delete SHA1 auth key """
2089 k = self.factory.create_random_key(self)
2090 self.registry.register(k, self.logger)
2091 self.cli_verify_no_response(
2092 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2094 "".join("{:02x}".format(ord(c)) for c in k.key)))
2095 self.assertTrue(k.query_vpp_config())
2096 self.vpp_session = VppBFDUDPSession(
2097 self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
2098 self.vpp_session.add_vpp_config()
2099 self.test_session = \
2100 BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
2101 bfd_key_id=self.vpp_session.bfd_key_id)
2102 self.vapi.want_bfd_events()
2103 bfd_session_up(self)
2104 bfd_session_down(self)
2105 # try to replace the secret for the key - should fail because the key
2107 k2 = self.factory.create_random_key(self)
2108 self.cli_verify_response(
2109 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2111 "".join("{:02x}".format(ord(c)) for c in k2.key)),
2112 "bfd key set: `bfd_auth_set_key' API call failed, "
2113 "rv=-103:BFD object in use")
2114 # manipulating the session using old secret should still work
2115 bfd_session_up(self)
2116 bfd_session_down(self)
2117 self.vpp_session.remove_vpp_config()
2118 self.cli_verify_no_response(
2119 "bfd key del conf-key-id %s" % k.conf_key_id)
2120 self.assertFalse(k.query_vpp_config())
2122 def test_set_del_meticulous_sha1_key(self):
2123 """ set/delete meticulous SHA1 auth key """
2124 k = self.factory.create_random_key(
2125 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2126 self.registry.register(k, self.logger)
2127 self.cli_verify_no_response(
2128 "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
2130 "".join("{:02x}".format(ord(c)) for c in k.key)))
2131 self.assertTrue(k.query_vpp_config())
2132 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2133 self.pg0.remote_ip6, af=AF_INET6,
2135 self.vpp_session.add_vpp_config()
2136 self.vpp_session.admin_up()
2137 self.test_session = \
2138 BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
2139 bfd_key_id=self.vpp_session.bfd_key_id)
2140 self.vapi.want_bfd_events()
2141 bfd_session_up(self)
2142 bfd_session_down(self)
2143 # try to replace the secret for the key - should fail because the key
2145 k2 = self.factory.create_random_key(self)
2146 self.cli_verify_response(
2147 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2149 "".join("{:02x}".format(ord(c)) for c in k2.key)),
2150 "bfd key set: `bfd_auth_set_key' API call failed, "
2151 "rv=-103:BFD object in use")
2152 # manipulating the session using old secret should still work
2153 bfd_session_up(self)
2154 bfd_session_down(self)
2155 self.vpp_session.remove_vpp_config()
2156 self.cli_verify_no_response(
2157 "bfd key del conf-key-id %s" % k.conf_key_id)
2158 self.assertFalse(k.query_vpp_config())
2160 def test_add_mod_del_bfd_udp(self):
2161 """ create/modify/delete IPv4 BFD UDP session """
2162 vpp_session = VppBFDUDPSession(
2163 self, self.pg0, self.pg0.remote_ip4)
2164 self.registry.register(vpp_session, self.logger)
2165 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2166 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2167 "detect-mult %s" % (self.pg0.name, self.pg0.local_ip4,
2168 self.pg0.remote_ip4,
2169 vpp_session.desired_min_tx,
2170 vpp_session.required_min_rx,
2171 vpp_session.detect_mult)
2172 self.cli_verify_no_response(cli_add_cmd)
2173 # 2nd add should fail
2174 self.cli_verify_response(
2176 "bfd udp session add: `bfd_add_add_session' API call"
2177 " failed, rv=-101:Duplicate BFD object")
2178 verify_bfd_session_config(self, vpp_session)
2179 mod_session = VppBFDUDPSession(
2180 self, self.pg0, self.pg0.remote_ip4,
2181 required_min_rx=2 * vpp_session.required_min_rx,
2182 desired_min_tx=3 * vpp_session.desired_min_tx,
2183 detect_mult=4 * vpp_session.detect_mult)
2184 self.cli_verify_no_response(
2185 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2186 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2187 (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2188 mod_session.desired_min_tx, mod_session.required_min_rx,
2189 mod_session.detect_mult))
2190 verify_bfd_session_config(self, mod_session)
2191 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2192 "peer-addr %s" % (self.pg0.name,
2193 self.pg0.local_ip4, self.pg0.remote_ip4)
2194 self.cli_verify_no_response(cli_del_cmd)
2195 # 2nd del is expected to fail
2196 self.cli_verify_response(
2197 cli_del_cmd, "bfd udp session del: `bfd_udp_del_session' API call"
2198 " failed, rv=-102:No such BFD object")
2199 self.assertFalse(vpp_session.query_vpp_config())
2201 def test_add_mod_del_bfd_udp6(self):
2202 """ create/modify/delete IPv6 BFD UDP session """
2203 vpp_session = VppBFDUDPSession(
2204 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
2205 self.registry.register(vpp_session, self.logger)
2206 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2207 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2208 "detect-mult %s" % (self.pg0.name, self.pg0.local_ip6,
2209 self.pg0.remote_ip6,
2210 vpp_session.desired_min_tx,
2211 vpp_session.required_min_rx,
2212 vpp_session.detect_mult)
2213 self.cli_verify_no_response(cli_add_cmd)
2214 # 2nd add should fail
2215 self.cli_verify_response(
2217 "bfd udp session add: `bfd_add_add_session' API call"
2218 " failed, rv=-101:Duplicate BFD object")
2219 verify_bfd_session_config(self, vpp_session)
2220 mod_session = VppBFDUDPSession(
2221 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2222 required_min_rx=2 * vpp_session.required_min_rx,
2223 desired_min_tx=3 * vpp_session.desired_min_tx,
2224 detect_mult=4 * vpp_session.detect_mult)
2225 self.cli_verify_no_response(
2226 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2227 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2228 (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2229 mod_session.desired_min_tx,
2230 mod_session.required_min_rx, mod_session.detect_mult))
2231 verify_bfd_session_config(self, mod_session)
2232 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2233 "peer-addr %s" % (self.pg0.name,
2234 self.pg0.local_ip6, self.pg0.remote_ip6)
2235 self.cli_verify_no_response(cli_del_cmd)
2236 # 2nd del is expected to fail
2237 self.cli_verify_response(
2239 "bfd udp session del: `bfd_udp_del_session' API call"
2240 " failed, rv=-102:No such BFD object")
2241 self.assertFalse(vpp_session.query_vpp_config())
2243 def test_add_mod_del_bfd_udp_auth(self):
2244 """ create/modify/delete IPv4 BFD UDP session (authenticated) """
2245 key = self.factory.create_random_key(self)
2246 key.add_vpp_config()
2247 vpp_session = VppBFDUDPSession(
2248 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2249 self.registry.register(vpp_session, self.logger)
2250 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2251 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2252 "detect-mult %s conf-key-id %s bfd-key-id %s"\
2253 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2254 vpp_session.desired_min_tx, vpp_session.required_min_rx,
2255 vpp_session.detect_mult, key.conf_key_id,
2256 vpp_session.bfd_key_id)
2257 self.cli_verify_no_response(cli_add_cmd)
2258 # 2nd add should fail
2259 self.cli_verify_response(
2261 "bfd udp session add: `bfd_add_add_session' API call"
2262 " failed, rv=-101:Duplicate BFD object")
2263 verify_bfd_session_config(self, vpp_session)
2264 mod_session = VppBFDUDPSession(
2265 self, self.pg0, self.pg0.remote_ip4, sha1_key=key,
2266 bfd_key_id=vpp_session.bfd_key_id,
2267 required_min_rx=2 * vpp_session.required_min_rx,
2268 desired_min_tx=3 * vpp_session.desired_min_tx,
2269 detect_mult=4 * vpp_session.detect_mult)
2270 self.cli_verify_no_response(
2271 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2272 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2273 (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2274 mod_session.desired_min_tx,
2275 mod_session.required_min_rx, mod_session.detect_mult))
2276 verify_bfd_session_config(self, mod_session)
2277 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2278 "peer-addr %s" % (self.pg0.name,
2279 self.pg0.local_ip4, self.pg0.remote_ip4)
2280 self.cli_verify_no_response(cli_del_cmd)
2281 # 2nd del is expected to fail
2282 self.cli_verify_response(
2284 "bfd udp session del: `bfd_udp_del_session' API call"
2285 " failed, rv=-102:No such BFD object")
2286 self.assertFalse(vpp_session.query_vpp_config())
2288 def test_add_mod_del_bfd_udp6_auth(self):
2289 """ create/modify/delete IPv6 BFD UDP session (authenticated) """
2290 key = self.factory.create_random_key(
2291 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2292 key.add_vpp_config()
2293 vpp_session = VppBFDUDPSession(
2294 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key)
2295 self.registry.register(vpp_session, self.logger)
2296 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2297 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2298 "detect-mult %s conf-key-id %s bfd-key-id %s" \
2299 % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2300 vpp_session.desired_min_tx, vpp_session.required_min_rx,
2301 vpp_session.detect_mult, key.conf_key_id,
2302 vpp_session.bfd_key_id)
2303 self.cli_verify_no_response(cli_add_cmd)
2304 # 2nd add should fail
2305 self.cli_verify_response(
2307 "bfd udp session add: `bfd_add_add_session' API call"
2308 " failed, rv=-101:Duplicate BFD object")
2309 verify_bfd_session_config(self, vpp_session)
2310 mod_session = VppBFDUDPSession(
2311 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key,
2312 bfd_key_id=vpp_session.bfd_key_id,
2313 required_min_rx=2 * vpp_session.required_min_rx,
2314 desired_min_tx=3 * vpp_session.desired_min_tx,
2315 detect_mult=4 * vpp_session.detect_mult)
2316 self.cli_verify_no_response(
2317 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2318 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2319 (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2320 mod_session.desired_min_tx,
2321 mod_session.required_min_rx, mod_session.detect_mult))
2322 verify_bfd_session_config(self, mod_session)
2323 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2324 "peer-addr %s" % (self.pg0.name,
2325 self.pg0.local_ip6, self.pg0.remote_ip6)
2326 self.cli_verify_no_response(cli_del_cmd)
2327 # 2nd del is expected to fail
2328 self.cli_verify_response(
2330 "bfd udp session del: `bfd_udp_del_session' API call"
2331 " failed, rv=-102:No such BFD object")
2332 self.assertFalse(vpp_session.query_vpp_config())
2334 def test_auth_on_off(self):
2335 """ turn authentication on and off """
2336 key = self.factory.create_random_key(
2337 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2338 key.add_vpp_config()
2339 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2340 auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2342 session.add_vpp_config()
2344 "bfd udp session auth activate interface %s local-addr %s "\
2345 "peer-addr %s conf-key-id %s bfd-key-id %s"\
2346 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2347 key.conf_key_id, auth_session.bfd_key_id)
2348 self.cli_verify_no_response(cli_activate)
2349 verify_bfd_session_config(self, auth_session)
2350 self.cli_verify_no_response(cli_activate)
2351 verify_bfd_session_config(self, auth_session)
2353 "bfd udp session auth deactivate interface %s local-addr %s "\
2355 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2356 self.cli_verify_no_response(cli_deactivate)
2357 verify_bfd_session_config(self, session)
2358 self.cli_verify_no_response(cli_deactivate)
2359 verify_bfd_session_config(self, session)
2361 def test_auth_on_off_delayed(self):
2362 """ turn authentication on and off (delayed) """
2363 key = self.factory.create_random_key(
2364 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2365 key.add_vpp_config()
2366 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2367 auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2369 session.add_vpp_config()
2371 "bfd udp session auth activate interface %s local-addr %s "\
2372 "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"\
2373 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2374 key.conf_key_id, auth_session.bfd_key_id)
2375 self.cli_verify_no_response(cli_activate)
2376 verify_bfd_session_config(self, auth_session)
2377 self.cli_verify_no_response(cli_activate)
2378 verify_bfd_session_config(self, auth_session)
2380 "bfd udp session auth deactivate interface %s local-addr %s "\
2381 "peer-addr %s delayed yes"\
2382 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2383 self.cli_verify_no_response(cli_deactivate)
2384 verify_bfd_session_config(self, session)
2385 self.cli_verify_no_response(cli_deactivate)
2386 verify_bfd_session_config(self, session)
2388 def test_admin_up_down(self):
2389 """ put session admin-up and admin-down """
2390 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2391 session.add_vpp_config()
2393 "bfd udp session set-flags admin down interface %s local-addr %s "\
2395 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2397 "bfd udp session set-flags admin up interface %s local-addr %s "\
2399 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2400 self.cli_verify_no_response(cli_down)
2401 verify_bfd_session_config(self, session, state=BFDState.admin_down)
2402 self.cli_verify_no_response(cli_up)
2403 verify_bfd_session_config(self, session, state=BFDState.down)
2405 def test_set_del_udp_echo_source(self):
2406 """ set/del udp echo source """
2407 self.create_loopback_interfaces([0])
2408 self.loopback0 = self.lo_interfaces[0]
2409 self.loopback0.admin_up()
2410 self.cli_verify_response("show bfd echo-source",
2411 "UDP echo source is not set.")
2412 cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
2413 self.cli_verify_no_response(cli_set)
2414 self.cli_verify_response("show bfd echo-source",
2415 "UDP echo source is: %s\n"
2416 "IPv4 address usable as echo source: none\n"
2417 "IPv6 address usable as echo source: none" %
2418 self.loopback0.name)
2419 self.loopback0.config_ip4()
2420 unpacked = unpack("!L", self.loopback0.local_ip4n)
2421 echo_ip4 = inet_ntop(AF_INET, pack("!L", unpacked[0] ^ 1))
2422 self.cli_verify_response("show bfd echo-source",
2423 "UDP echo source is: %s\n"
2424 "IPv4 address usable as echo source: %s\n"
2425 "IPv6 address usable as echo source: none" %
2426 (self.loopback0.name, echo_ip4))
2427 unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
2428 echo_ip6 = inet_ntop(AF_INET6, pack("!LLLL", unpacked[0], unpacked[1],
2429 unpacked[2], unpacked[3] ^ 1))
2430 self.loopback0.config_ip6()
2431 self.cli_verify_response("show bfd echo-source",
2432 "UDP echo source is: %s\n"
2433 "IPv4 address usable as echo source: %s\n"
2434 "IPv6 address usable as echo source: %s" %
2435 (self.loopback0.name, echo_ip4, echo_ip6))
2436 cli_del = "bfd udp echo-source del"
2437 self.cli_verify_no_response(cli_del)
2438 self.cli_verify_response("show bfd echo-source",
2439 "UDP echo source is not set.")
2441 if __name__ == '__main__':
2442 unittest.main(testRunner=VppTestRunner)