4 from __future__ import division
9 from struct import pack, unpack
10 from random import randint, shuffle, getrandbits
11 from socket import AF_INET, AF_INET6, inet_ntop
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether
14 from scapy.layers.inet import UDP, IP
15 from scapy.layers.inet6 import IPv6
16 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
17 BFDDiagCode, BFDState, BFD_vpp_echo
18 from framework import VppTestCase, VppTestRunner, running_extended_tests
19 from vpp_pg_interface import CaptureTimeoutError
21 from vpp_papi_provider import UnexpectedApiReturnValueError
26 class AuthKeyFactory(object):
27 """Factory class for creating auth keys with unique conf key ID"""
30 self._conf_key_ids = {}
32 def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
33 """ create a random key with unique conf key id """
34 conf_key_id = randint(0, 0xFFFFFFFF)
35 while conf_key_id in self._conf_key_ids:
36 conf_key_id = randint(0, 0xFFFFFFFF)
37 self._conf_key_ids[conf_key_id] = 1
38 key = str(bytearray([randint(0, 255) for _ in range(randint(1, 20))]))
39 return VppBFDAuthKey(test=test, auth_type=auth_type,
40 conf_key_id=conf_key_id, key=key)
43 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
44 class BFDAPITestCase(VppTestCase):
45 """Bidirectional Forwarding Detection (BFD) - API"""
52 super(BFDAPITestCase, cls).setUpClass()
55 cls.create_pg_interfaces(range(2))
56 for i in cls.pg_interfaces:
62 super(BFDAPITestCase, cls).tearDownClass()
66 super(BFDAPITestCase, self).setUp()
67 self.factory = AuthKeyFactory()
def test_add_bfd(self):
    """ create a BFD session

    The add/remove cycle runs twice on the same session object, so
    adding a session again after it has been removed is covered too.
    """
    bfd_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    for _ in range(2):
        bfd_session.add_vpp_config()
        self.logger.debug("Session state is %s", bfd_session.state)
        bfd_session.remove_vpp_config()
def test_double_add(self):
    """ create the same BFD session twice (negative case)

    The second add of the already configured session is executed inside
    the vapi negative-retval expectation context; afterwards the session
    is removed again.
    """
    duplicate = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    duplicate.add_vpp_config()

    # same session, second add - wrapped in the negative expectation
    negative_expectation = self.vapi.expect_negative_api_retval()
    with negative_expectation:
        duplicate.add_vpp_config()

    duplicate.remove_vpp_config()
def test_add_bfd6(self):
    """ create IPv6 BFD session

    Same add/remove cycle as the IPv4 variant, executed twice, but the
    session is built with the pg0 remote IPv6 address and af=AF_INET6.
    """
    bfd6_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
    for _ in range(2):
        bfd6_session.add_vpp_config()
        self.logger.debug("Session state is %s", bfd6_session.state)
        bfd6_session.remove_vpp_config()
100 def test_mod_bfd(self):
101 """ modify BFD session parameters """
102 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
103 desired_min_tx=50000,
104 required_min_rx=10000,
106 session.add_vpp_config()
107 s = session.get_bfd_udp_session_dump_entry()
108 self.assert_equal(session.desired_min_tx,
110 "desired min transmit interval")
111 self.assert_equal(session.required_min_rx,
113 "required min receive interval")
114 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
115 session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
116 required_min_rx=session.required_min_rx * 2,
117 detect_mult=session.detect_mult * 2)
118 s = session.get_bfd_udp_session_dump_entry()
119 self.assert_equal(session.desired_min_tx,
121 "desired min transmit interval")
122 self.assert_equal(session.required_min_rx,
124 "required min receive interval")
125 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
127 def test_add_sha1_keys(self):
128 """ add SHA1 keys """
130 keys = [self.factory.create_random_key(
131 self) for i in range(0, key_count)]
133 self.assertFalse(key.query_vpp_config())
137 self.assertTrue(key.query_vpp_config())
139 indexes = range(key_count)
144 key.remove_vpp_config()
146 for j in range(key_count):
149 self.assertFalse(key.query_vpp_config())
151 self.assertTrue(key.query_vpp_config())
152 # should be removed now
154 self.assertFalse(key.query_vpp_config())
155 # add back and remove again
159 self.assertTrue(key.query_vpp_config())
161 key.remove_vpp_config()
163 self.assertFalse(key.query_vpp_config())
165 def test_add_bfd_sha1(self):
166 """ create a BFD session (SHA1) """
167 key = self.factory.create_random_key(self)
169 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
171 session.add_vpp_config()
172 self.logger.debug("Session state is %s", session.state)
173 session.remove_vpp_config()
174 session.add_vpp_config()
175 self.logger.debug("Session state is %s", session.state)
176 session.remove_vpp_config()
178 def test_double_add_sha1(self):
179 """ create the same BFD session twice (negative case) (SHA1) """
180 key = self.factory.create_random_key(self)
182 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
184 session.add_vpp_config()
185 with self.assertRaises(Exception):
186 session.add_vpp_config()
def test_add_auth_nonexistent_key(self):
    """ create BFD session using non-existent SHA1 (negative case)

    The key object comes from the factory, but this test never calls
    add_vpp_config() on it before attempting to add the session.
    """
    unconfigured_key = self.factory.create_random_key(self)
    session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                               sha1_key=unconfigured_key)
    # adding the session is expected to raise
    with self.assertRaises(Exception):
        session.add_vpp_config()
