4 from __future__ import division
9 from struct import pack, unpack
10 from random import randint, shuffle, getrandbits
11 from socket import AF_INET, AF_INET6, inet_ntop
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether
14 from scapy.layers.inet import UDP, IP
15 from scapy.layers.inet6 import IPv6
16 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
17 BFDDiagCode, BFDState, BFD_vpp_echo
18 from framework import VppTestCase, VppTestRunner, running_extended_tests
19 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
20 from vpp_lo_interface import VppLoInterface
22 from vpp_papi_provider import UnexpectedApiReturnValueError
23 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
28 class AuthKeyFactory(object):
29 """Factory class for creating auth keys with unique conf key ID"""
32 self._conf_key_ids = {}
34 def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
35 """ create a random key with unique conf key id """
36 conf_key_id = randint(0, 0xFFFFFFFF)
37 while conf_key_id in self._conf_key_ids:
38 conf_key_id = randint(0, 0xFFFFFFFF)
39 self._conf_key_ids[conf_key_id] = 1
40 key = str(bytearray([randint(0, 255) for _ in range(randint(1, 20))]))
41 return VppBFDAuthKey(test=test, auth_type=auth_type,
42 conf_key_id=conf_key_id, key=key)
45 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
46 class BFDAPITestCase(VppTestCase):
47 """Bidirectional Forwarding Detection (BFD) - API"""
54 super(BFDAPITestCase, cls).setUpClass()
57 cls.create_pg_interfaces(range(2))
58 for i in cls.pg_interfaces:
64 super(BFDAPITestCase, cls).tearDownClass()
68 super(BFDAPITestCase, self).setUp()
69 self.factory = AuthKeyFactory()
71 def test_add_bfd(self):
72 """ create a BFD session """
73 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
74 session.add_vpp_config()
75 self.logger.debug("Session state is %s", session.state)
76 session.remove_vpp_config()
77 session.add_vpp_config()
78 self.logger.debug("Session state is %s", session.state)
79 session.remove_vpp_config()
81 def test_double_add(self):
82 """ create the same BFD session twice (negative case) """
83 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
84 session.add_vpp_config()
86 with self.vapi.expect_negative_api_retval():
87 session.add_vpp_config()
89 session.remove_vpp_config()
91 def test_add_bfd6(self):
92 """ create IPv6 BFD session """
93 session = VppBFDUDPSession(
94 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
95 session.add_vpp_config()
96 self.logger.debug("Session state is %s", session.state)
97 session.remove_vpp_config()
98 session.add_vpp_config()
99 self.logger.debug("Session state is %s", session.state)
100 session.remove_vpp_config()
102 def test_mod_bfd(self):
103 """ modify BFD session parameters """
104 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
105 desired_min_tx=50000,
106 required_min_rx=10000,
108 session.add_vpp_config()
109 s = session.get_bfd_udp_session_dump_entry()
110 self.assert_equal(session.desired_min_tx,
112 "desired min transmit interval")
113 self.assert_equal(session.required_min_rx,
115 "required min receive interval")
116 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
117 session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
118 required_min_rx=session.required_min_rx * 2,
119 detect_mult=session.detect_mult * 2)
120 s = session.get_bfd_udp_session_dump_entry()
121 self.assert_equal(session.desired_min_tx,
123 "desired min transmit interval")
124 self.assert_equal(session.required_min_rx,
126 "required min receive interval")
127 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
129 def test_add_sha1_keys(self):
130 """ add SHA1 keys """
132 keys = [self.factory.create_random_key(
133 self) for i in range(0, key_count)]
135 self.assertFalse(key.query_vpp_config())
139 self.assertTrue(key.query_vpp_config())
141 indexes = range(key_count)
146 key.remove_vpp_config()
148 for j in range(key_count):
151 self.assertFalse(key.query_vpp_config())
153 self.assertTrue(key.query_vpp_config())
154 # should be removed now
156 self.assertFalse(key.query_vpp_config())
157 # add back and remove again
161 self.assertTrue(key.query_vpp_config())
163 key.remove_vpp_config()
165 self.assertFalse(key.query_vpp_config())
167 def test_add_bfd_sha1(self):
168 """ create a BFD session (SHA1) """
169 key = self.factory.create_random_key(self)
171 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
173 session.add_vpp_config()
174 self.logger.debug("Session state is %s", session.state)
175 session.remove_vpp_config()
176 session.add_vpp_config()
177 self.logger.debug("Session state is %s", session.state)
178 session.remove_vpp_config()
180 def test_double_add_sha1(self):
181 """ create the same BFD session twice (negative case) (SHA1) """
182 key = self.factory.create_random_key(self)
184 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
186 session.add_vpp_config()
187 with self.assertRaises(Exception):
188 session.add_vpp_config()
190 def test_add_auth_nonexistent_key(self):
191 """ create BFD session using non-existent SHA1 (negative case) """
192 session = VppBFDUDPSession(
193 self, self.pg0, self.pg0.remote_ip4,
194 sha1_key=self.factory.create_random_key(self))
195 with self.assertRaises(Exception):
196 session.add_vpp_config()
198 def test_shared_sha1_key(self):
199 """ share single SHA1 key between multiple BFD sessions """
200 key = self.factory.create_random_key(self)
203 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
205 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
206 sha1_key=key, af=AF_INET6),
207 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
209 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
210 sha1_key=key, af=AF_INET6)]
215 e = key.get_bfd_auth_keys_dump_entry()
216 self.assert_equal(e.use_count, len(sessions) - removed,
217 "Use count for shared key")
218 s.remove_vpp_config()
220 e = key.get_bfd_auth_keys_dump_entry()
221 self.assert_equal(e.use_count, len(sessions) - removed,
222 "Use count for shared key")
224 def test_activate_auth(self):
225 """ activate SHA1 authentication """
226 key = self.factory.create_random_key(self)
228 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
229 session.add_vpp_config()
230 session.activate_auth(key)
232 def test_deactivate_auth(self):
233 """ deactivate SHA1 authentication """
234 key = self.factory.create_random_key(self)
236 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
237 session.add_vpp_config()
238 session.activate_auth(key)
239 session.deactivate_auth()
241 def test_change_key(self):
242 """ change SHA1 key """
243 key1 = self.factory.create_random_key(self)
244 key2 = self.factory.create_random_key(self)
245 while key2.conf_key_id == key1.conf_key_id:
246 key2 = self.factory.create_random_key(self)
247 key1.add_vpp_config()
248 key2.add_vpp_config()
249 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
251 session.add_vpp_config()
252 session.activate_auth(key2)
255 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
256 class BFDTestSession(object):
257 """ BFD session as seen from test framework side """
259 def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
260 bfd_key_id=None, our_seq_number=None):
263 self.sha1_key = sha1_key
264 self.bfd_key_id = bfd_key_id
265 self.interface = interface
266 self.udp_sport = randint(49152, 65535)
267 if our_seq_number is None:
268 self.our_seq_number = randint(0, 40000000)
270 self.our_seq_number = our_seq_number
271 self.vpp_seq_number = None
272 self.my_discriminator = 0
273 self.desired_min_tx = 300000
274 self.required_min_rx = 300000
275 self.required_min_echo_rx = None
276 self.detect_mult = detect_mult
277 self.diag = BFDDiagCode.no_diagnostic
278 self.your_discriminator = None
279 self.state = BFDState.down
280 self.auth_type = BFDAuthType.no_auth
282 def inc_seq_num(self):
283 """ increment sequence number, wrapping if needed """
284 if self.our_seq_number == 0xFFFFFFFF:
285 self.our_seq_number = 0
287 self.our_seq_number += 1
289 def update(self, my_discriminator=None, your_discriminator=None,
290 desired_min_tx=None, required_min_rx=None,
291 required_min_echo_rx=None, detect_mult=None,
292 diag=None, state=None, auth_type=None):
293 """ update BFD parameters associated with session """
294 if my_discriminator is not None:
295 self.my_discriminator = my_discriminator
296 if your_discriminator is not None:
297 self.your_discriminator = your_discriminator
298 if required_min_rx is not None:
299 self.required_min_rx = required_min_rx
300 if required_min_echo_rx is not None:
301 self.required_min_echo_rx = required_min_echo_rx
302 if desired_min_tx is not None:
303 self.desired_min_tx = desired_min_tx
304 if detect_mult is not None:
305 self.detect_mult = detect_mult
308 if state is not None:
310 if auth_type is not None:
311 self.auth_type = auth_type
313 def fill_packet_fields(self, packet):
314 """ set packet fields with known values in packet """
316 if self.my_discriminator:
317 self.test.logger.debug("BFD: setting packet.my_discriminator=%s",
318 self.my_discriminator)
319 bfd.my_discriminator = self.my_discriminator
320 if self.your_discriminator:
321 self.test.logger.debug("BFD: setting packet.your_discriminator=%s",
322 self.your_discriminator)
323 bfd.your_discriminator = self.your_discriminator
324 if self.required_min_rx:
325 self.test.logger.debug(
326 "BFD: setting packet.required_min_rx_interval=%s",
327 self.required_min_rx)
328 bfd.required_min_rx_interval = self.required_min_rx
329 if self.required_min_echo_rx:
330 self.test.logger.debug(
331 "BFD: setting packet.required_min_echo_rx=%s",
332 self.required_min_echo_rx)
333 bfd.required_min_echo_rx_interval = self.required_min_echo_rx
334 if self.desired_min_tx:
335 self.test.logger.debug(
336 "BFD: setting packet.desired_min_tx_interval=%s",
338 bfd.desired_min_tx_interval = self.desired_min_tx
340 self.test.logger.debug(
341 "BFD: setting packet.detect_mult=%s", self.detect_mult)
342 bfd.detect_mult = self.detect_mult
344 self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
347 self.test.logger.debug("BFD: setting packet.state=%s", self.state)
348 bfd.state = self.state
350 # this is used by a negative test-case
351 self.test.logger.debug("BFD: setting packet.auth_type=%s",
353 bfd.auth_type = self.auth_type
355 def create_packet(self):
356 """ create a BFD packet, reflecting the current state of session """
359 bfd.auth_type = self.sha1_key.auth_type
360 bfd.auth_len = BFD.sha1_auth_len
361 bfd.auth_key_id = self.bfd_key_id
362 bfd.auth_seq_num = self.our_seq_number
363 bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
366 if self.af == AF_INET6:
367 packet = (Ether(src=self.interface.remote_mac,
368 dst=self.interface.local_mac) /
369 IPv6(src=self.interface.remote_ip6,
370 dst=self.interface.local_ip6,
372 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
375 packet = (Ether(src=self.interface.remote_mac,
376 dst=self.interface.local_mac) /
377 IP(src=self.interface.remote_ip4,
378 dst=self.interface.local_ip4,
380 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
382 self.test.logger.debug("BFD: Creating packet")
383 self.fill_packet_fields(packet)
385 hash_material = str(packet[BFD])[:32] + self.sha1_key.key + \
386 "\0" * (20 - len(self.sha1_key.key))
387 self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
388 hashlib.sha1(hash_material).hexdigest())
389 packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
392 def send_packet(self, packet=None, interface=None):
393 """ send packet on interface, creating the packet if needed """
395 packet = self.create_packet()
396 if interface is None:
397 interface = self.test.pg0
398 self.test.logger.debug(ppp("Sending packet:", packet))
399 interface.add_stream(packet)
402 def verify_sha1_auth(self, packet):
403 """ Verify correctness of authentication in BFD layer. """
405 self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
406 self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
408 self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
409 self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
410 if self.vpp_seq_number is None:
411 self.vpp_seq_number = bfd.auth_seq_num
412 self.test.logger.debug("Received initial sequence number: %s" %
415 recvd_seq_num = bfd.auth_seq_num
416 self.test.logger.debug("Received followup sequence number: %s" %
418 if self.vpp_seq_number < 0xffffffff:
419 if self.sha1_key.auth_type == \
420 BFDAuthType.meticulous_keyed_sha1:
421 self.test.assert_equal(recvd_seq_num,
422 self.vpp_seq_number + 1,
423 "BFD sequence number")
425 self.test.assert_in_range(recvd_seq_num,
427 self.vpp_seq_number + 1,
428 "BFD sequence number")
430 if self.sha1_key.auth_type == \
431 BFDAuthType.meticulous_keyed_sha1:
432 self.test.assert_equal(recvd_seq_num, 0,
433 "BFD sequence number")
435 self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
436 "BFD sequence number not one of "
437 "(%s, 0)" % self.vpp_seq_number)
438 self.vpp_seq_number = recvd_seq_num
439 # last 20 bytes represent the hash - so replace them with the key,
440 # pad the result with zeros and hash the result
441 hash_material = bfd.original[:-20] + self.sha1_key.key + \
442 "\0" * (20 - len(self.sha1_key.key))
443 expected_hash = hashlib.sha1(hash_material).hexdigest()
444 self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
445 expected_hash, "Auth key hash")
447 def verify_bfd(self, packet):
448 """ Verify correctness of BFD layer. """
450 self.test.assert_equal(bfd.version, 1, "BFD version")
451 self.test.assert_equal(bfd.your_discriminator,
452 self.my_discriminator,
453 "BFD - your discriminator")
455 self.verify_sha1_auth(packet)
458 def bfd_session_up(test):
459 """ Bring BFD session up """
460 test.logger.info("BFD: Waiting for slow hello")
461 p = wait_for_bfd_packet(test, 2)
463 if hasattr(test, 'vpp_clock_offset'):
464 old_offset = test.vpp_clock_offset
465 test.vpp_clock_offset = time.time() - p.time
466 test.logger.debug("BFD: Calculated vpp clock offset: %s",
467 test.vpp_clock_offset)
469 test.assertAlmostEqual(
470 old_offset, test.vpp_clock_offset, delta=0.5,
471 msg="vpp clock offset not stable (new: %s, old: %s)" %
472 (test.vpp_clock_offset, old_offset))
473 test.logger.info("BFD: Sending Init")
474 test.test_session.update(my_discriminator=randint(0, 40000000),
475 your_discriminator=p[BFD].my_discriminator,
477 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
478 BFDAuthType.meticulous_keyed_sha1:
479 test.test_session.inc_seq_num()
480 test.test_session.send_packet()
481 test.logger.info("BFD: Waiting for event")
482 e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
483 verify_event(test, e, expected_state=BFDState.up)
484 test.logger.info("BFD: Session is Up")
485 test.test_session.update(state=BFDState.up)
486 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
487 BFDAuthType.meticulous_keyed_sha1:
488 test.test_session.inc_seq_num()
489 test.test_session.send_packet()
490 test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
493 def bfd_session_down(test):
494 """ Bring BFD session down """
495 test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
496 test.test_session.update(state=BFDState.down)
497 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
498 BFDAuthType.meticulous_keyed_sha1:
499 test.test_session.inc_seq_num()
500 test.test_session.send_packet()
501 test.logger.info("BFD: Waiting for event")
502 e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
503 verify_event(test, e, expected_state=BFDState.down)
504 test.logger.info("BFD: Session is Down")
505 test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
508 def verify_bfd_session_config(test, session, state=None):
509 dump = session.get_bfd_udp_session_dump_entry()
510 test.assertIsNotNone(dump)
511 # since dump is not none, we have verified that sw_if_index and addresses
512 # are valid (in get_bfd_udp_session_dump_entry)
514 test.assert_equal(dump.state, state, "session state")
515 test.assert_equal(dump.required_min_rx, session.required_min_rx,
516 "required min rx interval")
517 test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
518 "desired min tx interval")
519 test.assert_equal(dump.detect_mult, session.detect_mult,
521 if session.sha1_key is None:
522 test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
524 test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
525 test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
527 test.assert_equal(dump.conf_key_id,
528 session.sha1_key.conf_key_id,
532 def verify_ip(test, packet):
533 """ Verify correctness of IP layer. """
534 if test.vpp_session.af == AF_INET6:
536 local_ip = test.pg0.local_ip6
537 remote_ip = test.pg0.remote_ip6
538 test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
541 local_ip = test.pg0.local_ip4
542 remote_ip = test.pg0.remote_ip4
543 test.assert_equal(ip.ttl, 255, "IPv4 TTL")
544 test.assert_equal(ip.src, local_ip, "IP source address")
545 test.assert_equal(ip.dst, remote_ip, "IP destination address")
548 def verify_udp(test, packet):
549 """ Verify correctness of UDP layer. """
551 test.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
552 test.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
556 def verify_event(test, event, expected_state):
557 """ Verify correctness of event values. """
559 test.logger.debug("BFD: Event: %s" % repr(e))
560 test.assert_equal(e.sw_if_index,
561 test.vpp_session.interface.sw_if_index,
562 "BFD interface index")
564 if test.vpp_session.af == AF_INET6:
566 test.assert_equal(e.is_ipv6, is_ipv6, "is_ipv6")
567 if test.vpp_session.af == AF_INET:
568 test.assert_equal(e.local_addr[:4], test.vpp_session.local_addr_n,
569 "Local IPv4 address")
570 test.assert_equal(e.peer_addr[:4], test.vpp_session.peer_addr_n,
573 test.assert_equal(e.local_addr, test.vpp_session.local_addr_n,
574 "Local IPv6 address")
575 test.assert_equal(e.peer_addr, test.vpp_session.peer_addr_n,
577 test.assert_equal(e.state, expected_state, BFDState)
580 def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None):
581 """ wait for BFD packet and verify its correctness
583 :param timeout: how long to wait
584 :param pcap_time_min: ignore packets with pcap timestamp lower than this
586 :returns: tuple (packet, time spent waiting for packet)
588 test.logger.info("BFD: Waiting for BFD packet")
589 deadline = time.time() + timeout
594 test.assert_in_range(counter, 0, 100, "number of packets ignored")
595 time_left = deadline - time.time()
597 raise CaptureTimeoutError("Packet did not arrive within timeout")
598 p = test.pg0.wait_for_packet(timeout=time_left)
599 test.logger.debug(ppp("BFD: Got packet:", p))
600 if pcap_time_min is not None and p.time < pcap_time_min:
601 test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
602 "pcap time min %s):" %
603 (p.time, pcap_time_min), p))
608 raise Exception(ppp("Unexpected or invalid BFD packet:", p))
610 raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
613 test.test_session.verify_bfd(p)
617 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
618 class BFD4TestCase(VppTestCase):
619 """Bidirectional Forwarding Detection (BFD)"""
622 vpp_clock_offset = None
628 super(BFD4TestCase, cls).setUpClass()
630 cls.create_pg_interfaces([0])
631 cls.create_loopback_interfaces([0])
632 cls.loopback0 = cls.lo_interfaces[0]
633 cls.loopback0.config_ip4()
634 cls.loopback0.admin_up()
636 cls.pg0.configure_ipv4_neighbors()
638 cls.pg0.resolve_arp()
641 super(BFD4TestCase, cls).tearDownClass()
645 super(BFD4TestCase, self).setUp()
646 self.factory = AuthKeyFactory()
647 self.vapi.want_bfd_events()
648 self.pg0.enable_capture()
650 self.vpp_session = VppBFDUDPSession(self, self.pg0,
652 self.vpp_session.add_vpp_config()
653 self.vpp_session.admin_up()
654 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
656 self.vapi.want_bfd_events(enable_disable=0)
660 if not self.vpp_dead:
661 self.vapi.want_bfd_events(enable_disable=0)
662 self.vapi.collect_events() # clear the event queue
663 super(BFD4TestCase, self).tearDown()
665 def test_session_up(self):
666 """ bring BFD session up """
669 def test_session_up_by_ip(self):
670 """ bring BFD session up - first frame looked up by address pair """
671 self.logger.info("BFD: Sending Slow control frame")
672 self.test_session.update(my_discriminator=randint(0, 40000000))
673 self.test_session.send_packet()
674 self.pg0.enable_capture()
675 p = self.pg0.wait_for_packet(1)
676 self.assert_equal(p[BFD].your_discriminator,
677 self.test_session.my_discriminator,
678 "BFD - your discriminator")
679 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
680 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
682 self.logger.info("BFD: Waiting for event")
683 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
684 verify_event(self, e, expected_state=BFDState.init)
685 self.logger.info("BFD: Sending Up")
686 self.test_session.send_packet()
687 self.logger.info("BFD: Waiting for event")
688 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
689 verify_event(self, e, expected_state=BFDState.up)
690 self.logger.info("BFD: Session is Up")
691 self.test_session.update(state=BFDState.up)
692 self.test_session.send_packet()
693 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
695 def test_session_down(self):
696 """ bring BFD session down """
698 bfd_session_down(self)
700 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
701 def test_hold_up(self):
702 """ hold BFD session up """
704 for dummy in range(self.test_session.detect_mult * 2):
705 wait_for_bfd_packet(self)
706 self.test_session.send_packet()
707 self.assert_equal(len(self.vapi.collect_events()), 0,
708 "number of bfd events")
710 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
711 def test_slow_timer(self):
712 """ verify slow periodic control frames while session down """
714 self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
715 prev_packet = wait_for_bfd_packet(self, 2)
716 for dummy in range(packet_count):
717 next_packet = wait_for_bfd_packet(self, 2)
718 time_diff = next_packet.time - prev_packet.time
719 # spec says the range should be <0.75, 1>, allow extra 0.05 margin
720 # to work around timing issues
721 self.assert_in_range(
722 time_diff, 0.70, 1.05, "time between slow packets")
723 prev_packet = next_packet
725 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
726 def test_zero_remote_min_rx(self):
727 """ no packets when zero remote required min rx interval """
729 self.test_session.update(required_min_rx=0)
730 self.test_session.send_packet()
731 for dummy in range(self.test_session.detect_mult):
732 self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
733 "sleep before transmitting bfd packet")
734 self.test_session.send_packet()
736 p = wait_for_bfd_packet(self, timeout=0)
737 self.logger.error(ppp("Received unexpected packet:", p))
738 except CaptureTimeoutError:
741 len(self.vapi.collect_events()), 0, "number of bfd events")
742 self.test_session.update(required_min_rx=300000)
743 for dummy in range(3):
744 self.test_session.send_packet()
746 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
748 len(self.vapi.collect_events()), 0, "number of bfd events")
750 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
751 def test_conn_down(self):
752 """ verify session goes down after inactivity """
754 detection_time = self.test_session.detect_mult *\
755 self.vpp_session.required_min_rx / USEC_IN_SEC
756 self.sleep(detection_time, "waiting for BFD session time-out")
757 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
758 verify_event(self, e, expected_state=BFDState.down)
760 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
761 def test_large_required_min_rx(self):
762 """ large remote required min rx interval """
764 p = wait_for_bfd_packet(self)
766 self.test_session.update(required_min_rx=interval)
767 self.test_session.send_packet()
768 time_mark = time.time()
770 # busy wait here, trying to collect a packet or event, vpp is not
771 # allowed to send packets and the session will timeout first - so the
772 # Up->Down event must arrive before any packets do
773 while time.time() < time_mark + interval / USEC_IN_SEC:
775 p = wait_for_bfd_packet(self, timeout=0)
776 # if vpp managed to send a packet before we did the session
777 # session update, then that's fine, ignore it
778 if p.time < time_mark - self.vpp_clock_offset:
780 self.logger.error(ppp("Received unexpected packet:", p))
782 except CaptureTimeoutError:
784 events = self.vapi.collect_events()
786 verify_event(self, events[0], BFDState.down)
788 self.assert_equal(count, 0, "number of packets received")
790 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
791 def test_immediate_remote_min_rx_reduction(self):
792 """ immediately honor remote required min rx reduction """
793 self.vpp_session.remove_vpp_config()
794 self.vpp_session = VppBFDUDPSession(
795 self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
796 self.pg0.enable_capture()
797 self.vpp_session.add_vpp_config()
798 self.test_session.update(desired_min_tx=1000000,
799 required_min_rx=1000000)
801 reference_packet = wait_for_bfd_packet(self)
802 time_mark = time.time()
804 self.test_session.update(required_min_rx=interval)
805 self.test_session.send_packet()
806 extra_time = time.time() - time_mark
807 p = wait_for_bfd_packet(self)
808 # first packet is allowed to be late by time we spent doing the update
809 # calculated in extra_time
810 self.assert_in_range(p.time - reference_packet.time,
811 .95 * 0.75 * interval / USEC_IN_SEC,
812 1.05 * interval / USEC_IN_SEC + extra_time,
813 "time between BFD packets")
815 for dummy in range(3):
816 p = wait_for_bfd_packet(self)
817 diff = p.time - reference_packet.time
818 self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
819 1.05 * interval / USEC_IN_SEC,
820 "time between BFD packets")
823 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
824 def test_modify_req_min_rx_double(self):
825 """ modify session - double required min rx """
827 p = wait_for_bfd_packet(self)
828 self.test_session.update(desired_min_tx=10000,
829 required_min_rx=10000)
830 self.test_session.send_packet()
831 # double required min rx
832 self.vpp_session.modify_parameters(
833 required_min_rx=2 * self.vpp_session.required_min_rx)
834 p = wait_for_bfd_packet(
835 self, pcap_time_min=time.time() - self.vpp_clock_offset)
836 # poll bit needs to be set
837 self.assertIn("P", p.sprintf("%BFD.flags%"),
838 "Poll bit not set in BFD packet")
839 # finish poll sequence with final packet
840 final = self.test_session.create_packet()
841 final[BFD].flags = "F"
842 timeout = self.test_session.detect_mult * \
843 max(self.test_session.desired_min_tx,
844 self.vpp_session.required_min_rx) / USEC_IN_SEC
845 self.test_session.send_packet(final)
846 time_mark = time.time()
847 e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
848 verify_event(self, e, expected_state=BFDState.down)
849 time_to_event = time.time() - time_mark
850 self.assert_in_range(time_to_event, .9 * timeout,
851 1.1 * timeout, "session timeout")
853 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
854 def test_modify_req_min_rx_halve(self):
855 """ modify session - halve required min rx """
856 self.vpp_session.modify_parameters(
857 required_min_rx=2 * self.vpp_session.required_min_rx)
859 p = wait_for_bfd_packet(self)
860 self.test_session.update(desired_min_tx=10000,
861 required_min_rx=10000)
862 self.test_session.send_packet()
863 p = wait_for_bfd_packet(
864 self, pcap_time_min=time.time() - self.vpp_clock_offset)
865 # halve required min rx
866 old_required_min_rx = self.vpp_session.required_min_rx
867 self.vpp_session.modify_parameters(
868 required_min_rx=0.5 * self.vpp_session.required_min_rx)
869 # now we wait 0.8*3*old-req-min-rx and the session should still be up
870 self.sleep(0.8 * self.vpp_session.detect_mult *
871 old_required_min_rx / USEC_IN_SEC,
872 "wait before finishing poll sequence")
873 self.assert_equal(len(self.vapi.collect_events()), 0,
874 "number of bfd events")
875 p = wait_for_bfd_packet(self)
876 # poll bit needs to be set
877 self.assertIn("P", p.sprintf("%BFD.flags%"),
878 "Poll bit not set in BFD packet")
879 # finish poll sequence with final packet
880 final = self.test_session.create_packet()
881 final[BFD].flags = "F"
882 self.test_session.send_packet(final)
883 # now the session should time out under new conditions
884 detection_time = self.test_session.detect_mult *\
885 self.vpp_session.required_min_rx / USEC_IN_SEC
887 e = self.vapi.wait_for_event(
888 2 * detection_time, "bfd_udp_session_details")
890 self.assert_in_range(after - before,
891 0.9 * detection_time,
892 1.1 * detection_time,
893 "time before bfd session goes down")
894 verify_event(self, e, expected_state=BFDState.down)
896 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
897 def test_modify_detect_mult(self):
898 """ modify detect multiplier """
900 p = wait_for_bfd_packet(self)
901 self.vpp_session.modify_parameters(detect_mult=1)
902 p = wait_for_bfd_packet(
903 self, pcap_time_min=time.time() - self.vpp_clock_offset)
904 self.assert_equal(self.vpp_session.detect_mult,
907 # poll bit must not be set
908 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
909 "Poll bit not set in BFD packet")
910 self.vpp_session.modify_parameters(detect_mult=10)
911 p = wait_for_bfd_packet(
912 self, pcap_time_min=time.time() - self.vpp_clock_offset)
913 self.assert_equal(self.vpp_session.detect_mult,
916 # poll bit must not be set
917 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
918 "Poll bit not set in BFD packet")
920 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
921 def test_queued_poll(self):
922 """ test poll sequence queueing """
924 p = wait_for_bfd_packet(self)
925 self.vpp_session.modify_parameters(
926 required_min_rx=2 * self.vpp_session.required_min_rx)
927 p = wait_for_bfd_packet(self)
928 poll_sequence_start = time.time()
929 poll_sequence_length_min = 0.5
930 send_final_after = time.time() + poll_sequence_length_min
931 # poll bit needs to be set
932 self.assertIn("P", p.sprintf("%BFD.flags%"),
933 "Poll bit not set in BFD packet")
934 self.assert_equal(p[BFD].required_min_rx_interval,
935 self.vpp_session.required_min_rx,
936 "BFD required min rx interval")
937 self.vpp_session.modify_parameters(
938 required_min_rx=2 * self.vpp_session.required_min_rx)
939 # 2nd poll sequence should be queued now
940 # don't send the reply back yet, wait for some time to emulate
941 # longer round-trip time
943 while time.time() < send_final_after:
944 self.test_session.send_packet()
945 p = wait_for_bfd_packet(self)
946 self.assert_equal(len(self.vapi.collect_events()), 0,
947 "number of bfd events")
948 self.assert_equal(p[BFD].required_min_rx_interval,
949 self.vpp_session.required_min_rx,
950 "BFD required min rx interval")
952 # poll bit must be set
953 self.assertIn("P", p.sprintf("%BFD.flags%"),
954 "Poll bit not set in BFD packet")
955 final = self.test_session.create_packet()
956 final[BFD].flags = "F"
957 self.test_session.send_packet(final)
958 # finish 1st with final
959 poll_sequence_length = time.time() - poll_sequence_start
960 # vpp must wait for some time before starting new poll sequence
961 poll_no_2_started = False
962 for dummy in range(2 * packet_count):
963 p = wait_for_bfd_packet(self)
964 self.assert_equal(len(self.vapi.collect_events()), 0,
965 "number of bfd events")
966 if "P" in p.sprintf("%BFD.flags%"):
967 poll_no_2_started = True
968 if time.time() < poll_sequence_start + poll_sequence_length:
969 raise Exception("VPP started 2nd poll sequence too soon")
970 final = self.test_session.create_packet()
971 final[BFD].flags = "F"
972 self.test_session.send_packet(final)
975 self.test_session.send_packet()
976 self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
977 # finish 2nd with final
978 final = self.test_session.create_packet()
979 final[BFD].flags = "F"
980 self.test_session.send_packet(final)
981 p = wait_for_bfd_packet(self)
982 # poll bit must not be set
983 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
984 "Poll bit set in BFD packet")
986 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
987 def test_poll_response(self):
988 """ test correct response to control frame with poll bit set """
990 poll = self.test_session.create_packet()
991 poll[BFD].flags = "P"
992 self.test_session.send_packet(poll)
993 final = wait_for_bfd_packet(
994 self, pcap_time_min=time.time() - self.vpp_clock_offset)
995 self.assertIn("F", final.sprintf("%BFD.flags%"))
997 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
998 def test_no_periodic_if_remote_demand(self):
999 """ no periodic frames outside poll sequence if remote demand set """
1000 bfd_session_up(self)
1001 demand = self.test_session.create_packet()
1002 demand[BFD].flags = "D"
1003 self.test_session.send_packet(demand)
1004 transmit_time = 0.9 \
1005 * max(self.vpp_session.required_min_rx,
1006 self.test_session.desired_min_tx) \
1009 for dummy in range(self.test_session.detect_mult * 2):
1010 time.sleep(transmit_time)
1011 self.test_session.send_packet(demand)
1013 p = wait_for_bfd_packet(self, timeout=0)
1014 self.logger.error(ppp("Received unexpected packet:", p))
1016 except CaptureTimeoutError:
1018 events = self.vapi.collect_events()
1020 self.logger.error("Received unexpected event: %s", e)
1021 self.assert_equal(count, 0, "number of packets received")
1022 self.assert_equal(len(events), 0, "number of events received")
1024 def test_echo_looped_back(self):
1025 """ echo packets looped back """
1026 # don't need a session in this case..
1027 self.vpp_session.remove_vpp_config()
1028 self.pg0.enable_capture()
1029 echo_packet_count = 10
1030 # random source port low enough to increment a few times..
1031 udp_sport_tx = randint(1, 50000)
1032 udp_sport_rx = udp_sport_tx
1033 echo_packet = (Ether(src=self.pg0.remote_mac,
1034 dst=self.pg0.local_mac) /
1035 IP(src=self.pg0.remote_ip4,
1036 dst=self.pg0.remote_ip4) /
1037 UDP(dport=BFD.udp_dport_echo) /
1038 Raw("this should be looped back"))
1039 for dummy in range(echo_packet_count):
1040 self.sleep(.01, "delay between echo packets")
1041 echo_packet[UDP].sport = udp_sport_tx
1043 self.logger.debug(ppp("Sending packet:", echo_packet))
1044 self.pg0.add_stream(echo_packet)
1046 for dummy in range(echo_packet_count):
1047 p = self.pg0.wait_for_packet(1)
1048 self.logger.debug(ppp("Got packet:", p))
1050 self.assert_equal(self.pg0.remote_mac,
1051 ether.dst, "Destination MAC")
1052 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1054 self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1055 self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1057 self.assert_equal(udp.dport, BFD.udp_dport_echo,
1058 "UDP destination port")
1059 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1061 # need to compare the hex payload here, otherwise BFD_vpp_echo
1063 self.assertEqual(str(p[UDP].payload),
1064 str(echo_packet[UDP].payload),
1065 "Received packet is not the echo packet sent")
1066 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1067 "ECHO packet identifier for test purposes)")
1069 def test_echo(self):
1070 """ echo function """
1071 bfd_session_up(self)
1072 self.test_session.update(required_min_echo_rx=150000)
1073 self.test_session.send_packet()
1074 detection_time = self.test_session.detect_mult *\
1075 self.vpp_session.required_min_rx / USEC_IN_SEC
1076 # echo shouldn't work without echo source set
1077 for dummy in range(10):
1078 sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1079 self.sleep(sleep, "delay before sending bfd packet")
1080 self.test_session.send_packet()
1081 p = wait_for_bfd_packet(
1082 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1083 self.assert_equal(p[BFD].required_min_rx_interval,
1084 self.vpp_session.required_min_rx,
1085 "BFD required min rx interval")
1086 self.test_session.send_packet()
1087 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1089 # should be turned on - loopback echo packets
1090 for dummy in range(3):
1091 loop_until = time.time() + 0.75 * detection_time
1092 while time.time() < loop_until:
1093 p = self.pg0.wait_for_packet(1)
1094 self.logger.debug(ppp("Got packet:", p))
1095 if p[UDP].dport == BFD.udp_dport_echo:
1097 p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
1098 self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
1099 "BFD ECHO src IP equal to loopback IP")
1100 self.logger.debug(ppp("Looping back packet:", p))
1101 self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1102 "ECHO packet destination MAC address")
1103 p[Ether].dst = self.pg0.local_mac
1104 self.pg0.add_stream(p)
1107 elif p.haslayer(BFD):
1109 self.assertGreaterEqual(
1110 p[BFD].required_min_rx_interval,
1112 if "P" in p.sprintf("%BFD.flags%"):
1113 final = self.test_session.create_packet()
1114 final[BFD].flags = "F"
1115 self.test_session.send_packet(final)
1117 raise Exception(ppp("Received unknown packet:", p))
1119 self.assert_equal(len(self.vapi.collect_events()), 0,
1120 "number of bfd events")
1121 self.test_session.send_packet()
1122 self.assertTrue(echo_seen, "No echo packets received")
1124 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1125 def test_echo_fail(self):
1126 """ session goes down if echo function fails """
1127 bfd_session_up(self)
1128 self.test_session.update(required_min_echo_rx=150000)
1129 self.test_session.send_packet()
1130 detection_time = self.test_session.detect_mult *\
1131 self.vpp_session.required_min_rx / USEC_IN_SEC
1132 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1133 # echo function should be used now, but we will drop the echo packets
1134 verified_diag = False
1135 for dummy in range(3):
1136 loop_until = time.time() + 0.75 * detection_time
1137 while time.time() < loop_until:
1138 p = self.pg0.wait_for_packet(1)
1139 self.logger.debug(ppp("Got packet:", p))
1140 if p[UDP].dport == BFD.udp_dport_echo:
1143 elif p.haslayer(BFD):
1144 if "P" in p.sprintf("%BFD.flags%"):
1145 self.assertGreaterEqual(
1146 p[BFD].required_min_rx_interval,
1148 final = self.test_session.create_packet()
1149 final[BFD].flags = "F"
1150 self.test_session.send_packet(final)
1151 if p[BFD].state == BFDState.down:
1152 self.assert_equal(p[BFD].diag,
1153 BFDDiagCode.echo_function_failed,
1155 verified_diag = True
1157 raise Exception(ppp("Received unknown packet:", p))
1158 self.test_session.send_packet()
1159 events = self.vapi.collect_events()
1160 self.assert_equal(len(events), 1, "number of bfd events")
1161 self.assert_equal(events[0].state, BFDState.down, BFDState)
1162 self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1164 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1165 def test_echo_stop(self):
1166 """ echo function stops if peer sets required min echo rx zero """
1167 bfd_session_up(self)
1168 self.test_session.update(required_min_echo_rx=150000)
1169 self.test_session.send_packet()
1170 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1171 # wait for first echo packet
1173 p = self.pg0.wait_for_packet(1)
1174 self.logger.debug(ppp("Got packet:", p))
1175 if p[UDP].dport == BFD.udp_dport_echo:
1176 self.logger.debug(ppp("Looping back packet:", p))
1177 p[Ether].dst = self.pg0.local_mac
1178 self.pg0.add_stream(p)
1181 elif p.haslayer(BFD):
1185 raise Exception(ppp("Received unknown packet:", p))
1186 self.test_session.update(required_min_echo_rx=0)
1187 self.test_session.send_packet()
1188 # echo packets shouldn't arrive anymore
1189 for dummy in range(5):
1190 wait_for_bfd_packet(
1191 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1192 self.test_session.send_packet()
1193 events = self.vapi.collect_events()
1194 self.assert_equal(len(events), 0, "number of bfd events")
1196 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1197 def test_echo_source_removed(self):
1198 """ echo function stops if echo source is removed """
1199 bfd_session_up(self)
1200 self.test_session.update(required_min_echo_rx=150000)
1201 self.test_session.send_packet()
1202 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1203 # wait for first echo packet
1205 p = self.pg0.wait_for_packet(1)
1206 self.logger.debug(ppp("Got packet:", p))
1207 if p[UDP].dport == BFD.udp_dport_echo:
1208 self.logger.debug(ppp("Looping back packet:", p))
1209 p[Ether].dst = self.pg0.local_mac
1210 self.pg0.add_stream(p)
1213 elif p.haslayer(BFD):
1217 raise Exception(ppp("Received unknown packet:", p))
1218 self.vapi.bfd_udp_del_echo_source()
1219 self.test_session.send_packet()
1220 # echo packets shouldn't arrive anymore
1221 for dummy in range(5):
1222 wait_for_bfd_packet(
1223 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1224 self.test_session.send_packet()
1225 events = self.vapi.collect_events()
1226 self.assert_equal(len(events), 0, "number of bfd events")
1228 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1229 def test_stale_echo(self):
1230 """ stale echo packets don't keep a session up """
1231 bfd_session_up(self)
1232 self.test_session.update(required_min_echo_rx=150000)
1233 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1234 self.test_session.send_packet()
1235 # should be turned on - loopback echo packets
1239 for dummy in range(10 * self.vpp_session.detect_mult):
1240 p = self.pg0.wait_for_packet(1)
1241 if p[UDP].dport == BFD.udp_dport_echo:
1242 if echo_packet is None:
1243 self.logger.debug(ppp("Got first echo packet:", p))
1245 timeout_at = time.time() + self.vpp_session.detect_mult * \
1246 self.test_session.required_min_echo_rx / USEC_IN_SEC
1248 self.logger.debug(ppp("Got followup echo packet:", p))
1249 self.logger.debug(ppp("Looping back first echo packet:", p))
1250 echo_packet[Ether].dst = self.pg0.local_mac
1251 self.pg0.add_stream(echo_packet)
1253 elif p.haslayer(BFD):
1254 self.logger.debug(ppp("Got packet:", p))
1255 if "P" in p.sprintf("%BFD.flags%"):
1256 final = self.test_session.create_packet()
1257 final[BFD].flags = "F"
1258 self.test_session.send_packet(final)
1259 if p[BFD].state == BFDState.down:
1260 self.assertIsNotNone(
1262 "Session went down before first echo packet received")
1264 self.assertGreaterEqual(
1266 "Session timeout at %s, but is expected at %s" %
1268 self.assert_equal(p[BFD].diag,
1269 BFDDiagCode.echo_function_failed,
1271 events = self.vapi.collect_events()
1272 self.assert_equal(len(events), 1, "number of bfd events")
1273 self.assert_equal(events[0].state, BFDState.down, BFDState)
1277 raise Exception(ppp("Received unknown packet:", p))
1278 self.test_session.send_packet()
1279 self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1281 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1282 def test_invalid_echo_checksum(self):
1283 """ echo packets with invalid checksum don't keep a session up """
1284 bfd_session_up(self)
1285 self.test_session.update(required_min_echo_rx=150000)
1286 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1287 self.test_session.send_packet()
1288 # should be turned on - loopback echo packets
1291 for dummy in range(10 * self.vpp_session.detect_mult):
1292 p = self.pg0.wait_for_packet(1)
1293 if p[UDP].dport == BFD.udp_dport_echo:
1294 self.logger.debug(ppp("Got echo packet:", p))
1295 if timeout_at is None:
1296 timeout_at = time.time() + self.vpp_session.detect_mult * \
1297 self.test_session.required_min_echo_rx / USEC_IN_SEC
1298 p[BFD_vpp_echo].checksum = getrandbits(64)
1299 p[Ether].dst = self.pg0.local_mac
1300 self.logger.debug(ppp("Looping back modified echo packet:", p))
1301 self.pg0.add_stream(p)
1303 elif p.haslayer(BFD):
1304 self.logger.debug(ppp("Got packet:", p))
1305 if "P" in p.sprintf("%BFD.flags%"):
1306 final = self.test_session.create_packet()
1307 final[BFD].flags = "F"
1308 self.test_session.send_packet(final)
1309 if p[BFD].state == BFDState.down:
1310 self.assertIsNotNone(
1312 "Session went down before first echo packet received")
1314 self.assertGreaterEqual(
1316 "Session timeout at %s, but is expected at %s" %
1318 self.assert_equal(p[BFD].diag,
1319 BFDDiagCode.echo_function_failed,
1321 events = self.vapi.collect_events()
1322 self.assert_equal(len(events), 1, "number of bfd events")
1323 self.assert_equal(events[0].state, BFDState.down, BFDState)
1327 raise Exception(ppp("Received unknown packet:", p))
1328 self.test_session.send_packet()
1329 self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1331 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1332 def test_admin_up_down(self):
1333 """ put session admin-up and admin-down """
1334 bfd_session_up(self)
1335 self.vpp_session.admin_down()
1336 self.pg0.enable_capture()
1337 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1338 verify_event(self, e, expected_state=BFDState.admin_down)
1339 for dummy in range(2):
1340 p = wait_for_bfd_packet(self)
1341 self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1342 # try to bring session up - shouldn't be possible
1343 self.test_session.update(state=BFDState.init)
1344 self.test_session.send_packet()
1345 for dummy in range(2):
1346 p = wait_for_bfd_packet(self)
1347 self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1348 self.vpp_session.admin_up()
1349 self.test_session.update(state=BFDState.down)
1350 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1351 verify_event(self, e, expected_state=BFDState.down)
1352 p = wait_for_bfd_packet(
1353 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1354 self.assert_equal(p[BFD].state, BFDState.down, BFDState)
1355 self.test_session.send_packet()
1356 p = wait_for_bfd_packet(
1357 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1358 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1359 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1360 verify_event(self, e, expected_state=BFDState.init)
1361 self.test_session.update(state=BFDState.up)
1362 self.test_session.send_packet()
1363 p = wait_for_bfd_packet(
1364 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1365 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1366 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1367 verify_event(self, e, expected_state=BFDState.up)
1369 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1370 def test_config_change_remote_demand(self):
1371 """ configuration change while peer in demand mode """
1372 bfd_session_up(self)
1373 demand = self.test_session.create_packet()
1374 demand[BFD].flags = "D"
1375 self.test_session.send_packet(demand)
1376 self.vpp_session.modify_parameters(
1377 required_min_rx=2 * self.vpp_session.required_min_rx)
1378 p = wait_for_bfd_packet(
1379 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1380 # poll bit must be set
1381 self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1382 # terminate poll sequence
1383 final = self.test_session.create_packet()
1384 final[BFD].flags = "D+F"
1385 self.test_session.send_packet(final)
1386 # vpp should be quiet now again
1387 transmit_time = 0.9 \
1388 * max(self.vpp_session.required_min_rx,
1389 self.test_session.desired_min_tx) \
1392 for dummy in range(self.test_session.detect_mult * 2):
1393 time.sleep(transmit_time)
1394 self.test_session.send_packet(demand)
1396 p = wait_for_bfd_packet(self, timeout=0)
1397 self.logger.error(ppp("Received unexpected packet:", p))
1399 except CaptureTimeoutError:
1401 events = self.vapi.collect_events()
1403 self.logger.error("Received unexpected event: %s", e)
1404 self.assert_equal(count, 0, "number of packets received")
1405 self.assert_equal(len(events), 0, "number of events received")
1407 def test_intf_deleted(self):
1408 """ interface with bfd session deleted """
1409 intf = VppLoInterface(self, 0)
1412 sw_if_index = intf.sw_if_index
1413 vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip4)
1414 vpp_session.add_vpp_config()
1415 vpp_session.admin_up()
1416 intf.remove_vpp_config()
1417 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1418 self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1419 self.assertFalse(vpp_session.query_vpp_config())
1422 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1423 class BFD6TestCase(VppTestCase):
1424 """Bidirectional Forwarding Detection (BFD) (IPv6) """
1427 vpp_clock_offset = None
1432 def setUpClass(cls):
1433 super(BFD6TestCase, cls).setUpClass()
1435 cls.create_pg_interfaces([0])
1436 cls.pg0.config_ip6()
1437 cls.pg0.configure_ipv6_neighbors()
1439 cls.pg0.resolve_ndp()
1440 cls.create_loopback_interfaces([0])
1441 cls.loopback0 = cls.lo_interfaces[0]
1442 cls.loopback0.config_ip6()
1443 cls.loopback0.admin_up()
1446 super(BFD6TestCase, cls).tearDownClass()
1450 super(BFD6TestCase, self).setUp()
1451 self.factory = AuthKeyFactory()
1452 self.vapi.want_bfd_events()
1453 self.pg0.enable_capture()
1455 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1456 self.pg0.remote_ip6,
1458 self.vpp_session.add_vpp_config()
1459 self.vpp_session.admin_up()
1460 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1461 self.logger.debug(self.vapi.cli("show adj nbr"))
1463 self.vapi.want_bfd_events(enable_disable=0)
1467 if not self.vpp_dead:
1468 self.vapi.want_bfd_events(enable_disable=0)
1469 self.vapi.collect_events() # clear the event queue
1470 super(BFD6TestCase, self).tearDown()
1472 def test_session_up(self):
1473 """ bring BFD session up """
1474 bfd_session_up(self)
1476 def test_session_up_by_ip(self):
1477 """ bring BFD session up - first frame looked up by address pair """
1478 self.logger.info("BFD: Sending Slow control frame")
1479 self.test_session.update(my_discriminator=randint(0, 40000000))
1480 self.test_session.send_packet()
1481 self.pg0.enable_capture()
1482 p = self.pg0.wait_for_packet(1)
1483 self.assert_equal(p[BFD].your_discriminator,
1484 self.test_session.my_discriminator,
1485 "BFD - your discriminator")
1486 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1487 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
1489 self.logger.info("BFD: Waiting for event")
1490 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1491 verify_event(self, e, expected_state=BFDState.init)
1492 self.logger.info("BFD: Sending Up")
1493 self.test_session.send_packet()
1494 self.logger.info("BFD: Waiting for event")
1495 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1496 verify_event(self, e, expected_state=BFDState.up)
1497 self.logger.info("BFD: Session is Up")
1498 self.test_session.update(state=BFDState.up)
1499 self.test_session.send_packet()
1500 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1502 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1503 def test_hold_up(self):
1504 """ hold BFD session up """
1505 bfd_session_up(self)
1506 for dummy in range(self.test_session.detect_mult * 2):
1507 wait_for_bfd_packet(self)
1508 self.test_session.send_packet()
1509 self.assert_equal(len(self.vapi.collect_events()), 0,
1510 "number of bfd events")
1511 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1513 def test_echo_looped_back(self):
1514 """ echo packets looped back """
1515 # don't need a session in this case..
1516 self.vpp_session.remove_vpp_config()
1517 self.pg0.enable_capture()
1518 echo_packet_count = 10
1519 # random source port low enough to increment a few times..
1520 udp_sport_tx = randint(1, 50000)
1521 udp_sport_rx = udp_sport_tx
1522 echo_packet = (Ether(src=self.pg0.remote_mac,
1523 dst=self.pg0.local_mac) /
1524 IPv6(src=self.pg0.remote_ip6,
1525 dst=self.pg0.remote_ip6) /
1526 UDP(dport=BFD.udp_dport_echo) /
1527 Raw("this should be looped back"))
1528 for dummy in range(echo_packet_count):
1529 self.sleep(.01, "delay between echo packets")
1530 echo_packet[UDP].sport = udp_sport_tx
1532 self.logger.debug(ppp("Sending packet:", echo_packet))
1533 self.pg0.add_stream(echo_packet)
1535 for dummy in range(echo_packet_count):
1536 p = self.pg0.wait_for_packet(1)
1537 self.logger.debug(ppp("Got packet:", p))
1539 self.assert_equal(self.pg0.remote_mac,
1540 ether.dst, "Destination MAC")
1541 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1543 self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
1544 self.assert_equal(self.pg0.remote_ip6, ip.src, "Destination IP")
1546 self.assert_equal(udp.dport, BFD.udp_dport_echo,
1547 "UDP destination port")
1548 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1550 # need to compare the hex payload here, otherwise BFD_vpp_echo
1552 self.assertEqual(str(p[UDP].payload),
1553 str(echo_packet[UDP].payload),
1554 "Received packet is not the echo packet sent")
1555 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1556 "ECHO packet identifier for test purposes)")
1557 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1558 "ECHO packet identifier for test purposes)")
1560 def test_echo(self):
1561 """ echo function """
1562 bfd_session_up(self)
1563 self.test_session.update(required_min_echo_rx=150000)
1564 self.test_session.send_packet()
1565 detection_time = self.test_session.detect_mult *\
1566 self.vpp_session.required_min_rx / USEC_IN_SEC
1567 # echo shouldn't work without echo source set
1568 for dummy in range(10):
1569 sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1570 self.sleep(sleep, "delay before sending bfd packet")
1571 self.test_session.send_packet()
1572 p = wait_for_bfd_packet(
1573 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1574 self.assert_equal(p[BFD].required_min_rx_interval,
1575 self.vpp_session.required_min_rx,
1576 "BFD required min rx interval")
1577 self.test_session.send_packet()
1578 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1580 # should be turned on - loopback echo packets
1581 for dummy in range(3):
1582 loop_until = time.time() + 0.75 * detection_time
1583 while time.time() < loop_until:
1584 p = self.pg0.wait_for_packet(1)
1585 self.logger.debug(ppp("Got packet:", p))
1586 if p[UDP].dport == BFD.udp_dport_echo:
1588 p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
1589 self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
1590 "BFD ECHO src IP equal to loopback IP")
1591 self.logger.debug(ppp("Looping back packet:", p))
1592 self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1593 "ECHO packet destination MAC address")
1594 p[Ether].dst = self.pg0.local_mac
1595 self.pg0.add_stream(p)
1598 elif p.haslayer(BFD):
1600 self.assertGreaterEqual(
1601 p[BFD].required_min_rx_interval,
1603 if "P" in p.sprintf("%BFD.flags%"):
1604 final = self.test_session.create_packet()
1605 final[BFD].flags = "F"
1606 self.test_session.send_packet(final)
1608 raise Exception(ppp("Received unknown packet:", p))
1610 self.assert_equal(len(self.vapi.collect_events()), 0,
1611 "number of bfd events")
1612 self.test_session.send_packet()
1613 self.assertTrue(echo_seen, "No echo packets received")
1615 def test_intf_deleted(self):
1616 """ interface with bfd session deleted """
1617 intf = VppLoInterface(self, 0)
1620 sw_if_index = intf.sw_if_index
1621 vpp_session = VppBFDUDPSession(
1622 self, intf, intf.remote_ip6, af=AF_INET6)
1623 vpp_session.add_vpp_config()
1624 vpp_session.admin_up()
1625 intf.remove_vpp_config()
1626 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1627 self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1628 self.assertFalse(vpp_session.query_vpp_config())
1631 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1632 class BFDFIBTestCase(VppTestCase):
1633 """ BFD-FIB interactions (IPv6) """
1639 super(BFDFIBTestCase, self).setUp()
1640 self.create_pg_interfaces(range(1))
1642 self.vapi.want_bfd_events()
1643 self.pg0.enable_capture()
1645 for i in self.pg_interfaces:
1648 i.configure_ipv6_neighbors()
1651 if not self.vpp_dead:
1652 self.vapi.want_bfd_events(enable_disable=0)
1654 super(BFDFIBTestCase, self).tearDown()
1657 def pkt_is_not_data_traffic(p):
1658 """ not data traffic implies BFD or the usual IPv6 ND/RA"""
1659 if p.haslayer(BFD) or is_ipv6_misc(p):
1663 def test_session_with_fib(self):
1664 """ BFD-FIB interactions """
1666 # packets to match against both of the routes
1667 p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1668 IPv6(src="3001::1", dst="2001::1") /
1669 UDP(sport=1234, dport=1234) /
1671 (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1672 IPv6(src="3001::1", dst="2002::1") /
1673 UDP(sport=1234, dport=1234) /
1676 # A recursive and a non-recursive route via a next-hop that
1677 # will have a BFD session
1678 ip_2001_s_64 = VppIpRoute(self, "2001::", 64,
1679 [VppRoutePath(self.pg0.remote_ip6,
1680 self.pg0.sw_if_index,
1681 proto=DpoProto.DPO_PROTO_IP6)],
1683 ip_2002_s_64 = VppIpRoute(self, "2002::", 64,
1684 [VppRoutePath(self.pg0.remote_ip6,
1686 proto=DpoProto.DPO_PROTO_IP6)],
1688 ip_2001_s_64.add_vpp_config()
1689 ip_2002_s_64.add_vpp_config()
1691 # bring the session up now the routes are present
1692 self.vpp_session = VppBFDUDPSession(self,
1694 self.pg0.remote_ip6,
1696 self.vpp_session.add_vpp_config()
1697 self.vpp_session.admin_up()
1698 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1700 # session is up - traffic passes
1701 bfd_session_up(self)
1703 self.pg0.add_stream(p)
1706 captured = self.pg0.wait_for_packet(
1708 filter_out_fn=self.pkt_is_not_data_traffic)
1709 self.assertEqual(captured[IPv6].dst,
1712 # session is up - traffic is dropped
1713 bfd_session_down(self)
1715 self.pg0.add_stream(p)
1717 with self.assertRaises(CaptureTimeoutError):
1718 self.pg0.wait_for_packet(1, self.pkt_is_not_data_traffic)
1720 # session is up - traffic passes
1721 bfd_session_up(self)
1723 self.pg0.add_stream(p)
1726 captured = self.pg0.wait_for_packet(
1728 filter_out_fn=self.pkt_is_not_data_traffic)
1729 self.assertEqual(captured[IPv6].dst,
1733 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1734 class BFDSHA1TestCase(VppTestCase):
1735 """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1738 vpp_clock_offset = None
1743 def setUpClass(cls):
1744 super(BFDSHA1TestCase, cls).setUpClass()
1746 cls.create_pg_interfaces([0])
1747 cls.pg0.config_ip4()
1749 cls.pg0.resolve_arp()
1752 super(BFDSHA1TestCase, cls).tearDownClass()
1756 super(BFDSHA1TestCase, self).setUp()
1757 self.factory = AuthKeyFactory()
1758 self.vapi.want_bfd_events()
1759 self.pg0.enable_capture()
1762 if not self.vpp_dead:
1763 self.vapi.want_bfd_events(enable_disable=0)
1764 self.vapi.collect_events() # clear the event queue
1765 super(BFDSHA1TestCase, self).tearDown()
1767 def test_session_up(self):
1768 """ bring BFD session up """
1769 key = self.factory.create_random_key(self)
1770 key.add_vpp_config()
1771 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1772 self.pg0.remote_ip4,
1774 self.vpp_session.add_vpp_config()
1775 self.vpp_session.admin_up()
1776 self.test_session = BFDTestSession(
1777 self, self.pg0, AF_INET, sha1_key=key,
1778 bfd_key_id=self.vpp_session.bfd_key_id)
1779 bfd_session_up(self)
1781 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1782 def test_hold_up(self):
1783 """ hold BFD session up """
1784 key = self.factory.create_random_key(self)
1785 key.add_vpp_config()
1786 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1787 self.pg0.remote_ip4,
1789 self.vpp_session.add_vpp_config()
1790 self.vpp_session.admin_up()
1791 self.test_session = BFDTestSession(
1792 self, self.pg0, AF_INET, sha1_key=key,
1793 bfd_key_id=self.vpp_session.bfd_key_id)
1794 bfd_session_up(self)
1795 for dummy in range(self.test_session.detect_mult * 2):
1796 wait_for_bfd_packet(self)
1797 self.test_session.send_packet()
1798 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1800 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1801 def test_hold_up_meticulous(self):
1802 """ hold BFD session up - meticulous auth """
1803 key = self.factory.create_random_key(
1804 self, BFDAuthType.meticulous_keyed_sha1)
1805 key.add_vpp_config()
1806 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1807 self.pg0.remote_ip4, sha1_key=key)
1808 self.vpp_session.add_vpp_config()
1809 self.vpp_session.admin_up()
1810 # specify sequence number so that it wraps
1811 self.test_session = BFDTestSession(
1812 self, self.pg0, AF_INET, sha1_key=key,
1813 bfd_key_id=self.vpp_session.bfd_key_id,
1814 our_seq_number=0xFFFFFFFF - 4)
1815 bfd_session_up(self)
1816 for dummy in range(30):
1817 wait_for_bfd_packet(self)
1818 self.test_session.inc_seq_num()
1819 self.test_session.send_packet()
1820 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1822 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1823 def test_send_bad_seq_number(self):
1824 """ session is not kept alive by msgs with bad sequence numbers"""
1825 key = self.factory.create_random_key(
1826 self, BFDAuthType.meticulous_keyed_sha1)
1827 key.add_vpp_config()
1828 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1829 self.pg0.remote_ip4, sha1_key=key)
1830 self.vpp_session.add_vpp_config()
1831 self.test_session = BFDTestSession(
1832 self, self.pg0, AF_INET, sha1_key=key,
1833 bfd_key_id=self.vpp_session.bfd_key_id)
1834 bfd_session_up(self)
1835 detection_time = self.test_session.detect_mult *\
1836 self.vpp_session.required_min_rx / USEC_IN_SEC
1837 send_until = time.time() + 2 * detection_time
1838 while time.time() < send_until:
1839 self.test_session.send_packet()
1840 self.sleep(0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
1841 "time between bfd packets")
1842 e = self.vapi.collect_events()
1843 # session should be down now, because the sequence numbers weren't
1845 self.assert_equal(len(e), 1, "number of bfd events")
1846 verify_event(self, e[0], expected_state=BFDState.down)
1848 def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
1849 legitimate_test_session,
1851 rogue_bfd_values=None):
1852 """ execute a rogue session interaction scenario
1854 1. create vpp session, add config
1855 2. bring the legitimate session up
1856 3. copy the bfd values from legitimate session to rogue session
1857 4. apply rogue_bfd_values to rogue session
1858 5. set rogue session state to down
1859 6. send message to take the session down from the rogue session
1860 7. assert that the legitimate session is unaffected
1863 self.vpp_session = vpp_bfd_udp_session
1864 self.vpp_session.add_vpp_config()
1865 self.test_session = legitimate_test_session
1866 # bring vpp session up
1867 bfd_session_up(self)
1868 # send packet from rogue session
1869 rogue_test_session.update(
1870 my_discriminator=self.test_session.my_discriminator,
1871 your_discriminator=self.test_session.your_discriminator,
1872 desired_min_tx=self.test_session.desired_min_tx,
1873 required_min_rx=self.test_session.required_min_rx,
1874 detect_mult=self.test_session.detect_mult,
1875 diag=self.test_session.diag,
1876 state=self.test_session.state,
1877 auth_type=self.test_session.auth_type)
1878 if rogue_bfd_values:
1879 rogue_test_session.update(**rogue_bfd_values)
1880 rogue_test_session.update(state=BFDState.down)
1881 rogue_test_session.send_packet()
1882 wait_for_bfd_packet(self)
1883 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1885 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1886 def test_mismatch_auth(self):
1887 """ session is not brought down by unauthenticated msg """
1888 key = self.factory.create_random_key(self)
1889 key.add_vpp_config()
1890 vpp_session = VppBFDUDPSession(
1891 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1892 legitimate_test_session = BFDTestSession(
1893 self, self.pg0, AF_INET, sha1_key=key,
1894 bfd_key_id=vpp_session.bfd_key_id)
1895 rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
1896 self.execute_rogue_session_scenario(vpp_session,
1897 legitimate_test_session,
1900 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1901 def test_mismatch_bfd_key_id(self):
1902 """ session is not brought down by msg with non-existent key-id """
1903 key = self.factory.create_random_key(self)
1904 key.add_vpp_config()
1905 vpp_session = VppBFDUDPSession(
1906 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1907 # pick a different random bfd key id
1909 while x == vpp_session.bfd_key_id:
1911 legitimate_test_session = BFDTestSession(
1912 self, self.pg0, AF_INET, sha1_key=key,
1913 bfd_key_id=vpp_session.bfd_key_id)
1914 rogue_test_session = BFDTestSession(
1915 self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
1916 self.execute_rogue_session_scenario(vpp_session,
1917 legitimate_test_session,
1920 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1921 def test_mismatched_auth_type(self):
1922 """ session is not brought down by msg with wrong auth type """
1923 key = self.factory.create_random_key(self)
1924 key.add_vpp_config()
1925 vpp_session = VppBFDUDPSession(
1926 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1927 legitimate_test_session = BFDTestSession(
1928 self, self.pg0, AF_INET, sha1_key=key,
1929 bfd_key_id=vpp_session.bfd_key_id)
1930 rogue_test_session = BFDTestSession(
1931 self, self.pg0, AF_INET, sha1_key=key,
1932 bfd_key_id=vpp_session.bfd_key_id)
1933 self.execute_rogue_session_scenario(
1934 vpp_session, legitimate_test_session, rogue_test_session,
1935 {'auth_type': BFDAuthType.keyed_md5})
1937 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1938 def test_restart(self):
1939 """ simulate remote peer restart and resynchronization """
1940 key = self.factory.create_random_key(
1941 self, BFDAuthType.meticulous_keyed_sha1)
1942 key.add_vpp_config()
1943 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1944 self.pg0.remote_ip4, sha1_key=key)
1945 self.vpp_session.add_vpp_config()
1946 self.test_session = BFDTestSession(
1947 self, self.pg0, AF_INET, sha1_key=key,
1948 bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
1949 bfd_session_up(self)
1950 # don't send any packets for 2*detection_time
1951 detection_time = self.test_session.detect_mult *\
1952 self.vpp_session.required_min_rx / USEC_IN_SEC
1953 self.sleep(2 * detection_time, "simulating peer restart")
1954 events = self.vapi.collect_events()
1955 self.assert_equal(len(events), 1, "number of bfd events")
1956 verify_event(self, events[0], expected_state=BFDState.down)
1957 self.test_session.update(state=BFDState.down)
1958 # reset sequence number
1959 self.test_session.our_seq_number = 0
1960 self.test_session.vpp_seq_number = None
1961 # now throw away any pending packets
1962 self.pg0.enable_capture()
1963 bfd_session_up(self)
1966 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1967 class BFDAuthOnOffTestCase(VppTestCase):
1968 """Bidirectional Forwarding Detection (BFD) (changing auth) """
1975 def setUpClass(cls):
1976 super(BFDAuthOnOffTestCase, cls).setUpClass()
1978 cls.create_pg_interfaces([0])
1979 cls.pg0.config_ip4()
1981 cls.pg0.resolve_arp()
1984 super(BFDAuthOnOffTestCase, cls).tearDownClass()
1988 super(BFDAuthOnOffTestCase, self).setUp()
1989 self.factory = AuthKeyFactory()
1990 self.vapi.want_bfd_events()
1991 self.pg0.enable_capture()
1994 if not self.vpp_dead:
1995 self.vapi.want_bfd_events(enable_disable=0)
1996 self.vapi.collect_events() # clear the event queue
1997 super(BFDAuthOnOffTestCase, self).tearDown()
1999 def test_auth_on_immediate(self):
2000 """ turn auth on without disturbing session state (immediate) """
2001 key = self.factory.create_random_key(self)
2002 key.add_vpp_config()
2003 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2004 self.pg0.remote_ip4)
2005 self.vpp_session.add_vpp_config()
2006 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
2007 bfd_session_up(self)
2008 for dummy in range(self.test_session.detect_mult * 2):
2009 p = wait_for_bfd_packet(self)
2010 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2011 self.test_session.send_packet()
2012 self.vpp_session.activate_auth(key)
2013 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2014 self.test_session.sha1_key = key
2015 for dummy in range(self.test_session.detect_mult * 2):
2016 p = wait_for_bfd_packet(self)
2017 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2018 self.test_session.send_packet()
2019 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2020 self.assert_equal(len(self.vapi.collect_events()), 0,
2021 "number of bfd events")
2023 def test_auth_off_immediate(self):
2024 """ turn auth off without disturbing session state (immediate) """
2025 key = self.factory.create_random_key(self)
2026 key.add_vpp_config()
2027 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2028 self.pg0.remote_ip4, sha1_key=key)
2029 self.vpp_session.add_vpp_config()
2030 self.test_session = BFDTestSession(
2031 self, self.pg0, AF_INET, sha1_key=key,
2032 bfd_key_id=self.vpp_session.bfd_key_id)
2033 bfd_session_up(self)
2034 # self.vapi.want_bfd_events(enable_disable=0)
2035 for dummy in range(self.test_session.detect_mult * 2):
2036 p = wait_for_bfd_packet(self)
2037 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2038 self.test_session.inc_seq_num()
2039 self.test_session.send_packet()
2040 self.vpp_session.deactivate_auth()
2041 self.test_session.bfd_key_id = None
2042 self.test_session.sha1_key = None
2043 for dummy in range(self.test_session.detect_mult * 2):
2044 p = wait_for_bfd_packet(self)
2045 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2046 self.test_session.inc_seq_num()
2047 self.test_session.send_packet()
2048 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2049 self.assert_equal(len(self.vapi.collect_events()), 0,
2050 "number of bfd events")
2052 def test_auth_change_key_immediate(self):
2053 """ change auth key without disturbing session state (immediate) """
2054 key1 = self.factory.create_random_key(self)
2055 key1.add_vpp_config()
2056 key2 = self.factory.create_random_key(self)
2057 key2.add_vpp_config()
2058 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2059 self.pg0.remote_ip4, sha1_key=key1)
2060 self.vpp_session.add_vpp_config()
2061 self.test_session = BFDTestSession(
2062 self, self.pg0, AF_INET, sha1_key=key1,
2063 bfd_key_id=self.vpp_session.bfd_key_id)
2064 bfd_session_up(self)
2065 for dummy in range(self.test_session.detect_mult * 2):
2066 p = wait_for_bfd_packet(self)
2067 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2068 self.test_session.send_packet()
2069 self.vpp_session.activate_auth(key2)
2070 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2071 self.test_session.sha1_key = key2
2072 for dummy in range(self.test_session.detect_mult * 2):
2073 p = wait_for_bfd_packet(self)
2074 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2075 self.test_session.send_packet()
2076 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2077 self.assert_equal(len(self.vapi.collect_events()), 0,
2078 "number of bfd events")
2080 def test_auth_on_delayed(self):
2081 """ turn auth on without disturbing session state (delayed) """
2082 key = self.factory.create_random_key(self)
2083 key.add_vpp_config()
2084 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2085 self.pg0.remote_ip4)
2086 self.vpp_session.add_vpp_config()
2087 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
2088 bfd_session_up(self)
2089 for dummy in range(self.test_session.detect_mult * 2):
2090 wait_for_bfd_packet(self)
2091 self.test_session.send_packet()
2092 self.vpp_session.activate_auth(key, delayed=True)
2093 for dummy in range(self.test_session.detect_mult * 2):
2094 p = wait_for_bfd_packet(self)
2095 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2096 self.test_session.send_packet()
2097 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2098 self.test_session.sha1_key = key
2099 self.test_session.send_packet()
2100 for dummy in range(self.test_session.detect_mult * 2):
2101 p = wait_for_bfd_packet(self)
2102 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2103 self.test_session.send_packet()
2104 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2105 self.assert_equal(len(self.vapi.collect_events()), 0,
2106 "number of bfd events")
2108 def test_auth_off_delayed(self):
2109 """ turn auth off without disturbing session state (delayed) """
2110 key = self.factory.create_random_key(self)
2111 key.add_vpp_config()
2112 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2113 self.pg0.remote_ip4, sha1_key=key)
2114 self.vpp_session.add_vpp_config()
2115 self.test_session = BFDTestSession(
2116 self, self.pg0, AF_INET, sha1_key=key,
2117 bfd_key_id=self.vpp_session.bfd_key_id)
2118 bfd_session_up(self)
2119 for dummy in range(self.test_session.detect_mult * 2):
2120 p = wait_for_bfd_packet(self)
2121 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2122 self.test_session.send_packet()
2123 self.vpp_session.deactivate_auth(delayed=True)
2124 for dummy in range(self.test_session.detect_mult * 2):
2125 p = wait_for_bfd_packet(self)
2126 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2127 self.test_session.send_packet()
2128 self.test_session.bfd_key_id = None
2129 self.test_session.sha1_key = None
2130 self.test_session.send_packet()
2131 for dummy in range(self.test_session.detect_mult * 2):
2132 p = wait_for_bfd_packet(self)
2133 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2134 self.test_session.send_packet()
2135 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2136 self.assert_equal(len(self.vapi.collect_events()), 0,
2137 "number of bfd events")
2139 def test_auth_change_key_delayed(self):
2140 """ change auth key without disturbing session state (delayed) """
2141 key1 = self.factory.create_random_key(self)
2142 key1.add_vpp_config()
2143 key2 = self.factory.create_random_key(self)
2144 key2.add_vpp_config()
2145 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2146 self.pg0.remote_ip4, sha1_key=key1)
2147 self.vpp_session.add_vpp_config()
2148 self.vpp_session.admin_up()
2149 self.test_session = BFDTestSession(
2150 self, self.pg0, AF_INET, sha1_key=key1,
2151 bfd_key_id=self.vpp_session.bfd_key_id)
2152 bfd_session_up(self)
2153 for dummy in range(self.test_session.detect_mult * 2):
2154 p = wait_for_bfd_packet(self)
2155 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2156 self.test_session.send_packet()
2157 self.vpp_session.activate_auth(key2, delayed=True)
2158 for dummy in range(self.test_session.detect_mult * 2):
2159 p = wait_for_bfd_packet(self)
2160 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2161 self.test_session.send_packet()
2162 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2163 self.test_session.sha1_key = key2
2164 self.test_session.send_packet()
2165 for dummy in range(self.test_session.detect_mult * 2):
2166 p = wait_for_bfd_packet(self)
2167 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2168 self.test_session.send_packet()
2169 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2170 self.assert_equal(len(self.vapi.collect_events()), 0,
2171 "number of bfd events")
2174 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2175 class BFDCLITestCase(VppTestCase):
2176 """Bidirectional Forwarding Detection (BFD) (CLI) """
2180 def setUpClass(cls):
2181 super(BFDCLITestCase, cls).setUpClass()
2184 cls.create_pg_interfaces((0,))
2185 cls.pg0.config_ip4()
2186 cls.pg0.config_ip6()
2187 cls.pg0.resolve_arp()
2188 cls.pg0.resolve_ndp()
2191 super(BFDCLITestCase, cls).tearDownClass()
2195 super(BFDCLITestCase, self).setUp()
2196 self.factory = AuthKeyFactory()
2197 self.pg0.enable_capture()
2201 self.vapi.want_bfd_events(enable_disable=0)
2202 except UnexpectedApiReturnValueError:
2203 # some tests aren't subscribed, so this is not an issue
2205 self.vapi.collect_events() # clear the event queue
2206 super(BFDCLITestCase, self).tearDown()
2208 def cli_verify_no_response(self, cli):
2209 """ execute a CLI, asserting that the response is empty """
2210 self.assert_equal(self.vapi.cli(cli),
2212 "CLI command response")
2214 def cli_verify_response(self, cli, expected):
2215 """ execute a CLI, asserting that the response matches expectation """
2216 self.assert_equal(self.vapi.cli(cli).strip(),
2218 "CLI command response")
2220 def test_show(self):
2221 """ show commands """
2222 k1 = self.factory.create_random_key(self)
2224 k2 = self.factory.create_random_key(
2225 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2227 s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2229 s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2232 self.logger.info(self.vapi.ppcli("show bfd keys"))
2233 self.logger.info(self.vapi.ppcli("show bfd sessions"))
2234 self.logger.info(self.vapi.ppcli("show bfd"))
2236 def test_set_del_sha1_key(self):
2237 """ set/delete SHA1 auth key """
2238 k = self.factory.create_random_key(self)
2239 self.registry.register(k, self.logger)
2240 self.cli_verify_no_response(
2241 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2243 "".join("{:02x}".format(ord(c)) for c in k.key)))
2244 self.assertTrue(k.query_vpp_config())
2245 self.vpp_session = VppBFDUDPSession(
2246 self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
2247 self.vpp_session.add_vpp_config()
2248 self.test_session = \
2249 BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
2250 bfd_key_id=self.vpp_session.bfd_key_id)
2251 self.vapi.want_bfd_events()
2252 bfd_session_up(self)
2253 bfd_session_down(self)
2254 # try to replace the secret for the key - should fail because the key
2256 k2 = self.factory.create_random_key(self)
2257 self.cli_verify_response(
2258 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2260 "".join("{:02x}".format(ord(c)) for c in k2.key)),
2261 "bfd key set: `bfd_auth_set_key' API call failed, "
2262 "rv=-103:BFD object in use")
2263 # manipulating the session using old secret should still work
2264 bfd_session_up(self)
2265 bfd_session_down(self)
2266 self.vpp_session.remove_vpp_config()
2267 self.cli_verify_no_response(
2268 "bfd key del conf-key-id %s" % k.conf_key_id)
2269 self.assertFalse(k.query_vpp_config())
2271 def test_set_del_meticulous_sha1_key(self):
2272 """ set/delete meticulous SHA1 auth key """
2273 k = self.factory.create_random_key(
2274 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2275 self.registry.register(k, self.logger)
2276 self.cli_verify_no_response(
2277 "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
2279 "".join("{:02x}".format(ord(c)) for c in k.key)))
2280 self.assertTrue(k.query_vpp_config())
2281 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2282 self.pg0.remote_ip6, af=AF_INET6,
2284 self.vpp_session.add_vpp_config()
2285 self.vpp_session.admin_up()
2286 self.test_session = \
2287 BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
2288 bfd_key_id=self.vpp_session.bfd_key_id)
2289 self.vapi.want_bfd_events()
2290 bfd_session_up(self)
2291 bfd_session_down(self)
2292 # try to replace the secret for the key - should fail because the key
2294 k2 = self.factory.create_random_key(self)
2295 self.cli_verify_response(
2296 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2298 "".join("{:02x}".format(ord(c)) for c in k2.key)),
2299 "bfd key set: `bfd_auth_set_key' API call failed, "
2300 "rv=-103:BFD object in use")
2301 # manipulating the session using old secret should still work
2302 bfd_session_up(self)
2303 bfd_session_down(self)
2304 self.vpp_session.remove_vpp_config()
2305 self.cli_verify_no_response(
2306 "bfd key del conf-key-id %s" % k.conf_key_id)
2307 self.assertFalse(k.query_vpp_config())
2309 def test_add_mod_del_bfd_udp(self):
2310 """ create/modify/delete IPv4 BFD UDP session """
2311 vpp_session = VppBFDUDPSession(
2312 self, self.pg0, self.pg0.remote_ip4)
2313 self.registry.register(vpp_session, self.logger)
2314 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2315 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2316 "detect-mult %s" % (self.pg0.name, self.pg0.local_ip4,
2317 self.pg0.remote_ip4,
2318 vpp_session.desired_min_tx,
2319 vpp_session.required_min_rx,
2320 vpp_session.detect_mult)
2321 self.cli_verify_no_response(cli_add_cmd)
2322 # 2nd add should fail
2323 self.cli_verify_response(
2325 "bfd udp session add: `bfd_add_add_session' API call"
2326 " failed, rv=-101:Duplicate BFD object")
2327 verify_bfd_session_config(self, vpp_session)
2328 mod_session = VppBFDUDPSession(
2329 self, self.pg0, self.pg0.remote_ip4,
2330 required_min_rx=2 * vpp_session.required_min_rx,
2331 desired_min_tx=3 * vpp_session.desired_min_tx,
2332 detect_mult=4 * vpp_session.detect_mult)
2333 self.cli_verify_no_response(
2334 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2335 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2336 (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2337 mod_session.desired_min_tx, mod_session.required_min_rx,
2338 mod_session.detect_mult))
2339 verify_bfd_session_config(self, mod_session)
2340 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2341 "peer-addr %s" % (self.pg0.name,
2342 self.pg0.local_ip4, self.pg0.remote_ip4)
2343 self.cli_verify_no_response(cli_del_cmd)
2344 # 2nd del is expected to fail
2345 self.cli_verify_response(
2346 cli_del_cmd, "bfd udp session del: `bfd_udp_del_session' API call"
2347 " failed, rv=-102:No such BFD object")
2348 self.assertFalse(vpp_session.query_vpp_config())
2350 def test_add_mod_del_bfd_udp6(self):
2351 """ create/modify/delete IPv6 BFD UDP session """
2352 vpp_session = VppBFDUDPSession(
2353 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
2354 self.registry.register(vpp_session, self.logger)
2355 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2356 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2357 "detect-mult %s" % (self.pg0.name, self.pg0.local_ip6,
2358 self.pg0.remote_ip6,
2359 vpp_session.desired_min_tx,
2360 vpp_session.required_min_rx,
2361 vpp_session.detect_mult)
2362 self.cli_verify_no_response(cli_add_cmd)
2363 # 2nd add should fail
2364 self.cli_verify_response(
2366 "bfd udp session add: `bfd_add_add_session' API call"
2367 " failed, rv=-101:Duplicate BFD object")
2368 verify_bfd_session_config(self, vpp_session)
2369 mod_session = VppBFDUDPSession(
2370 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2371 required_min_rx=2 * vpp_session.required_min_rx,
2372 desired_min_tx=3 * vpp_session.desired_min_tx,
2373 detect_mult=4 * vpp_session.detect_mult)
2374 self.cli_verify_no_response(
2375 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2376 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2377 (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2378 mod_session.desired_min_tx,
2379 mod_session.required_min_rx, mod_session.detect_mult))
2380 verify_bfd_session_config(self, mod_session)
2381 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2382 "peer-addr %s" % (self.pg0.name,
2383 self.pg0.local_ip6, self.pg0.remote_ip6)
2384 self.cli_verify_no_response(cli_del_cmd)
2385 # 2nd del is expected to fail
2386 self.cli_verify_response(
2388 "bfd udp session del: `bfd_udp_del_session' API call"
2389 " failed, rv=-102:No such BFD object")
2390 self.assertFalse(vpp_session.query_vpp_config())
2392 def test_add_mod_del_bfd_udp_auth(self):
2393 """ create/modify/delete IPv4 BFD UDP session (authenticated) """
2394 key = self.factory.create_random_key(self)
2395 key.add_vpp_config()
2396 vpp_session = VppBFDUDPSession(
2397 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2398 self.registry.register(vpp_session, self.logger)
2399 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2400 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2401 "detect-mult %s conf-key-id %s bfd-key-id %s"\
2402 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2403 vpp_session.desired_min_tx, vpp_session.required_min_rx,
2404 vpp_session.detect_mult, key.conf_key_id,
2405 vpp_session.bfd_key_id)
2406 self.cli_verify_no_response(cli_add_cmd)
2407 # 2nd add should fail
2408 self.cli_verify_response(
2410 "bfd udp session add: `bfd_add_add_session' API call"
2411 " failed, rv=-101:Duplicate BFD object")
2412 verify_bfd_session_config(self, vpp_session)
2413 mod_session = VppBFDUDPSession(
2414 self, self.pg0, self.pg0.remote_ip4, sha1_key=key,
2415 bfd_key_id=vpp_session.bfd_key_id,
2416 required_min_rx=2 * vpp_session.required_min_rx,
2417 desired_min_tx=3 * vpp_session.desired_min_tx,
2418 detect_mult=4 * vpp_session.detect_mult)
2419 self.cli_verify_no_response(
2420 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2421 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2422 (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2423 mod_session.desired_min_tx,
2424 mod_session.required_min_rx, mod_session.detect_mult))
2425 verify_bfd_session_config(self, mod_session)
2426 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2427 "peer-addr %s" % (self.pg0.name,
2428 self.pg0.local_ip4, self.pg0.remote_ip4)
2429 self.cli_verify_no_response(cli_del_cmd)
2430 # 2nd del is expected to fail
2431 self.cli_verify_response(
2433 "bfd udp session del: `bfd_udp_del_session' API call"
2434 " failed, rv=-102:No such BFD object")
2435 self.assertFalse(vpp_session.query_vpp_config())
2437 def test_add_mod_del_bfd_udp6_auth(self):
2438 """ create/modify/delete IPv6 BFD UDP session (authenticated) """
2439 key = self.factory.create_random_key(
2440 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2441 key.add_vpp_config()
2442 vpp_session = VppBFDUDPSession(
2443 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key)
2444 self.registry.register(vpp_session, self.logger)
2445 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2446 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2447 "detect-mult %s conf-key-id %s bfd-key-id %s" \
2448 % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2449 vpp_session.desired_min_tx, vpp_session.required_min_rx,
2450 vpp_session.detect_mult, key.conf_key_id,
2451 vpp_session.bfd_key_id)
2452 self.cli_verify_no_response(cli_add_cmd)
2453 # 2nd add should fail
2454 self.cli_verify_response(
2456 "bfd udp session add: `bfd_add_add_session' API call"
2457 " failed, rv=-101:Duplicate BFD object")
2458 verify_bfd_session_config(self, vpp_session)
2459 mod_session = VppBFDUDPSession(
2460 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key,
2461 bfd_key_id=vpp_session.bfd_key_id,
2462 required_min_rx=2 * vpp_session.required_min_rx,
2463 desired_min_tx=3 * vpp_session.desired_min_tx,
2464 detect_mult=4 * vpp_session.detect_mult)
2465 self.cli_verify_no_response(
2466 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2467 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2468 (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2469 mod_session.desired_min_tx,
2470 mod_session.required_min_rx, mod_session.detect_mult))
2471 verify_bfd_session_config(self, mod_session)
2472 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2473 "peer-addr %s" % (self.pg0.name,
2474 self.pg0.local_ip6, self.pg0.remote_ip6)
2475 self.cli_verify_no_response(cli_del_cmd)
2476 # 2nd del is expected to fail
2477 self.cli_verify_response(
2479 "bfd udp session del: `bfd_udp_del_session' API call"
2480 " failed, rv=-102:No such BFD object")
2481 self.assertFalse(vpp_session.query_vpp_config())
2483 def test_auth_on_off(self):
2484 """ turn authentication on and off """
2485 key = self.factory.create_random_key(
2486 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2487 key.add_vpp_config()
2488 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2489 auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2491 session.add_vpp_config()
2493 "bfd udp session auth activate interface %s local-addr %s "\
2494 "peer-addr %s conf-key-id %s bfd-key-id %s"\
2495 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2496 key.conf_key_id, auth_session.bfd_key_id)
2497 self.cli_verify_no_response(cli_activate)
2498 verify_bfd_session_config(self, auth_session)
2499 self.cli_verify_no_response(cli_activate)
2500 verify_bfd_session_config(self, auth_session)
2502 "bfd udp session auth deactivate interface %s local-addr %s "\
2504 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2505 self.cli_verify_no_response(cli_deactivate)
2506 verify_bfd_session_config(self, session)
2507 self.cli_verify_no_response(cli_deactivate)
2508 verify_bfd_session_config(self, session)
2510 def test_auth_on_off_delayed(self):
2511 """ turn authentication on and off (delayed) """
2512 key = self.factory.create_random_key(
2513 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2514 key.add_vpp_config()
2515 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2516 auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2518 session.add_vpp_config()
2520 "bfd udp session auth activate interface %s local-addr %s "\
2521 "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"\
2522 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2523 key.conf_key_id, auth_session.bfd_key_id)
2524 self.cli_verify_no_response(cli_activate)
2525 verify_bfd_session_config(self, auth_session)
2526 self.cli_verify_no_response(cli_activate)
2527 verify_bfd_session_config(self, auth_session)
2529 "bfd udp session auth deactivate interface %s local-addr %s "\
2530 "peer-addr %s delayed yes"\
2531 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2532 self.cli_verify_no_response(cli_deactivate)
2533 verify_bfd_session_config(self, session)
2534 self.cli_verify_no_response(cli_deactivate)
2535 verify_bfd_session_config(self, session)
2537 def test_admin_up_down(self):
2538 """ put session admin-up and admin-down """
2539 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2540 session.add_vpp_config()
2542 "bfd udp session set-flags admin down interface %s local-addr %s "\
2544 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2546 "bfd udp session set-flags admin up interface %s local-addr %s "\
2548 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2549 self.cli_verify_no_response(cli_down)
2550 verify_bfd_session_config(self, session, state=BFDState.admin_down)
2551 self.cli_verify_no_response(cli_up)
2552 verify_bfd_session_config(self, session, state=BFDState.down)
2554 def test_set_del_udp_echo_source(self):
2555 """ set/del udp echo source """
2556 self.create_loopback_interfaces([0])
2557 self.loopback0 = self.lo_interfaces[0]
2558 self.loopback0.admin_up()
2559 self.cli_verify_response("show bfd echo-source",
2560 "UDP echo source is not set.")
2561 cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
2562 self.cli_verify_no_response(cli_set)
2563 self.cli_verify_response("show bfd echo-source",
2564 "UDP echo source is: %s\n"
2565 "IPv4 address usable as echo source: none\n"
2566 "IPv6 address usable as echo source: none" %
2567 self.loopback0.name)
2568 self.loopback0.config_ip4()
2569 unpacked = unpack("!L", self.loopback0.local_ip4n)
2570 echo_ip4 = inet_ntop(AF_INET, pack("!L", unpacked[0] ^ 1))
2571 self.cli_verify_response("show bfd echo-source",
2572 "UDP echo source is: %s\n"
2573 "IPv4 address usable as echo source: %s\n"
2574 "IPv6 address usable as echo source: none" %
2575 (self.loopback0.name, echo_ip4))
2576 unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
2577 echo_ip6 = inet_ntop(AF_INET6, pack("!LLLL", unpacked[0], unpacked[1],
2578 unpacked[2], unpacked[3] ^ 1))
2579 self.loopback0.config_ip6()
2580 self.cli_verify_response("show bfd echo-source",
2581 "UDP echo source is: %s\n"
2582 "IPv4 address usable as echo source: %s\n"
2583 "IPv6 address usable as echo source: %s" %
2584 (self.loopback0.name, echo_ip4, echo_ip6))
2585 cli_del = "bfd udp echo-source del"
2586 self.cli_verify_no_response(cli_del)
2587 self.cli_verify_response("show bfd echo-source",
2588 "UDP echo source is not set.")
2590 if __name__ == '__main__':
2591 unittest.main(testRunner=VppTestRunner)