196 def test_shared_sha1_key(self):
197 """ share single SHA1 key between multiple BFD sessions """
198 key = self.factory.create_random_key(self)
201 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
203 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
204 sha1_key=key, af=AF_INET6),
205 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
207 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
208 sha1_key=key, af=AF_INET6)]
213 e = key.get_bfd_auth_keys_dump_entry()
214 self.assert_equal(e.use_count, len(sessions) - removed,
215 "Use count for shared key")
216 s.remove_vpp_config()
218 e = key.get_bfd_auth_keys_dump_entry()
219 self.assert_equal(e.use_count, len(sessions) - removed,
220 "Use count for shared key")
222 def test_activate_auth(self):
223 """ activate SHA1 authentication """
224 key = self.factory.create_random_key(self)
226 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
227 session.add_vpp_config()
228 session.activate_auth(key)
230 def test_deactivate_auth(self):
231 """ deactivate SHA1 authentication """
232 key = self.factory.create_random_key(self)
234 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
235 session.add_vpp_config()
236 session.activate_auth(key)
237 session.deactivate_auth()
239 def test_change_key(self):
240 """ change SHA1 key """
241 key1 = self.factory.create_random_key(self)
242 key2 = self.factory.create_random_key(self)
243 while key2.conf_key_id == key1.conf_key_id:
244 key2 = self.factory.create_random_key(self)
245 key1.add_vpp_config()
246 key2.add_vpp_config()
247 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
249 session.add_vpp_config()
250 session.activate_auth(key2)
253 class BFDTestSession(object):
254 """ BFD session as seen from test framework side """
256 def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
257 bfd_key_id=None, our_seq_number=None):
260 self.sha1_key = sha1_key
261 self.bfd_key_id = bfd_key_id
262 self.interface = interface
263 self.udp_sport = randint(49152, 65535)
264 if our_seq_number is None:
265 self.our_seq_number = randint(0, 40000000)
267 self.our_seq_number = our_seq_number
268 self.vpp_seq_number = None
269 self.my_discriminator = 0
270 self.desired_min_tx = 100000
271 self.required_min_rx = 100000
272 self.required_min_echo_rx = None
273 self.detect_mult = detect_mult
274 self.diag = BFDDiagCode.no_diagnostic
275 self.your_discriminator = None
276 self.state = BFDState.down
277 self.auth_type = BFDAuthType.no_auth
279 def inc_seq_num(self):
280 """ increment sequence number, wrapping if needed """
281 if self.our_seq_number == 0xFFFFFFFF:
282 self.our_seq_number = 0
284 self.our_seq_number += 1
286 def update(self, my_discriminator=None, your_discriminator=None,
287 desired_min_tx=None, required_min_rx=None,
288 required_min_echo_rx=None, detect_mult=None,
289 diag=None, state=None, auth_type=None):
290 """ update BFD parameters associated with session """
291 if my_discriminator is not None:
292 self.my_discriminator = my_discriminator
293 if your_discriminator is not None:
294 self.your_discriminator = your_discriminator
295 if required_min_rx is not None:
296 self.required_min_rx = required_min_rx
297 if required_min_echo_rx is not None:
298 self.required_min_echo_rx = required_min_echo_rx
299 if desired_min_tx is not None:
300 self.desired_min_tx = desired_min_tx
301 if detect_mult is not None:
302 self.detect_mult = detect_mult
305 if state is not None:
307 if auth_type is not None:
308 self.auth_type = auth_type
310 def fill_packet_fields(self, packet):
311 """ set packet fields with known values in packet """
313 if self.my_discriminator:
314 self.test.logger.debug("BFD: setting packet.my_discriminator=%s",
315 self.my_discriminator)
316 bfd.my_discriminator = self.my_discriminator
317 if self.your_discriminator:
318 self.test.logger.debug("BFD: setting packet.your_discriminator=%s",
319 self.your_discriminator)
320 bfd.your_discriminator = self.your_discriminator
321 if self.required_min_rx:
322 self.test.logger.debug(
323 "BFD: setting packet.required_min_rx_interval=%s",
324 self.required_min_rx)
325 bfd.required_min_rx_interval = self.required_min_rx
326 if self.required_min_echo_rx:
327 self.test.logger.debug(
328 "BFD: setting packet.required_min_echo_rx=%s",
329 self.required_min_echo_rx)
330 bfd.required_min_echo_rx_interval = self.required_min_echo_rx
331 if self.desired_min_tx:
332 self.test.logger.debug(
333 "BFD: setting packet.desired_min_tx_interval=%s",
335 bfd.desired_min_tx_interval = self.desired_min_tx
337 self.test.logger.debug(
338 "BFD: setting packet.detect_mult=%s", self.detect_mult)
339 bfd.detect_mult = self.detect_mult
341 self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
344 self.test.logger.debug("BFD: setting packet.state=%s", self.state)
345 bfd.state = self.state
347 # this is used by a negative test-case
348 self.test.logger.debug("BFD: setting packet.auth_type=%s",
350 bfd.auth_type = self.auth_type
352 def create_packet(self):
353 """ create a BFD packet, reflecting the current state of session """
356 bfd.auth_type = self.sha1_key.auth_type
357 bfd.auth_len = BFD.sha1_auth_len
358 bfd.auth_key_id = self.bfd_key_id
359 bfd.auth_seq_num = self.our_seq_number
360 bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
363 if self.af == AF_INET6:
364 packet = (Ether(src=self.interface.remote_mac,
365 dst=self.interface.local_mac) /
366 IPv6(src=self.interface.remote_ip6,
367 dst=self.interface.local_ip6,
369 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
372 packet = (Ether(src=self.interface.remote_mac,
373 dst=self.interface.local_mac) /
374 IP(src=self.interface.remote_ip4,
375 dst=self.interface.local_ip4,
377 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
379 self.test.logger.debug("BFD: Creating packet")
380 self.fill_packet_fields(packet)
382 hash_material = str(packet[BFD])[:32] + self.sha1_key.key + \
383 "\0" * (20 - len(self.sha1_key.key))
384 self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
385 hashlib.sha1(hash_material).hexdigest())
386 packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
389 def send_packet(self, packet=None, interface=None):
390 """ send packet on interface, creating the packet if needed """
392 packet = self.create_packet()
393 if interface is None:
394 interface = self.test.pg0
395 self.test.logger.debug(ppp("Sending packet:", packet))
396 interface.add_stream(packet)
399 def verify_sha1_auth(self, packet):
400 """ Verify correctness of authentication in BFD layer. """
402 self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
403 self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
405 self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
406 self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
407 if self.vpp_seq_number is None:
408 self.vpp_seq_number = bfd.auth_seq_num
409 self.test.logger.debug("Received initial sequence number: %s" %
412 recvd_seq_num = bfd.auth_seq_num
413 self.test.logger.debug("Received followup sequence number: %s" %
415 if self.vpp_seq_number < 0xffffffff:
416 if self.sha1_key.auth_type == \
417 BFDAuthType.meticulous_keyed_sha1:
418 self.test.assert_equal(recvd_seq_num,
419 self.vpp_seq_number + 1,
420 "BFD sequence number")
422 self.test.assert_in_range(recvd_seq_num,
424 self.vpp_seq_number + 1,
425 "BFD sequence number")
427 if self.sha1_key.auth_type == \
428 BFDAuthType.meticulous_keyed_sha1:
429 self.test.assert_equal(recvd_seq_num, 0,
430 "BFD sequence number")
432 self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
433 "BFD sequence number not one of "
434 "(%s, 0)" % self.vpp_seq_number)
435 self.vpp_seq_number = recvd_seq_num
436 # last 20 bytes represent the hash - so replace them with the key,
437 # pad the result with zeros and hash the result
438 hash_material = bfd.original[:-20] + self.sha1_key.key + \
439 "\0" * (20 - len(self.sha1_key.key))
440 expected_hash = hashlib.sha1(hash_material).hexdigest()
441 self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
442 expected_hash, "Auth key hash")
444 def verify_bfd(self, packet):
445 """ Verify correctness of BFD layer. """
447 self.test.assert_equal(bfd.version, 1, "BFD version")
448 self.test.assert_equal(bfd.your_discriminator,
449 self.my_discriminator,
450 "BFD - your discriminator")
452 self.verify_sha1_auth(packet)
455 def bfd_session_up(test):
456 """ Bring BFD session up """
457 test.logger.info("BFD: Waiting for slow hello")
458 p = wait_for_bfd_packet(test, 2)
460 if hasattr(test, 'vpp_clock_offset'):
461 old_offset = test.vpp_clock_offset
462 test.vpp_clock_offset = time.time() - p.time
463 test.logger.debug("BFD: Calculated vpp clock offset: %s",
464 test.vpp_clock_offset)
466 test.assertAlmostEqual(
467 old_offset, test.vpp_clock_offset, delta=0.5,
468 msg="vpp clock offset not stable (new: %s, old: %s)" %
469 (test.vpp_clock_offset, old_offset))
470 test.logger.info("BFD: Sending Init")
471 test.test_session.update(my_discriminator=randint(0, 40000000),
472 your_discriminator=p[BFD].my_discriminator,
474 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
475 BFDAuthType.meticulous_keyed_sha1:
476 test.test_session.inc_seq_num()
477 test.test_session.send_packet()
478 test.logger.info("BFD: Waiting for event")
479 e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
480 verify_event(test, e, expected_state=BFDState.up)
481 test.logger.info("BFD: Session is Up")
482 test.test_session.update(state=BFDState.up)
483 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
484 BFDAuthType.meticulous_keyed_sha1:
485 test.test_session.inc_seq_num()
486 test.test_session.send_packet()
487 test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
def bfd_session_down(test):
    """ Bring BFD session down

    :param test: test case object; its vpp_session, test_session, vapi
                 and logger attributes are used here
    """
    # the vpp side has to report Up before we try to take it down
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)

    peer = test.test_session
    peer.update(state=BFDState.down)
    key = peer.sha1_key
    if key and key.auth_type == BFDAuthType.meticulous_keyed_sha1:
        # with meticulous keyed SHA1 the sequence number is bumped
        # before the packet is sent
        peer.inc_seq_num()
    peer.send_packet()

    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, event, expected_state=BFDState.down)
    test.logger.info("BFD: Session is Down")
    test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
505 def verify_bfd_session_config(test, session, state=None):
506 dump = session.get_bfd_udp_session_dump_entry()
507 test.assertIsNotNone(dump)
508 # since dump is not none, we have verified that sw_if_index and addresses
509 # are valid (in get_bfd_udp_session_dump_entry)
511 test.assert_equal(dump.state, state, "session state")
512 test.assert_equal(dump.required_min_rx, session.required_min_rx,
513 "required min rx interval")
514 test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
515 "desired min tx interval")
516 test.assert_equal(dump.detect_mult, session.detect_mult,
518 if session.sha1_key is None:
519 test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
521 test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
522 test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
524 test.assert_equal(dump.conf_key_id,
525 session.sha1_key.conf_key_id,
529 def verify_ip(test, packet):
530 """ Verify correctness of IP layer. """
531 if test.vpp_session.af == AF_INET6:
533 local_ip = test.pg0.local_ip6
534 remote_ip = test.pg0.remote_ip6
535 test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
538 local_ip = test.pg0.local_ip4
539 remote_ip = test.pg0.remote_ip4
540 test.assert_equal(ip.ttl, 255, "IPv4 TTL")
541 test.assert_equal(ip.src, local_ip, "IP source address")
542 test.assert_equal(ip.dst, remote_ip, "IP destination address")
545 def verify_udp(test, packet):
546 """ Verify correctness of UDP layer. """
548 test.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
549 test.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
553 def verify_event(test, event, expected_state):
554 """ Verify correctness of event values. """
556 test.logger.debug("BFD: Event: %s" % repr(e))
557 test.assert_equal(e.sw_if_index,
558 test.vpp_session.interface.sw_if_index,
559 "BFD interface index")
561 if test.vpp_session.af == AF_INET6:
563 test.assert_equal(e.is_ipv6, is_ipv6, "is_ipv6")
564 if test.vpp_session.af == AF_INET:
565 test.assert_equal(e.local_addr[:4], test.vpp_session.local_addr_n,
566 "Local IPv4 address")
567 test.assert_equal(e.peer_addr[:4], test.vpp_session.peer_addr_n,
570 test.assert_equal(e.local_addr, test.vpp_session.local_addr_n,
571 "Local IPv6 address")
572 test.assert_equal(e.peer_addr, test.vpp_session.peer_addr_n,
574 test.assert_equal(e.state, expected_state, BFDState)
577 def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None):
578 """ wait for BFD packet and verify its correctness
580 :param timeout: how long to wait
581 :param pcap_time_min: ignore packets with pcap timestamp lower than this
583 :returns: tuple (packet, time spent waiting for packet)
585 test.logger.info("BFD: Waiting for BFD packet")
586 deadline = time.time() + timeout
591 test.assert_in_range(counter, 0, 100, "number of packets ignored")
592 time_left = deadline - time.time()
594 raise CaptureTimeoutError("Packet did not arrive within timeout")
595 p = test.pg0.wait_for_packet(timeout=time_left)
596 test.logger.debug(ppp("BFD: Got packet:", p))
597 if pcap_time_min is not None and p.time < pcap_time_min:
598 test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
599 "pcap time min %s):" %
600 (p.time, pcap_time_min), p))
605 raise Exception(ppp("Unexpected or invalid BFD packet:", p))
607 raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
610 test.test_session.verify_bfd(p)
614 class BFD4TestCase(VppTestCase):
615 """Bidirectional Forwarding Detection (BFD)"""
618 vpp_clock_offset = None
624 super(BFD4TestCase, cls).setUpClass()
626 cls.create_pg_interfaces([0])
627 cls.create_loopback_interfaces([0])
628 cls.loopback0 = cls.lo_interfaces[0]
629 cls.loopback0.config_ip4()
630 cls.loopback0.admin_up()
632 cls.pg0.configure_ipv4_neighbors()
634 cls.pg0.resolve_arp()
637 super(BFD4TestCase, cls).tearDownClass()
641 super(BFD4TestCase, self).setUp()
642 self.factory = AuthKeyFactory()
643 self.vapi.want_bfd_events()
644 self.pg0.enable_capture()
646 self.vpp_session = VppBFDUDPSession(self, self.pg0,
648 self.vpp_session.add_vpp_config()
649 self.vpp_session.admin_up()
650 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
652 self.vapi.want_bfd_events(enable_disable=0)
656 if not self.vpp_dead:
657 self.vapi.want_bfd_events(enable_disable=0)
658 self.vapi.collect_events() # clear the event queue
659 super(BFD4TestCase, self).tearDown()
661 def test_session_up(self):
662 """ bring BFD session up """
665 def test_session_up_by_ip(self):
666 """ bring BFD session up - first frame looked up by address pair """
667 self.logger.info("BFD: Sending Slow control frame")
668 self.test_session.update(my_discriminator=randint(0, 40000000))
669 self.test_session.send_packet()
670 self.pg0.enable_capture()
671 p = self.pg0.wait_for_packet(1)
672 self.assert_equal(p[BFD].your_discriminator,
673 self.test_session.my_discriminator,
674 "BFD - your discriminator")
675 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
676 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
678 self.logger.info("BFD: Waiting for event")
679 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
680 verify_event(self, e, expected_state=BFDState.init)
681 self.logger.info("BFD: Sending Up")
682 self.test_session.send_packet()
683 self.logger.info("BFD: Waiting for event")
684 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
685 verify_event(self, e, expected_state=BFDState.up)
686 self.logger.info("BFD: Session is Up")
687 self.test_session.update(state=BFDState.up)
688 self.test_session.send_packet()
689 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
691 def test_session_down(self):
692 """ bring BFD session down """
694 bfd_session_down(self)
696 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
697 def test_hold_up(self):
698 """ hold BFD session up """
700 for dummy in range(self.test_session.detect_mult * 2):
701 wait_for_bfd_packet(self)
702 self.test_session.send_packet()
703 self.assert_equal(len(self.vapi.collect_events()), 0,
704 "number of bfd events")
706 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
707 def test_slow_timer(self):
708 """ verify slow periodic control frames while session down """
710 self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
711 prev_packet = wait_for_bfd_packet(self, 2)
712 for dummy in range(packet_count):
713 next_packet = wait_for_bfd_packet(self, 2)
714 time_diff = next_packet.time - prev_packet.time
715 # spec says the range should be <0.75, 1>, allow extra 0.05 margin
716 # to work around timing issues
717 self.assert_in_range(
718 time_diff, 0.70, 1.05, "time between slow packets")
719 prev_packet = next_packet
721 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
722 def test_zero_remote_min_rx(self):
723 """ no packets when zero remote required min rx interval """
725 self.test_session.update(required_min_rx=0)
726 self.test_session.send_packet()
727 for dummy in range(self.test_session.detect_mult):
728 self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
729 "sleep before transmitting bfd packet")
730 self.test_session.send_packet()
732 p = wait_for_bfd_packet(self, timeout=0)
733 self.logger.error(ppp("Received unexpected packet:", p))
734 except CaptureTimeoutError:
737 len(self.vapi.collect_events()), 0, "number of bfd events")
738 self.test_session.update(required_min_rx=100000)
739 for dummy in range(3):
740 self.test_session.send_packet()
742 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
744 len(self.vapi.collect_events()), 0, "number of bfd events")
746 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
747 def test_conn_down(self):
748 """ verify session goes down after inactivity """
750 detection_time = self.test_session.detect_mult *\
751 self.vpp_session.required_min_rx / USEC_IN_SEC
752 self.sleep(detection_time, "waiting for BFD session time-out")
753 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
754 verify_event(self, e, expected_state=BFDState.down)
756 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
757 def test_large_required_min_rx(self):
758 """ large remote required min rx interval """
760 p = wait_for_bfd_packet(self)
762 self.test_session.update(required_min_rx=interval)
763 self.test_session.send_packet()
764 time_mark = time.time()
766 # busy wait here, trying to collect a packet or event, vpp is not
767 # allowed to send packets and the session will timeout first - so the
768 # Up->Down event must arrive before any packets do
769 while time.time() < time_mark + interval / USEC_IN_SEC:
771 p = wait_for_bfd_packet(self, timeout=0)
772 # if vpp managed to send a packet before we did the session
773 # session update, then that's fine, ignore it
774 if p.time < time_mark - self.vpp_clock_offset:
776 self.logger.error(ppp("Received unexpected packet:", p))
778 except CaptureTimeoutError:
780 events = self.vapi.collect_events()
782 verify_event(self, events[0], BFDState.down)
784 self.assert_equal(count, 0, "number of packets received")
786 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
787 def test_immediate_remote_min_rx_reduction(self):
788 """ immediately honor remote required min rx reduction """
789 self.vpp_session.remove_vpp_config()
790 self.vpp_session = VppBFDUDPSession(
791 self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
792 self.pg0.enable_capture()
793 self.vpp_session.add_vpp_config()
794 self.test_session.update(desired_min_tx=1000000,
795 required_min_rx=1000000)
797 reference_packet = wait_for_bfd_packet(self)
798 time_mark = time.time()
800 self.test_session.update(required_min_rx=interval)
801 self.test_session.send_packet()
802 extra_time = time.time() - time_mark
803 p = wait_for_bfd_packet(self)
804 # first packet is allowed to be late by time we spent doing the update
805 # calculated in extra_time
806 self.assert_in_range(p.time - reference_packet.time,
807 .95 * 0.75 * interval / USEC_IN_SEC,
808 1.05 * interval / USEC_IN_SEC + extra_time,
809 "time between BFD packets")
811 for dummy in range(3):
812 p = wait_for_bfd_packet(self)
813 diff = p.time - reference_packet.time
814 self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
815 1.05 * interval / USEC_IN_SEC,
816 "time between BFD packets")
819 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
820 def test_modify_req_min_rx_double(self):
821 """ modify session - double required min rx """
823 p = wait_for_bfd_packet(self)
824 self.test_session.update(desired_min_tx=10000,
825 required_min_rx=10000)
826 self.test_session.send_packet()
827 # double required min rx
828 self.vpp_session.modify_parameters(
829 required_min_rx=2 * self.vpp_session.required_min_rx)
830 p = wait_for_bfd_packet(
831 self, pcap_time_min=time.time() - self.vpp_clock_offset)
832 # poll bit needs to be set
833 self.assertIn("P", p.sprintf("%BFD.flags%"),
834 "Poll bit not set in BFD packet")
835 # finish poll sequence with final packet
836 final = self.test_session.create_packet()
837 final[BFD].flags = "F"
838 timeout = self.test_session.detect_mult * \
839 max(self.test_session.desired_min_tx,
840 self.vpp_session.required_min_rx) / USEC_IN_SEC
841 self.test_session.send_packet(final)
842 time_mark = time.time()
843 e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
844 verify_event(self, e, expected_state=BFDState.down)
845 time_to_event = time.time() - time_mark
846 self.assert_in_range(time_to_event, .9 * timeout,
847 1.1 * timeout, "session timeout")
849 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
850 def test_modify_req_min_rx_halve(self):
851 """ modify session - halve required min rx """
852 self.vpp_session.modify_parameters(
853 required_min_rx=2 * self.vpp_session.required_min_rx)
855 p = wait_for_bfd_packet(self)
856 self.test_session.update(desired_min_tx=10000,
857 required_min_rx=10000)
858 self.test_session.send_packet()
859 p = wait_for_bfd_packet(
860 self, pcap_time_min=time.time() - self.vpp_clock_offset)
861 # halve required min rx
862 old_required_min_rx = self.vpp_session.required_min_rx
863 self.vpp_session.modify_parameters(
864 required_min_rx=0.5 * self.vpp_session.required_min_rx)
865 # now we wait 0.8*3*old-req-min-rx and the session should still be up
866 self.sleep(0.8 * self.vpp_session.detect_mult *
867 old_required_min_rx / USEC_IN_SEC)
868 self.assert_equal(len(self.vapi.collect_events()), 0,
869 "number of bfd events")
870 p = wait_for_bfd_packet(self)
871 # poll bit needs to be set
872 self.assertIn("P", p.sprintf("%BFD.flags%"),
873 "Poll bit not set in BFD packet")
874 # finish poll sequence with final packet
875 final = self.test_session.create_packet()
876 final[BFD].flags = "F"
877 self.test_session.send_packet(final)
878 # now the session should time out under new conditions
880 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
882 detection_time = self.test_session.detect_mult *\
883 self.vpp_session.required_min_rx / USEC_IN_SEC
884 self.assert_in_range(after - before,
885 0.9 * detection_time,
886 1.1 * detection_time,
887 "time before bfd session goes down")
888 verify_event(self, e, expected_state=BFDState.down)
890 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
891 def test_modify_detect_mult(self):
892 """ modify detect multiplier """
894 p = wait_for_bfd_packet(self)
895 self.vpp_session.modify_parameters(detect_mult=1)
896 p = wait_for_bfd_packet(
897 self, pcap_time_min=time.time() - self.vpp_clock_offset)
898 self.assert_equal(self.vpp_session.detect_mult,
901 # poll bit must not be set
902 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
903 "Poll bit not set in BFD packet")
904 self.vpp_session.modify_parameters(detect_mult=10)
905 p = wait_for_bfd_packet(
906 self, pcap_time_min=time.time() - self.vpp_clock_offset)
907 self.assert_equal(self.vpp_session.detect_mult,
910 # poll bit must not be set
911 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
912 "Poll bit not set in BFD packet")
914 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
915 def test_queued_poll(self):
916 """ test poll sequence queueing """
918 p = wait_for_bfd_packet(self)
919 self.vpp_session.modify_parameters(
920 required_min_rx=2 * self.vpp_session.required_min_rx)
921 p = wait_for_bfd_packet(self)
922 poll_sequence_start = time.time()
923 poll_sequence_length_min = 0.5
924 send_final_after = time.time() + poll_sequence_length_min
925 # poll bit needs to be set
926 self.assertIn("P", p.sprintf("%BFD.flags%"),
927 "Poll bit not set in BFD packet")
928 self.assert_equal(p[BFD].required_min_rx_interval,
929 self.vpp_session.required_min_rx,
930 "BFD required min rx interval")
931 self.vpp_session.modify_parameters(
932 required_min_rx=2 * self.vpp_session.required_min_rx)
933 # 2nd poll sequence should be queued now
934 # don't send the reply back yet, wait for some time to emulate
935 # longer round-trip time
937 while time.time() < send_final_after:
938 self.test_session.send_packet()
939 p = wait_for_bfd_packet(self)
940 self.assert_equal(len(self.vapi.collect_events()), 0,
941 "number of bfd events")
942 self.assert_equal(p[BFD].required_min_rx_interval,
943 self.vpp_session.required_min_rx,
944 "BFD required min rx interval")
946 # poll bit must be set
947 self.assertIn("P", p.sprintf("%BFD.flags%"),
948 "Poll bit not set in BFD packet")
949 final = self.test_session.create_packet()
950 final[BFD].flags = "F"
951 self.test_session.send_packet(final)
952 # finish 1st with final
953 poll_sequence_length = time.time() - poll_sequence_start
954 # vpp must wait for some time before starting new poll sequence
955 poll_no_2_started = False
956 for dummy in range(2 * packet_count):
957 p = wait_for_bfd_packet(self)
958 self.assert_equal(len(self.vapi.collect_events()), 0,
959 "number of bfd events")
960 if "P" in p.sprintf("%BFD.flags%"):
961 poll_no_2_started = True
962 if time.time() < poll_sequence_start + poll_sequence_length:
963 raise Exception("VPP started 2nd poll sequence too soon")
964 final = self.test_session.create_packet()
965 final[BFD].flags = "F"
966 self.test_session.send_packet(final)
969 self.test_session.send_packet()
970 self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
971 # finish 2nd with final
972 final = self.test_session.create_packet()
973 final[BFD].flags = "F"
974 self.test_session.send_packet(final)
975 p = wait_for_bfd_packet(self)
976 # poll bit must not be set
977 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
978 "Poll bit set in BFD packet")
980 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
981 def test_poll_response(self):
982 """ test correct response to control frame with poll bit set """
984 poll = self.test_session.create_packet()
985 poll[BFD].flags = "P"
986 self.test_session.send_packet(poll)
987 final = wait_for_bfd_packet(
988 self, pcap_time_min=time.time() - self.vpp_clock_offset)
989 self.assertIn("F", final.sprintf("%BFD.flags%"))
991 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
992 def test_no_periodic_if_remote_demand(self):
993 """ no periodic frames outside poll sequence if remote demand set """
995 demand = self.test_session.create_packet()
996 demand[BFD].flags = "D"
997 self.test_session.send_packet(demand)
998 transmit_time = 0.9 \
999 * max(self.vpp_session.required_min_rx,
1000 self.test_session.desired_min_tx) \
1003 for dummy in range(self.test_session.detect_mult * 2):
1004 time.sleep(transmit_time)
1005 self.test_session.send_packet(demand)
1007 p = wait_for_bfd_packet(self, timeout=0)
1008 self.logger.error(ppp("Received unexpected packet:", p))
1010 except CaptureTimeoutError:
1012 events = self.vapi.collect_events()
1014 self.logger.error("Received unexpected event: %s", e)
1015 self.assert_equal(count, 0, "number of packets received")
1016 self.assert_equal(len(events), 0, "number of events received")
1018 def test_echo_looped_back(self):
1019 """ echo packets looped back """
1020 # don't need a session in this case..
1021 self.vpp_session.remove_vpp_config()
1022 self.pg0.enable_capture()
1023 echo_packet_count = 10
1024 # random source port low enough to increment a few times..
1025 udp_sport_tx = randint(1, 50000)
1026 udp_sport_rx = udp_sport_tx
1027 echo_packet = (Ether(src=self.pg0.remote_mac,
1028 dst=self.pg0.local_mac) /
1029 IP(src=self.pg0.remote_ip4,
1030 dst=self.pg0.remote_ip4) /
1031 UDP(dport=BFD.udp_dport_echo) /
1032 Raw("this should be looped back"))
1033 for dummy in range(echo_packet_count):
1034 self.sleep(.01, "delay between echo packets")
1035 echo_packet[UDP].sport = udp_sport_tx
1037 self.logger.debug(ppp("Sending packet:", echo_packet))
1038 self.pg0.add_stream(echo_packet)
1040 for dummy in range(echo_packet_count):
1041 p = self.pg0.wait_for_packet(1)
1042 self.logger.debug(ppp("Got packet:", p))
1044 self.assert_equal(self.pg0.remote_mac,
1045 ether.dst, "Destination MAC")
1046 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1048 self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1049 self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1051 self.assert_equal(udp.dport, BFD.udp_dport_echo,
1052 "UDP destination port")
1053 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1055 # need to compare the hex payload here, otherwise BFD_vpp_echo
1057 self.assertEqual(str(p[UDP].payload),
1058 str(echo_packet[UDP].payload),
1059 "Received packet is not the echo packet sent")
1060 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1061 "ECHO packet identifier for test purposes)")
1063 def test_echo(self):
1064 """ echo function """
1065 bfd_session_up(self)
1066 self.test_session.update(required_min_echo_rx=50000)
1067 self.test_session.send_packet()
1068 detection_time = self.test_session.detect_mult *\
1069 self.vpp_session.required_min_rx / USEC_IN_SEC
1070 # echo shouldn't work without echo source set
1071 for dummy in range(3):
1072 sleep = 0.75 * detection_time
1073 self.sleep(sleep, "delay before sending bfd packet")
1074 self.test_session.send_packet()
1075 p = wait_for_bfd_packet(
1076 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1077 self.assert_equal(p[BFD].required_min_rx_interval,
1078 self.vpp_session.required_min_rx,
1079 "BFD required min rx interval")
1080 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1081 # should be turned on - loopback echo packets
1082 for dummy in range(3):
1083 loop_until = time.time() + 0.75 * detection_time
1084 while time.time() < loop_until:
1085 p = self.pg0.wait_for_packet(1)
1086 self.logger.debug(ppp("Got packet:", p))
1087 if p[UDP].dport == BFD.udp_dport_echo:
1089 p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
1090 self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
1091 "BFD ECHO src IP equal to loopback IP")
1092 self.logger.debug(ppp("Looping back packet:", p))
1093 self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1094 "ECHO packet destination MAC address")
1095 p[Ether].dst = self.pg0.local_mac
1096 self.pg0.add_stream(p)
1098 elif p.haslayer(BFD):
1099 self.assertGreaterEqual(p[BFD].required_min_rx_interval,
1101 if "P" in p.sprintf("%BFD.flags%"):
1102 final = self.test_session.create_packet()
1103 final[BFD].flags = "F"
1104 self.test_session.send_packet(final)
1106 raise Exception(ppp("Received unknown packet:", p))
1108 self.assert_equal(len(self.vapi.collect_events()), 0,
1109 "number of bfd events")
1110 self.test_session.send_packet()
1112 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1113 def test_echo_fail(self):
1114 """ session goes down if echo function fails """
1115 bfd_session_up(self)
1116 self.test_session.update(required_min_echo_rx=50000)
1117 self.test_session.send_packet()
1118 detection_time = self.test_session.detect_mult *\
1119 self.vpp_session.required_min_rx / USEC_IN_SEC
1120 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1121 # echo function should be used now, but we will drop the echo packets
1122 verified_diag = False
1123 for dummy in range(3):
1124 loop_until = time.time() + 0.75 * detection_time
1125 while time.time() < loop_until:
1126 p = self.pg0.wait_for_packet(1)
1127 self.logger.debug(ppp("Got packet:", p))
1128 if p[UDP].dport == BFD.udp_dport_echo:
1131 elif p.haslayer(BFD):
1132 if "P" in p.sprintf("%BFD.flags%"):
1133 self.assertGreaterEqual(
1134 p[BFD].required_min_rx_interval,
1136 final = self.test_session.create_packet()
1137 final[BFD].flags = "F"
1138 self.test_session.send_packet(final)
1139 if p[BFD].state == BFDState.down:
1140 self.assert_equal(p[BFD].diag,
1141 BFDDiagCode.echo_function_failed,
1143 verified_diag = True
1145 raise Exception(ppp("Received unknown packet:", p))
1146 self.test_session.send_packet()
1147 events = self.vapi.collect_events()
1148 self.assert_equal(len(events), 1, "number of bfd events")
1149 self.assert_equal(events[0].state, BFDState.down, BFDState)
1150 self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1152 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1153 def test_echo_stop(self):
1154 """ echo function stops if peer sets required min echo rx zero """
1155 bfd_session_up(self)
1156 self.test_session.update(required_min_echo_rx=50000)
1157 self.test_session.send_packet()
1158 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1159 # wait for first echo packet
1161 p = self.pg0.wait_for_packet(1)
1162 self.logger.debug(ppp("Got packet:", p))
1163 if p[UDP].dport == BFD.udp_dport_echo:
1164 self.logger.debug(ppp("Looping back packet:", p))
1165 p[Ether].dst = self.pg0.local_mac
1166 self.pg0.add_stream(p)
1169 elif p.haslayer(BFD):
1173 raise Exception(ppp("Received unknown packet:", p))
1174 self.test_session.update(required_min_echo_rx=0)
1175 self.test_session.send_packet()
1176 # echo packets shouldn't arrive anymore
1177 for dummy in range(5):
1178 wait_for_bfd_packet(
1179 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1180 self.test_session.send_packet()
1181 events = self.vapi.collect_events()
1182 self.assert_equal(len(events), 0, "number of bfd events")
1184 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1185 def test_echo_source_removed(self):
1186 """ echo function stops if echo source is removed """
1187 bfd_session_up(self)
1188 self.test_session.update(required_min_echo_rx=50000)
1189 self.test_session.send_packet()
1190 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1191 # wait for first echo packet
1193 p = self.pg0.wait_for_packet(1)
1194 self.logger.debug(ppp("Got packet:", p))
1195 if p[UDP].dport == BFD.udp_dport_echo:
1196 self.logger.debug(ppp("Looping back packet:", p))
1197 p[Ether].dst = self.pg0.local_mac
1198 self.pg0.add_stream(p)
1201 elif p.haslayer(BFD):
1205 raise Exception(ppp("Received unknown packet:", p))
1206 self.vapi.bfd_udp_del_echo_source()
1207 self.test_session.send_packet()
1208 # echo packets shouldn't arrive anymore
1209 for dummy in range(5):
1210 wait_for_bfd_packet(
1211 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1212 self.test_session.send_packet()
1213 events = self.vapi.collect_events()
1214 self.assert_equal(len(events), 0, "number of bfd events")
1216 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1217 def test_stale_echo(self):
1218 """ stale echo packets don't keep a session up """
1219 bfd_session_up(self)
1220 self.test_session.update(required_min_echo_rx=50000)
1221 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1222 self.test_session.send_packet()
1223 # should be turned on - loopback echo packets
1227 for dummy in range(10 * self.vpp_session.detect_mult):
1228 p = self.pg0.wait_for_packet(1)
1229 if p[UDP].dport == BFD.udp_dport_echo:
1230 if echo_packet is None:
1231 self.logger.debug(ppp("Got first echo packet:", p))
1233 timeout_at = time.time() + self.vpp_session.detect_mult * \
1234 self.test_session.required_min_echo_rx / USEC_IN_SEC
1236 self.logger.debug(ppp("Got followup echo packet:", p))
1237 self.logger.debug(ppp("Looping back first echo packet:", p))
1238 echo_packet[Ether].dst = self.pg0.local_mac
1239 self.pg0.add_stream(echo_packet)
1241 elif p.haslayer(BFD):
1242 self.logger.debug(ppp("Got packet:", p))
1243 if "P" in p.sprintf("%BFD.flags%"):
1244 final = self.test_session.create_packet()
1245 final[BFD].flags = "F"
1246 self.test_session.send_packet(final)
1247 if p[BFD].state == BFDState.down:
1248 self.assertIsNotNone(
1250 "Session went down before first echo packet received")
1252 self.assertGreaterEqual(
1254 "Session timeout at %s, but is expected at %s" %
1256 self.assert_equal(p[BFD].diag,
1257 BFDDiagCode.echo_function_failed,
1259 events = self.vapi.collect_events()
1260 self.assert_equal(len(events), 1, "number of bfd events")
1261 self.assert_equal(events[0].state, BFDState.down, BFDState)
1265 raise Exception(ppp("Received unknown packet:", p))
1266 self.test_session.send_packet()
1267 self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1269 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1270 def test_invalid_echo_checksum(self):
1271 """ echo packets with invalid checksum don't keep a session up """
1272 bfd_session_up(self)
1273 self.test_session.update(required_min_echo_rx=50000)
1274 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1275 self.test_session.send_packet()
1276 # should be turned on - loopback echo packets
1279 for dummy in range(10 * self.vpp_session.detect_mult):
1280 p = self.pg0.wait_for_packet(1)
1281 if p[UDP].dport == BFD.udp_dport_echo:
1282 self.logger.debug(ppp("Got echo packet:", p))
1283 if timeout_at is None:
1284 timeout_at = time.time() + self.vpp_session.detect_mult * \
1285 self.test_session.required_min_echo_rx / USEC_IN_SEC
1286 p[BFD_vpp_echo].checksum = getrandbits(64)
1287 p[Ether].dst = self.pg0.local_mac
1288 self.logger.debug(ppp("Looping back modified echo packet:", p))
1289 self.pg0.add_stream(p)
1291 elif p.haslayer(BFD):
1292 self.logger.debug(ppp("Got packet:", p))
1293 if "P" in p.sprintf("%BFD.flags%"):
1294 final = self.test_session.create_packet()
1295 final[BFD].flags = "F"
1296 self.test_session.send_packet(final)
1297 if p[BFD].state == BFDState.down:
1298 self.assertIsNotNone(
1300 "Session went down before first echo packet received")
1302 self.assertGreaterEqual(
1304 "Session timeout at %s, but is expected at %s" %
1306 self.assert_equal(p[BFD].diag,
1307 BFDDiagCode.echo_function_failed,
1309 events = self.vapi.collect_events()
1310 self.assert_equal(len(events), 1, "number of bfd events")
1311 self.assert_equal(events[0].state, BFDState.down, BFDState)
1315 raise Exception(ppp("Received unknown packet:", p))
1316 self.test_session.send_packet()
1317 self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_admin_up_down(self):
    """ put session admin-up and admin-down

    Brings the session up, switches the VPP side to admin-down and checks
    that it keeps reporting admin-down even after the peer sends a packet
    in init state. The VPP side is then put admin-up again and the session
    is walked through down -> init -> up, verifying both the reported
    event and the state carried in the received packet at each step.
    """
    event_name = "bfd_udp_session_details"

    def check_next_event(state):
        """ wait for the next session event and verify its state """
        event = self.vapi.wait_for_event(1, event_name)
        verify_event(self, event, expected_state=state)

    def check_admin_down_packets():
        """ the next two packets from VPP must both report admin-down """
        for _ in range(2):
            pkt = wait_for_bfd_packet(self)
            self.assert_equal(pkt[BFD].state, BFDState.admin_down,
                              BFDState)

    def check_recent_packet(state):
        """ verify the state of a packet captured from now on """
        # lower bound passed as pcap_time_min: the current time corrected
        # by vpp_clock_offset (presumably makes the helper ignore packets
        # captured earlier - confirm against wait_for_bfd_packet)
        earliest = time.time() - self.vpp_clock_offset
        pkt = wait_for_bfd_packet(self, pcap_time_min=earliest)
        self.assert_equal(pkt[BFD].state, state, BFDState)

    bfd_session_up(self)
    self.vpp_session.admin_down()
    self.pg0.enable_capture()
    check_next_event(BFDState.admin_down)
    check_admin_down_packets()
    # the peer tries to bring the session up - this shouldn't be possible
    self.test_session.update(state=BFDState.init)
    self.test_session.send_packet()
    check_admin_down_packets()
    # admin-up again: the session has to restart from the down state
    self.vpp_session.admin_up()
    self.test_session.update(state=BFDState.down)
    check_next_event(BFDState.down)
    check_recent_packet(BFDState.down)
    self.test_session.send_packet()
    check_recent_packet(BFDState.init)
    check_next_event(BFDState.init)
    self.test_session.update(state=BFDState.up)
    self.test_session.send_packet()
    check_recent_packet(BFDState.up)
    check_next_event(BFDState.up)
1357 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1358 def test_config_change_remote_demand(self):
1359 """ configuration change while peer in demand mode """
1360 bfd_session_up(self)
1361 demand = self.test_session.create_packet()
1362 demand[BFD].flags = "D"
1363 self.test_session.send_packet(demand)
1364 self.vpp_session.modify_parameters(
1365 required_min_rx=2 * self.vpp_session.required_min_rx)
1366 p = wait_for_bfd_packet(
1367 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1368 # poll bit must be set
1369 self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1370 # terminate poll sequence
1371 final = self.test_session.create_packet()
1372 final[BFD].flags = "D+F"
1373 self.test_session.send_packet(final)
1374 # vpp should be quiet now again
1375 transmit_time = 0.9 \
1376 * max(self.vpp_session.required_min_rx,
1377 self.test_session.desired_min_tx) \
1380 for dummy in range(self.test_session.detect_mult * 2):
1381 time.sleep(transmit_time)
1382 self.test_session.send_packet(demand)
1384 p = wait_for_bfd_packet(self, timeout=0)
1385 self.logger.error(ppp("Received unexpected packet:", p))
1387 except CaptureTimeoutError:
1389 events = self.vapi.collect_events()
1391 self.logger.error("Received unexpected event: %s", e)
1392 self.assert_equal(count, 0, "number of packets received")
1393 self.assert_equal(len(events), 0, "number of events received")
1396 class BFD6TestCase(VppTestCase):
1397 """Bidirectional Forwarding Detection (BFD) (IPv6) """
1400 vpp_clock_offset = None
1405 def setUpClass(cls):
1406 super(BFD6TestCase, cls).setUpClass()
1408 cls.create_pg_interfaces([0])
1409 cls.pg0.config_ip6()
1410 cls.pg0.configure_ipv6_neighbors()
1412 cls.pg0.resolve_ndp()
1413 cls.create_loopback_interfaces([0])
1414 cls.loopback0 = cls.lo_interfaces[0]
1415 cls.loopback0.config_ip6()
1416 cls.loopback0.admin_up()
1419 super(BFD6TestCase, cls).tearDownClass()
1423 super(BFD6TestCase, self).setUp()
1424 self.factory = AuthKeyFactory()
1425 self.vapi.want_bfd_events()
1426 self.pg0.enable_capture()
1428 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1429 self.pg0.remote_ip6,
1431 self.vpp_session.add_vpp_config()
1432 self.vpp_session.admin_up()
1433 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1434 self.logger.debug(self.vapi.cli("show adj nbr"))
1436 self.vapi.want_bfd_events(enable_disable=0)
1440 if not self.vpp_dead:
1441 self.vapi.want_bfd_events(enable_disable=0)
1442 self.vapi.collect_events() # clear the event queue
1443 super(BFD6TestCase, self).tearDown()
def test_session_up(self):
    """ bring BFD session up

    The whole scenario is delegated to the bfd_session_up() helper, which
    receives this test case instance as its only argument.
    """
    bfd_session_up(self)
1449 def test_session_up_by_ip(self):
1450 """ bring BFD session up - first frame looked up by address pair """
1451 self.logger.info("BFD: Sending Slow control frame")
1452 self.test_session.update(my_discriminator=randint(0, 40000000))
1453 self.test_session.send_packet()
1454 self.pg0.enable_capture()
1455 p = self.pg0.wait_for_packet(1)
1456 self.assert_equal(p[BFD].your_discriminator,
1457 self.test_session.my_discriminator,
1458 "BFD - your discriminator")
1459 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1460 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
1462 self.logger.info("BFD: Waiting for event")
1463 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1464 verify_event(self, e, expected_state=BFDState.init)
1465 self.logger.info("BFD: Sending Up")
1466 self.test_session.send_packet()
1467 self.logger.info("BFD: Waiting for event")
1468 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1469 verify_event(self, e, expected_state=BFDState.up)
1470 self.logger.info("BFD: Session is Up")
1471 self.test_session.update(state=BFDState.up)
1472 self.test_session.send_packet()
1473 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_hold_up(self):
    """ hold BFD session up

    Once the session is established, waits for a BFD packet and sends one
    back for detect_mult * 2 rounds, then checks that no BFD event was
    collected and that the VPP session still reports the up state.
    """
    bfd_session_up(self)
    rounds = self.test_session.detect_mult * 2
    for _ in range(rounds):
        wait_for_bfd_packet(self)
        self.test_session.send_packet()
    collected = self.vapi.collect_events()
    self.assert_equal(len(collected), 0, "number of bfd events")
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1486 def test_echo_looped_back(self):
1487 """ echo packets looped back """
1488 # don't need a session in this case..
1489 self.vpp_session.remove_vpp_config()
1490 self.pg0.enable_capture()
1491 echo_packet_count = 10
1492 # random source port low enough to increment a few times..
1493 udp_sport_tx = randint(1, 50000)
1494 udp_sport_rx = udp_sport_tx
1495 echo_packet = (Ether(src=self.pg0.remote_mac,
1496 dst=self.pg0.local_mac) /
1497 IPv6(src=self.pg0.remote_ip6,
1498 dst=self.pg0.remote_ip6) /
1499 UDP(dport=BFD.udp_dport_echo) /
1500 Raw("this should be looped back"))
1501 for dummy in range(echo_packet_count):
1502 self.sleep(.01, "delay between echo packets")
1503 echo_packet[UDP].sport = udp_sport_tx
1505 self.logger.debug(ppp("Sending packet:", echo_packet))
1506 self.pg0.add_stream(echo_packet)
1508 for dummy in range(echo_packet_count):
1509 p = self.pg0.wait_for_packet(1)
1510 self.logger.debug(ppp("Got packet:", p))
1512 self.assert_equal(self.pg0.remote_mac,
1513 ether.dst, "Destination MAC")
1514 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1516 self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
1517 self.assert_equal(self.pg0.remote_ip6, ip.src, "Destination IP")
1519 self.assert_equal(udp.dport, BFD.udp_dport_echo,
1520 "UDP destination port")
1521 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1523 # need to compare the hex payload here, otherwise BFD_vpp_echo
1525 self.assertEqual(str(p[UDP].payload),
1526 str(echo_packet[UDP].payload),
1527 "Received packet is not the echo packet sent")
1528 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1529 "ECHO packet identifier for test purposes)")
1530 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1531 "ECHO packet identifier for test purposes)")
1533 def test_echo(self):
1534 """ echo function used """
1535 bfd_session_up(self)
1536 self.test_session.update(required_min_echo_rx=50000)
1537 self.test_session.send_packet()
1538 detection_time = self.test_session.detect_mult *\
1539 self.vpp_session.required_min_rx / USEC_IN_SEC
1540 # echo shouldn't work without echo source set
1541 for dummy in range(3):
1542 sleep = 0.75 * detection_time
1543 self.sleep(sleep, "delay before sending bfd packet")
1544 self.test_session.send_packet()
1545 p = wait_for_bfd_packet(
1546 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1547 self.assert_equal(p[BFD].required_min_rx_interval,
1548 self.vpp_session.required_min_rx,
1549 "BFD required min rx interval")
1550 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1551 # should be turned on - loopback echo packets
1552 for dummy in range(3):
1553 loop_until = time.time() + 0.75 * detection_time
1554 while time.time() < loop_until:
1555 p = self.pg0.wait_for_packet(1)
1556 self.logger.debug(ppp("Got packet:", p))
1557 if p[UDP].dport == BFD.udp_dport_echo:
1559 p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
1560 self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
1561 "BFD ECHO src IP equal to loopback IP")
1562 self.logger.debug(ppp("Looping back packet:", p))
1563 self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1564 "ECHO packet destination MAC address")
1565 p[Ether].dst = self.pg0.local_mac
1566 self.pg0.add_stream(p)
1568 elif p.haslayer(BFD):
1569 self.assertGreaterEqual(p[BFD].required_min_rx_interval,
1571 if "P" in p.sprintf("%BFD.flags%"):
1572 final = self.test_session.create_packet()
1573 final[BFD].flags = "F"
1574 self.test_session.send_packet(final)
1576 raise Exception(ppp("Received unknown packet:", p))
1578 self.assert_equal(len(self.vapi.collect_events()), 0,
1579 "number of bfd events")
1580 self.test_session.send_packet()
1583 class BFDSHA1TestCase(VppTestCase):
1584 """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1587 vpp_clock_offset = None
1592 def setUpClass(cls):
1593 super(BFDSHA1TestCase, cls).setUpClass()
1595 cls.create_pg_interfaces([0])
1596 cls.pg0.config_ip4()
1598 cls.pg0.resolve_arp()
1601 super(BFDSHA1TestCase, cls).tearDownClass()
1605 super(BFDSHA1TestCase, self).setUp()
1606 self.factory = AuthKeyFactory()
1607 self.vapi.want_bfd_events()
1608 self.pg0.enable_capture()
1611 if not self.vpp_dead:
1612 self.vapi.want_bfd_events(enable_disable=0)
1613 self.vapi.collect_events() # clear the event queue
1614 super(BFDSHA1TestCase, self).tearDown()
1616 def test_session_up(self):
1617 """ bring BFD session up """
1618 key = self.factory.create_random_key(self)
1619 key.add_vpp_config()
1620 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1621 self.pg0.remote_ip4,
1623 self.vpp_session.add_vpp_config()
1624 self.vpp_session.admin_up()
1625 self.test_session = BFDTestSession(
1626 self, self.pg0, AF_INET, sha1_key=key,
1627 bfd_key_id=self.vpp_session.bfd_key_id)
1628 bfd_session_up(self)
1630 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1631 def test_hold_up(self):
1632 """ hold BFD session up """
1633 key = self.factory.create_random_key(self)
1634 key.add_vpp_config()
1635 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1636 self.pg0.remote_ip4,
1638 self.vpp_session.add_vpp_config()
1639 self.vpp_session.admin_up()
1640 self.test_session = BFDTestSession(
1641 self, self.pg0, AF_INET, sha1_key=key,
1642 bfd_key_id=self.vpp_session.bfd_key_id)
1643 bfd_session_up(self)
1644 for dummy in range(self.test_session.detect_mult * 2):
1645 wait_for_bfd_packet(self)
1646 self.test_session.send_packet()
1647 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_hold_up_meticulous(self):
    """ hold BFD session up - meticulous auth

    Uses a meticulous keyed SHA1 key on both sides and exchanges packets
    for 30 rounds, incrementing the test session's sequence number before
    every packet sent; the VPP session must still be up at the end.
    """
    auth_key = self.factory.create_random_key(
        self, BFDAuthType.meticulous_keyed_sha1)
    auth_key.add_vpp_config()
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
    self.vpp_session.add_vpp_config()
    self.vpp_session.admin_up()
    # start just below the 32-bit maximum so that the sequence number
    # wraps while the session is being held up
    initial_seq_number = 0xFFFFFFFF - 4
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=auth_key,
        bfd_key_id=self.vpp_session.bfd_key_id,
        our_seq_number=initial_seq_number)
    bfd_session_up(self)
    for _ in range(30):
        wait_for_bfd_packet(self)
        self.test_session.inc_seq_num()
        self.test_session.send_packet()
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_send_bad_seq_number(self):
    """ session is not kept alive by msgs with bad sequence numbers

    With a meticulous keyed SHA1 key configured, packets are sent for
    twice the detection time while this method never calls inc_seq_num()
    on the test session. Exactly one event, reporting the down state, is
    expected afterwards.
    """
    auth_key = self.factory.create_random_key(
        self, BFDAuthType.meticulous_keyed_sha1)
    auth_key.add_vpp_config()
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=auth_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)
    detect_mult = self.test_session.detect_mult
    detection_time = \
        detect_mult * self.vpp_session.required_min_rx / USEC_IN_SEC
    deadline = time.time() + 2 * detection_time
    while time.time() < deadline:
        self.test_session.send_packet()
        # pause for 0.7 of VPP's required min rx interval between packets
        gap = 0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC
        self.sleep(gap, "time between bfd packets")
    collected = self.vapi.collect_events()
    # session should be down now, because the sequence numbers weren't
    # updated between the packets sent above
    self.assert_equal(len(collected), 1, "number of bfd events")
    verify_event(self, collected[0], expected_state=BFDState.down)
1697 def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
1698 legitimate_test_session,
1700 rogue_bfd_values=None):
1701 """ execute a rogue session interaction scenario
1703 1. create vpp session, add config
1704 2. bring the legitimate session up
1705 3. copy the bfd values from legitimate session to rogue session
1706 4. apply rogue_bfd_values to rogue session
1707 5. set rogue session state to down
1708 6. send message to take the session down from the rogue session
1709 7. assert that the legitimate session is unaffected
1712 self.vpp_session = vpp_bfd_udp_session
1713 self.vpp_session.add_vpp_config()
1714 self.test_session = legitimate_test_session
1715 # bring vpp session up
1716 bfd_session_up(self)
1717 # send packet from rogue session
1718 rogue_test_session.update(
1719 my_discriminator=self.test_session.my_discriminator,
1720 your_discriminator=self.test_session.your_discriminator,
1721 desired_min_tx=self.test_session.desired_min_tx,
1722 required_min_rx=self.test_session.required_min_rx,
1723 detect_mult=self.test_session.detect_mult,
1724 diag=self.test_session.diag,
1725 state=self.test_session.state,
1726 auth_type=self.test_session.auth_type)
1727 if rogue_bfd_values:
1728 rogue_test_session.update(**rogue_bfd_values)
1729 rogue_test_session.update(state=BFDState.down)
1730 rogue_test_session.send_packet()
1731 wait_for_bfd_packet(self)
1732 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1734 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1735 def test_mismatch_auth(self):
1736 """ session is not brought down by unauthenticated msg """
1737 key = self.factory.create_random_key(self)
1738 key.add_vpp_config()
1739 vpp_session = VppBFDUDPSession(
1740 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1741 legitimate_test_session = BFDTestSession(
1742 self, self.pg0, AF_INET, sha1_key=key,
1743 bfd_key_id=vpp_session.bfd_key_id)
1744 rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
1745 self.execute_rogue_session_scenario(vpp_session,
1746 legitimate_test_session,
1749 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1750 def test_mismatch_bfd_key_id(self):
1751 """ session is not brought down by msg with non-existent key-id """
1752 key = self.factory.create_random_key(self)
1753 key.add_vpp_config()
1754 vpp_session = VppBFDUDPSession(
1755 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1756 # pick a different random bfd key id
1758 while x == vpp_session.bfd_key_id:
1760 legitimate_test_session = BFDTestSession(
1761 self, self.pg0, AF_INET, sha1_key=key,
1762 bfd_key_id=vpp_session.bfd_key_id)
1763 rogue_test_session = BFDTestSession(
1764 self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
1765 self.execute_rogue_session_scenario(vpp_session,
1766 legitimate_test_session,
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_mismatched_auth_type(self):
    """ session is not brought down by msg with wrong auth type

    The legitimate and the rogue test session are created with the same
    key and bfd key id; the only difference handed to the shared rogue
    session scenario is an auth_type override of keyed MD5.
    """
    auth_key = self.factory.create_random_key(self)
    auth_key.add_vpp_config()
    vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
    legitimate = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=auth_key,
        bfd_key_id=vpp_session.bfd_key_id)
    rogue = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=auth_key,
        bfd_key_id=vpp_session.bfd_key_id)
    # values applied on top of the rogue session by the scenario
    rogue_overrides = {'auth_type': BFDAuthType.keyed_md5}
    self.execute_rogue_session_scenario(
        vpp_session, legitimate, rogue, rogue_overrides)
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_restart(self):
    """ simulate remote peer restart and resynchronization

    Brings up a session authenticated with a meticulous keyed SHA1 key,
    stays silent for twice the detection time and expects a single down
    event. The test session is then reset (state down, sequence numbers
    cleared) and the session is brought up a second time.
    """
    auth_key = self.factory.create_random_key(
        self, BFDAuthType.meticulous_keyed_sha1)
    auth_key.add_vpp_config()
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=auth_key,
        bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
    bfd_session_up(self)
    # don't send any packets for 2*detection_time
    detect_mult = self.test_session.detect_mult
    min_rx = self.vpp_session.required_min_rx
    detection_time = detect_mult * min_rx / USEC_IN_SEC
    silence = 2 * detection_time
    self.sleep(silence, "simulating peer restart")
    reported = self.vapi.collect_events()
    self.assert_equal(len(reported), 1, "number of bfd events")
    verify_event(self, reported[0], expected_state=BFDState.down)
    self.test_session.update(state=BFDState.down)
    # reset sequence number
    self.test_session.our_seq_number = 0
    self.test_session.vpp_seq_number = None
    # now throw away any pending packets
    self.pg0.enable_capture()
    bfd_session_up(self)
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
class BFDAuthOnOffTestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (changing auth) """

    # NOTE(review): rebuilt from a flattened listing that had lost its
    # indentation and its shortest lines.  The class attributes below,
    # `@classmethod`, `try`/`except Exception`/`raise`, `admin_up()`, the
    # `setUp`/`tearDown` headers and the block nesting are inferred from
    # the gaps - confirm them against the complete file.
    pg0 = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """ create pg0, configure IPv4 on it and resolve ARP """
        super(BFDAuthOnOffTestCase, cls).setUpClass()
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip4()
            cls.pg0.admin_up()
            cls.pg0.resolve_arp()
        except Exception:
            # the base class is already set up, so tear it down again
            # before letting the failure propagate
            super(BFDAuthOnOffTestCase, cls).tearDownClass()
            raise

    def setUp(self):
        """ fresh key factory, BFD event subscription and capture on pg0 """
        super(BFDAuthOnOffTestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

    def tearDown(self):
        """ unsubscribe from BFD events and drop any that are queued """
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFDAuthOnOffTestCase, self).tearDown()

    # ---------------------------------------------------------------
    # helpers shared by the scenarios below
    # ---------------------------------------------------------------

    def _new_key(self):
        """ create a random auth key and add it to VPP

        :returns: the key object, already configured in VPP
        """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        return key

    def _start_sessions(self, key=None, admin_up=False):
        """ configure the VPP session and its test peer, bring them up

        :param key: auth key both ends start with; None starts the
            session without authentication
        :param admin_up: also call admin_up() on the VPP session right
            after adding it
        """
        if key is None:
            self.vpp_session = VppBFDUDPSession(
                self, self.pg0, self.pg0.remote_ip4)
        else:
            self.vpp_session = VppBFDUDPSession(
                self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        if admin_up:
            self.vpp_session.admin_up()
        if key is None:
            self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        else:
            self.test_session = BFDTestSession(
                self, self.pg0, AF_INET, sha1_key=key,
                bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)

    def _exchange_packets(self, check_up=True, bump_seq=False):
        """ run 2 * detect_mult rounds of "wait for VPP, then reply"

        :param check_up: assert that every received packet carries state
            up
        :param bump_seq: call inc_seq_num() on the test session before
            each reply
        """
        for _ in range(self.test_session.detect_mult * 2):
            pkt = wait_for_bfd_packet(self)
            if check_up:
                self.assert_equal(pkt[BFD].state, BFDState.up, BFDState)
            if bump_seq:
                self.test_session.inc_seq_num()
            self.test_session.send_packet()

    def _set_peer_auth(self, key):
        """ make the test peer use `key` from now on (None = no auth) """
        if key is None:
            self.test_session.bfd_key_id = None
        else:
            self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key

    def _assert_undisturbed(self):
        """ the session must still be up and no BFD event was raised """
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")

    # ---------------------------------------------------------------
    # immediate changes: both ends switch at the same moment
    # ---------------------------------------------------------------

    def test_auth_on_immediate(self):
        """ turn auth on without disturbing session state (immediate) """
        key = self._new_key()
        self._start_sessions()
        self._exchange_packets()
        self.vpp_session.activate_auth(key)
        self._set_peer_auth(key)
        self._exchange_packets()
        self._assert_undisturbed()

    def test_auth_off_immediate(self):
        """ turn auth off without disturbing session state (immediate) """
        key = self._new_key()
        self._start_sessions(key)
        self._exchange_packets(bump_seq=True)
        self.vpp_session.deactivate_auth()
        self._set_peer_auth(None)
        self._exchange_packets(bump_seq=True)
        self._assert_undisturbed()

    def test_auth_change_key_immediate(self):
        """ change auth key without disturbing session state (immediate) """
        key1 = self._new_key()
        key2 = self._new_key()
        self._start_sessions(key1)
        self._exchange_packets()
        self.vpp_session.activate_auth(key2)
        self._set_peer_auth(key2)
        self._exchange_packets()
        self._assert_undisturbed()

    # ---------------------------------------------------------------
    # delayed changes: VPP is reconfigured first, the peer keeps its old
    # settings for a while and only then switches over
    # ---------------------------------------------------------------

    def test_auth_on_delayed(self):
        """ turn auth on without disturbing session state (delayed) """
        key = self._new_key()
        self._start_sessions()
        self._exchange_packets(check_up=False)
        self.vpp_session.activate_auth(key, delayed=True)
        self._exchange_packets()
        self._set_peer_auth(key)
        self.test_session.send_packet()
        self._exchange_packets()
        self._assert_undisturbed()

    def test_auth_off_delayed(self):
        """ turn auth off without disturbing session state (delayed) """
        key = self._new_key()
        self._start_sessions(key)
        self._exchange_packets()
        self.vpp_session.deactivate_auth(delayed=True)
        self._exchange_packets()
        self._set_peer_auth(None)
        self.test_session.send_packet()
        self._exchange_packets()
        self._assert_undisturbed()

    def test_auth_change_key_delayed(self):
        """ change auth key without disturbing session state (delayed) """
        key1 = self._new_key()
        key2 = self._new_key()
        self._start_sessions(key1, admin_up=True)
        self._exchange_packets()
        self.vpp_session.activate_auth(key2, delayed=True)
        self._exchange_packets()
        self._set_peer_auth(key2)
        self.test_session.send_packet()
        self._exchange_packets()
        self._assert_undisturbed()
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
class BFDCLITestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (CLI) """

    # NOTE(review): rebuilt from a flattened listing that had lost its
    # indentation and its shortest lines.  `pg0 = None`, `@classmethod`,
    # `try`/`except Exception`/`raise`/`pass`, the `setUp`/`tearDown`
    # headers, a few short call arguments and the trailing blank after
    # "peer-addr %s" in the auth-deactivate and set-flags templates are
    # inferred from the gaps - confirm them against the complete file.
    pg0 = None

    @classmethod
    def setUpClass(cls):
        """ create pg0, configure IPv4 + IPv6 on it, resolve ARP and NDP """
        super(BFDCLITestCase, cls).setUpClass()
        try:
            cls.create_pg_interfaces((0,))
            cls.pg0.config_ip4()
            cls.pg0.config_ip6()
            cls.pg0.resolve_arp()
            cls.pg0.resolve_ndp()
        except Exception:
            # the base class is already set up, so tear it down again
            # before letting the failure propagate
            super(BFDCLITestCase, cls).tearDownClass()
            raise

    def setUp(self):
        """ fresh key factory and packet capture on pg0 """
        super(BFDCLITestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.pg0.enable_capture()

    def tearDown(self):
        """ unsubscribe from BFD events (if subscribed) and drop them """
        try:
            self.vapi.want_bfd_events(enable_disable=0)
        except UnexpectedApiReturnValueError:
            # some tests aren't subscribed, so this is not an issue
            pass
        self.vapi.collect_events()  # clear the event queue
        super(BFDCLITestCase, self).tearDown()

    # ---------------------------------------------------------------
    # CLI assertion helpers
    # ---------------------------------------------------------------

    def cli_verify_no_response(self, cli):
        """ execute a CLI, asserting that the response is empty """
        reply = self.vapi.cli(cli)
        self.assert_equal(reply, "", "CLI command response")

    def cli_verify_response(self, cli, expected):
        """ execute a CLI, asserting that the response matches expectation """
        reply = self.vapi.cli(cli).strip()
        self.assert_equal(reply, expected, "CLI command response")

    # ---------------------------------------------------------------
    # private helpers shared by several tests
    # ---------------------------------------------------------------

    @staticmethod
    def _hex_secret(secret):
        """ return `secret` as two lowercase hex digits per character """
        return "".join("{:02x}".format(ord(c)) for c in secret)

    @staticmethod
    def _echo_addr(af, fmt, packed):
        """ return the text form of `packed` with its lowest bit flipped

        :param af: address family handed to inet_ntop
        :param fmt: struct format splitting `packed` into integers
        :param packed: packed binary address
        """
        words = list(unpack(fmt, packed))
        words[-1] ^= 1
        return inet_ntop(af, pack(fmt, *words))

    def _addrs(self, af):
        """ return pg0's (local, remote) addresses for family `af` """
        if af == AF_INET6:
            return self.pg0.local_ip6, self.pg0.remote_ip6
        return self.pg0.local_ip4, self.pg0.remote_ip4

    def _cli_set_key(self, key, key_type):
        """ register `key`, set it through the CLI and check VPP has it

        :param key: auth key object to configure
        :param key_type: value of the CLI `type` argument
        """
        self.registry.register(key, self.logger)
        self.cli_verify_no_response(
            "bfd key set conf-key-id %s type %s secret %s" %
            (key.conf_key_id, key_type, self._hex_secret(key.key)))
        self.assertTrue(key.query_vpp_config())

    def _check_key_locked_then_delete(self, key):
        """ a key used by a session can't be replaced, only deleted later

        Expects self.vpp_session / self.test_session to be configured
        with `key` already.
        """
        self.vapi.want_bfd_events()
        bfd_session_up(self)
        bfd_session_down(self)
        # try to replace the secret for the key - should fail because the
        # key is in use
        other = self.factory.create_random_key(self)
        self.cli_verify_response(
            "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
            (key.conf_key_id, self._hex_secret(other.key)),
            "bfd key set: `bfd_auth_set_key' API call failed, "
            "rv=-103:BFD object in use")
        # manipulating the session using old secret should still work
        bfd_session_up(self)
        bfd_session_down(self)
        # once the session is gone the key can be deleted
        self.vpp_session.remove_vpp_config()
        self.cli_verify_no_response(
            "bfd key del conf-key-id %s" % key.conf_key_id)
        self.assertFalse(key.query_vpp_config())

    def _check_add_mod_del(self, af, key=None):
        """ drive one BFD UDP session through CLI add, modify and delete

        The configuration in VPP is verified after the add and after the
        modify; repeating the add and repeating the delete must each be
        refused with the matching error text.

        :param af: AF_INET or AF_INET6, selects which pg0 addresses to use
        :param key: auth key (already in VPP) to add the session with;
            None adds an unauthenticated session
        """
        local_addr, peer_addr = self._addrs(af)
        # only pass the keyword arguments the scenario calls for, so the
        # plain IPv4 case keeps using VppBFDUDPSession's own defaults
        extra = {}
        if af == AF_INET6:
            extra['af'] = AF_INET6
        if key is not None:
            extra['sha1_key'] = key
        vpp_session = VppBFDUDPSession(self, self.pg0, peer_addr, **extra)
        self.registry.register(vpp_session, self.logger)

        where = "interface %s local-addr %s peer-addr %s" % (
            self.pg0.name, local_addr, peer_addr)
        timers = "desired-min-tx %s required-min-rx %s detect-mult %s"

        cli_add_cmd = "bfd udp session add %s %s" % (where, timers % (
            vpp_session.desired_min_tx, vpp_session.required_min_rx,
            vpp_session.detect_mult))
        if key is not None:
            cli_add_cmd += " conf-key-id %s bfd-key-id %s" % (
                key.conf_key_id, vpp_session.bfd_key_id)
        self.cli_verify_no_response(cli_add_cmd)
        # 2nd add should fail
        self.cli_verify_response(
            cli_add_cmd,
            "bfd udp session add: `bfd_add_add_session' API call"
            " failed, rv=-101:Duplicate BFD object")
        verify_bfd_session_config(self, vpp_session)

        # mod_session is never added to VPP; it only describes what the
        # session should look like after the CLI modification
        if key is not None:
            extra['bfd_key_id'] = vpp_session.bfd_key_id
        mod_session = VppBFDUDPSession(
            self, self.pg0, peer_addr,
            required_min_rx=2 * vpp_session.required_min_rx,
            desired_min_tx=3 * vpp_session.desired_min_tx,
            detect_mult=4 * vpp_session.detect_mult, **extra)
        self.cli_verify_no_response(
            "bfd udp session mod %s %s" % (where, timers % (
                mod_session.desired_min_tx, mod_session.required_min_rx,
                mod_session.detect_mult)))
        verify_bfd_session_config(self, mod_session)

        cli_del_cmd = "bfd udp session del %s" % where
        self.cli_verify_no_response(cli_del_cmd)
        # 2nd del is expected to fail
        self.cli_verify_response(
            cli_del_cmd,
            "bfd udp session del: `bfd_udp_del_session' API call"
            " failed, rv=-102:No such BFD object")
        self.assertFalse(vpp_session.query_vpp_config())

    def _check_auth_on_off(self, activate_fmt, deactivate_fmt):
        """ switch authentication on and off through the CLI, twice each

        Each command is issued two times in a row and the configuration
        is verified after every single invocation.

        :param activate_fmt: activate command template taking interface,
            local address, peer address, conf-key-id and bfd-key-id
        :param deactivate_fmt: deactivate command template taking
            interface, local address and peer address
        """
        key = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        # never added to VPP - only the expected "auth on" configuration
        auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                        sha1_key=key)
        session.add_vpp_config()
        where = (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
        cli_activate = activate_fmt % (
            where + (key.conf_key_id, auth_session.bfd_key_id))
        cli_deactivate = deactivate_fmt % where
        for _ in range(2):
            self.cli_verify_no_response(cli_activate)
            verify_bfd_session_config(self, auth_session)
        for _ in range(2):
            self.cli_verify_no_response(cli_deactivate)
            verify_bfd_session_config(self, session)

    def _verify_echo_source(self, ip4="none", ip6="none"):
        """ check the `show bfd echo-source` report for self.loopback0 """
        self.cli_verify_response("show bfd echo-source",
                                 "UDP echo source is: %s\n"
                                 "IPv4 address usable as echo source: %s\n"
                                 "IPv6 address usable as echo source: %s" %
                                 (self.loopback0.name, ip4, ip6))

    # ---------------------------------------------------------------
    # tests
    # ---------------------------------------------------------------

    def test_show(self):
        """ show commands """
        k1 = self.factory.create_random_key(self)
        k1.add_vpp_config()
        k2 = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        k2.add_vpp_config()
        s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        s1.add_vpp_config()
        s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
                              af=AF_INET6, sha1_key=k2)
        s2.add_vpp_config()
        # nothing is asserted on the output; it is only logged
        for show_cmd in ("show bfd keys", "show bfd sessions", "show bfd"):
            self.logger.info(self.vapi.ppcli(show_cmd))

    def test_set_del_sha1_key(self):
        """ set/delete SHA1 auth key """
        k = self.factory.create_random_key(self)
        self._cli_set_key(k, "keyed-sha1")
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=k,
            bfd_key_id=self.vpp_session.bfd_key_id)
        self._check_key_locked_then_delete(k)

    def test_set_del_meticulous_sha1_key(self):
        """ set/delete meticulous SHA1 auth key """
        k = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        self._cli_set_key(k, "meticulous-keyed-sha1")
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=k)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET6, sha1_key=k,
            bfd_key_id=self.vpp_session.bfd_key_id)
        self._check_key_locked_then_delete(k)

    def test_add_mod_del_bfd_udp(self):
        """ create/modify/delete IPv4 BFD UDP session """
        self._check_add_mod_del(AF_INET)

    def test_add_mod_del_bfd_udp6(self):
        """ create/modify/delete IPv6 BFD UDP session """
        self._check_add_mod_del(AF_INET6)

    def test_add_mod_del_bfd_udp_auth(self):
        """ create/modify/delete IPv4 BFD UDP session (authenticated) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self._check_add_mod_del(AF_INET, key)

    def test_add_mod_del_bfd_udp6_auth(self):
        """ create/modify/delete IPv6 BFD UDP session (authenticated) """
        key = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        self._check_add_mod_del(AF_INET6, key)

    def test_auth_on_off(self):
        """ turn authentication on and off """
        self._check_auth_on_off(
            "bfd udp session auth activate interface %s local-addr %s "
            "peer-addr %s conf-key-id %s bfd-key-id %s",
            "bfd udp session auth deactivate interface %s local-addr %s "
            "peer-addr %s ")

    def test_auth_on_off_delayed(self):
        """ turn authentication on and off (delayed) """
        self._check_auth_on_off(
            "bfd udp session auth activate interface %s local-addr %s "
            "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes",
            "bfd udp session auth deactivate interface %s local-addr %s "
            "peer-addr %s delayed yes")

    def test_admin_up_down(self):
        """ put session admin-up and admin-down """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        set_flags = (
            "bfd udp session set-flags admin %s interface %s local-addr %s "
            "peer-addr %s ")
        where = (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
        cli_down = set_flags % (("down",) + where)
        cli_up = set_flags % (("up",) + where)
        self.cli_verify_no_response(cli_down)
        verify_bfd_session_config(self, session, state=BFDState.admin_down)
        # admin-up takes the session to state down, not straight to up
        self.cli_verify_no_response(cli_up)
        verify_bfd_session_config(self, session, state=BFDState.down)

    def test_set_del_udp_echo_source(self):
        """ set/del udp echo source """
        self.create_loopback_interfaces([0])
        self.loopback0 = self.lo_interfaces[0]
        self.loopback0.admin_up()
        self.cli_verify_response("show bfd echo-source",
                                 "UDP echo source is not set.")
        self.cli_verify_no_response(
            "bfd udp echo-source set interface %s" % self.loopback0.name)
        # no address on the loopback yet, so nothing is usable
        self._verify_echo_source()
        self.loopback0.config_ip4()
        # the expected echo address is the loopback's own address with
        # its lowest bit flipped
        echo_ip4 = self._echo_addr(AF_INET, "!L",
                                   self.loopback0.local_ip4n)
        self._verify_echo_source(ip4=echo_ip4)
        echo_ip6 = self._echo_addr(AF_INET6, "!LLLL",
                                   self.loopback0.local_ip6n)
        self.loopback0.config_ip6()
        self._verify_echo_source(ip4=echo_ip4, ip6=echo_ip6)
        self.cli_verify_no_response("bfd udp echo-source del")
        self.cli_verify_response("show bfd echo-source",
                                 "UDP echo source is not set.")
if __name__ == "__main__":
    # executed as a script: run every test in this module through unittest
    # using VppTestRunner as the test runner
    unittest.main(testRunner=VppTestRunner)