4 from __future__ import division
9 from random import randint, shuffle
10 from socket import AF_INET, AF_INET6
11 from scapy.layers.l2 import Ether
12 from scapy.layers.inet import UDP, IP
13 from scapy.layers.inet6 import IPv6
14 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
16 from framework import VppTestCase, VppTestRunner
17 from vpp_pg_interface import CaptureTimeoutError
23 class AuthKeyFactory(object):
24 """Factory class for creating auth keys with unique conf key ID"""
27 self._conf_key_ids = {}
29 def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
30 """ create a random key with unique conf key id """
31 conf_key_id = randint(0, 0xFFFFFFFF)
32 while conf_key_id in self._conf_key_ids:
33 conf_key_id = randint(0, 0xFFFFFFFF)
34 self._conf_key_ids[conf_key_id] = 1
35 key = str(bytearray([randint(0, 255) for _ in range(randint(1, 20))]))
36 return VppBFDAuthKey(test=test, auth_type=auth_type,
37 conf_key_id=conf_key_id, key=key)
40 class BFDAPITestCase(VppTestCase):
41 """Bidirectional Forwarding Detection (BFD) - API"""
48 super(BFDAPITestCase, cls).setUpClass()
51 cls.create_pg_interfaces(range(2))
52 for i in cls.pg_interfaces:
58 super(BFDAPITestCase, cls).tearDownClass()
62 super(BFDAPITestCase, self).setUp()
63 self.factory = AuthKeyFactory()
65 def test_add_bfd(self):
66 """ create a BFD session """
67 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
68 session.add_vpp_config()
69 self.logger.debug("Session state is %s", session.state)
70 session.remove_vpp_config()
71 session.add_vpp_config()
72 self.logger.debug("Session state is %s", session.state)
73 session.remove_vpp_config()
75 def test_double_add(self):
76 """ create the same BFD session twice (negative case) """
77 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
78 session.add_vpp_config()
80 with self.vapi.expect_negative_api_retval():
81 session.add_vpp_config()
83 session.remove_vpp_config()
85 def test_add_bfd6(self):
86 """ create IPv6 BFD session """
87 session = VppBFDUDPSession(
88 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
89 session.add_vpp_config()
90 self.logger.debug("Session state is %s", session.state)
91 session.remove_vpp_config()
92 session.add_vpp_config()
93 self.logger.debug("Session state is %s", session.state)
94 session.remove_vpp_config()
96 def test_mod_bfd(self):
97 """ modify BFD session parameters """
98 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
100 required_min_rx=10000,
102 session.add_vpp_config()
103 s = session.get_bfd_udp_session_dump_entry()
104 self.assert_equal(session.desired_min_tx,
106 "desired min transmit interval")
107 self.assert_equal(session.required_min_rx,
109 "required min receive interval")
110 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
111 session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
112 required_min_rx=session.required_min_rx * 2,
113 detect_mult=session.detect_mult * 2)
114 s = session.get_bfd_udp_session_dump_entry()
115 self.assert_equal(session.desired_min_tx,
117 "desired min transmit interval")
118 self.assert_equal(session.required_min_rx,
120 "required min receive interval")
121 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
123 def test_add_sha1_keys(self):
124 """ add SHA1 keys """
126 keys = [self.factory.create_random_key(
127 self) for i in range(0, key_count)]
129 self.assertFalse(key.query_vpp_config())
133 self.assertTrue(key.query_vpp_config())
135 indexes = range(key_count)
140 key.remove_vpp_config()
142 for j in range(key_count):
145 self.assertFalse(key.query_vpp_config())
147 self.assertTrue(key.query_vpp_config())
148 # should be removed now
150 self.assertFalse(key.query_vpp_config())
151 # add back and remove again
155 self.assertTrue(key.query_vpp_config())
157 key.remove_vpp_config()
159 self.assertFalse(key.query_vpp_config())
161 def test_add_bfd_sha1(self):
162 """ create a BFD session (SHA1) """
163 key = self.factory.create_random_key(self)
165 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
167 session.add_vpp_config()
168 self.logger.debug("Session state is %s", session.state)
169 session.remove_vpp_config()
170 session.add_vpp_config()
171 self.logger.debug("Session state is %s", session.state)
172 session.remove_vpp_config()
174 def test_double_add_sha1(self):
175 """ create the same BFD session twice (negative case) (SHA1) """
176 key = self.factory.create_random_key(self)
178 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
180 session.add_vpp_config()
181 with self.assertRaises(Exception):
182 session.add_vpp_config()
184 def test_add_auth_nonexistent_key(self):
185 """ create BFD session using non-existent SHA1 (negative case) """
186 session = VppBFDUDPSession(
187 self, self.pg0, self.pg0.remote_ip4,
188 sha1_key=self.factory.create_random_key(self))
189 with self.assertRaises(Exception):
190 session.add_vpp_config()
192 def test_shared_sha1_key(self):
193 """ share single SHA1 key between multiple BFD sessions """
194 key = self.factory.create_random_key(self)
197 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
199 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
200 sha1_key=key, af=AF_INET6),
201 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
203 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
204 sha1_key=key, af=AF_INET6)]
209 e = key.get_bfd_auth_keys_dump_entry()
210 self.assert_equal(e.use_count, len(sessions) - removed,
211 "Use count for shared key")
212 s.remove_vpp_config()
214 e = key.get_bfd_auth_keys_dump_entry()
215 self.assert_equal(e.use_count, len(sessions) - removed,
216 "Use count for shared key")
218 def test_activate_auth(self):
219 """ activate SHA1 authentication """
220 key = self.factory.create_random_key(self)
222 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
223 session.add_vpp_config()
224 session.activate_auth(key)
226 def test_deactivate_auth(self):
227 """ deactivate SHA1 authentication """
228 key = self.factory.create_random_key(self)
230 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
231 session.add_vpp_config()
232 session.activate_auth(key)
233 session.deactivate_auth()
235 def test_change_key(self):
236 """ change SHA1 key """
237 key1 = self.factory.create_random_key(self)
238 key2 = self.factory.create_random_key(self)
239 while key2.conf_key_id == key1.conf_key_id:
240 key2 = self.factory.create_random_key(self)
241 key1.add_vpp_config()
242 key2.add_vpp_config()
243 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
245 session.add_vpp_config()
246 session.activate_auth(key2)
249 class BFDTestSession(object):
250 """ BFD session as seen from test framework side """
252 def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
253 bfd_key_id=None, our_seq_number=None):
256 self.sha1_key = sha1_key
257 self.bfd_key_id = bfd_key_id
258 self.interface = interface
259 self.udp_sport = randint(49152, 65535)
260 if our_seq_number is None:
261 self.our_seq_number = randint(0, 40000000)
263 self.our_seq_number = our_seq_number
264 self.vpp_seq_number = None
265 self.my_discriminator = 0
266 self.desired_min_tx = 100000
267 self.required_min_rx = 100000
268 self.detect_mult = detect_mult
269 self.diag = BFDDiagCode.no_diagnostic
270 self.your_discriminator = None
271 self.state = BFDState.down
272 self.auth_type = BFDAuthType.no_auth
274 def inc_seq_num(self):
275 """ increment sequence number, wrapping if needed """
276 if self.our_seq_number == 0xFFFFFFFF:
277 self.our_seq_number = 0
279 self.our_seq_number += 1
281 def update(self, my_discriminator=None, your_discriminator=None,
282 desired_min_tx=None, required_min_rx=None, detect_mult=None,
283 diag=None, state=None, auth_type=None):
284 """ update BFD parameters associated with session """
286 self.my_discriminator = my_discriminator
287 if your_discriminator:
288 self.your_discriminator = your_discriminator
290 self.required_min_rx = required_min_rx
292 self.desired_min_tx = desired_min_tx
294 self.detect_mult = detect_mult
300 self.auth_type = auth_type
302 def fill_packet_fields(self, packet):
303 """ set packet fields with known values in packet """
305 if self.my_discriminator:
306 self.test.logger.debug("BFD: setting packet.my_discriminator=%s",
307 self.my_discriminator)
308 bfd.my_discriminator = self.my_discriminator
309 if self.your_discriminator:
310 self.test.logger.debug("BFD: setting packet.your_discriminator=%s",
311 self.your_discriminator)
312 bfd.your_discriminator = self.your_discriminator
313 if self.required_min_rx:
314 self.test.logger.debug(
315 "BFD: setting packet.required_min_rx_interval=%s",
316 self.required_min_rx)
317 bfd.required_min_rx_interval = self.required_min_rx
318 if self.desired_min_tx:
319 self.test.logger.debug(
320 "BFD: setting packet.desired_min_tx_interval=%s",
322 bfd.desired_min_tx_interval = self.desired_min_tx
324 self.test.logger.debug(
325 "BFD: setting packet.detect_mult=%s", self.detect_mult)
326 bfd.detect_mult = self.detect_mult
328 self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
331 self.test.logger.debug("BFD: setting packet.state=%s", self.state)
332 bfd.state = self.state
334 # this is used by a negative test-case
335 self.test.logger.debug("BFD: setting packet.auth_type=%s",
337 bfd.auth_type = self.auth_type
339 def create_packet(self):
340 """ create a BFD packet, reflecting the current state of session """
343 bfd.auth_type = self.sha1_key.auth_type
344 bfd.auth_len = BFD.sha1_auth_len
345 bfd.auth_key_id = self.bfd_key_id
346 bfd.auth_seq_num = self.our_seq_number
347 bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
350 if self.af == AF_INET6:
351 packet = (Ether(src=self.interface.remote_mac,
352 dst=self.interface.local_mac) /
353 IPv6(src=self.interface.remote_ip6,
354 dst=self.interface.local_ip6,
356 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
359 packet = (Ether(src=self.interface.remote_mac,
360 dst=self.interface.local_mac) /
361 IP(src=self.interface.remote_ip4,
362 dst=self.interface.local_ip4,
364 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
366 self.test.logger.debug("BFD: Creating packet")
367 self.fill_packet_fields(packet)
369 hash_material = str(packet[BFD])[:32] + self.sha1_key.key + \
370 "\0" * (20 - len(self.sha1_key.key))
371 self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
372 hashlib.sha1(hash_material).hexdigest())
373 packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
376 def send_packet(self, packet=None, interface=None):
377 """ send packet on interface, creating the packet if needed """
379 packet = self.create_packet()
380 if interface is None:
381 interface = self.test.pg0
382 self.test.logger.debug(ppp("Sending packet:", packet))
383 interface.add_stream(packet)
386 def verify_sha1_auth(self, packet):
387 """ Verify correctness of authentication in BFD layer. """
389 self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
390 self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
392 self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
393 self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
394 if self.vpp_seq_number is None:
395 self.vpp_seq_number = bfd.auth_seq_num
396 self.test.logger.debug("Received initial sequence number: %s" %
399 recvd_seq_num = bfd.auth_seq_num
400 self.test.logger.debug("Received followup sequence number: %s" %
402 if self.vpp_seq_number < 0xffffffff:
403 if self.sha1_key.auth_type == \
404 BFDAuthType.meticulous_keyed_sha1:
405 self.test.assert_equal(recvd_seq_num,
406 self.vpp_seq_number + 1,
407 "BFD sequence number")
409 self.test.assert_in_range(recvd_seq_num,
411 self.vpp_seq_number + 1,
412 "BFD sequence number")
414 if self.sha1_key.auth_type == \
415 BFDAuthType.meticulous_keyed_sha1:
416 self.test.assert_equal(recvd_seq_num, 0,
417 "BFD sequence number")
419 self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
420 "BFD sequence number not one of "
421 "(%s, 0)" % self.vpp_seq_number)
422 self.vpp_seq_number = recvd_seq_num
423 # last 20 bytes represent the hash - so replace them with the key,
424 # pad the result with zeros and hash the result
425 hash_material = bfd.original[:-20] + self.sha1_key.key + \
426 "\0" * (20 - len(self.sha1_key.key))
427 expected_hash = hashlib.sha1(hash_material).hexdigest()
428 self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
429 expected_hash, "Auth key hash")
431 def verify_bfd(self, packet):
432 """ Verify correctness of BFD layer. """
434 self.test.assert_equal(bfd.version, 1, "BFD version")
435 self.test.assert_equal(bfd.your_discriminator,
436 self.my_discriminator,
437 "BFD - your discriminator")
439 self.verify_sha1_auth(packet)
442 def bfd_session_up(test):
443 """ Bring BFD session up """
444 test.logger.info("BFD: Waiting for slow hello")
445 p = wait_for_bfd_packet(test, 2)
447 if hasattr(test, 'vpp_clock_offset'):
448 old_offset = test.vpp_clock_offset
449 test.vpp_clock_offset = time.time() - p.time
450 test.logger.debug("BFD: Calculated vpp clock offset: %s",
451 test.vpp_clock_offset)
453 test.assertAlmostEqual(
454 old_offset, test.vpp_clock_offset, delta=0.1,
455 msg="vpp clock offset not stable (new: %s, old: %s)" %
456 (test.vpp_clock_offset, old_offset))
457 test.logger.info("BFD: Sending Init")
458 test.test_session.update(my_discriminator=randint(0, 40000000),
459 your_discriminator=p[BFD].my_discriminator,
461 test.test_session.send_packet()
462 test.logger.info("BFD: Waiting for event")
463 e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
464 verify_event(test, e, expected_state=BFDState.up)
465 test.logger.info("BFD: Session is Up")
466 test.test_session.update(state=BFDState.up)
467 test.test_session.send_packet()
468 test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
471 def bfd_session_down(test):
472 """ Bring BFD session down """
473 test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
474 test.test_session.update(state=BFDState.down)
475 test.test_session.send_packet()
476 test.logger.info("BFD: Waiting for event")
477 e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
478 verify_event(test, e, expected_state=BFDState.down)
479 test.logger.info("BFD: Session is Down")
480 test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
483 def verify_ip(test, packet):
484 """ Verify correctness of IP layer. """
485 if test.vpp_session.af == AF_INET6:
487 local_ip = test.pg0.local_ip6
488 remote_ip = test.pg0.remote_ip6
489 test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
492 local_ip = test.pg0.local_ip4
493 remote_ip = test.pg0.remote_ip4
494 test.assert_equal(ip.ttl, 255, "IPv4 TTL")
495 test.assert_equal(ip.src, local_ip, "IP source address")
496 test.assert_equal(ip.dst, remote_ip, "IP destination address")
499 def verify_udp(test, packet):
500 """ Verify correctness of UDP layer. """
502 test.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
503 test.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
507 def verify_event(test, event, expected_state):
508 """ Verify correctness of event values. """
510 test.logger.debug("BFD: Event: %s" % repr(e))
511 test.assert_equal(e.sw_if_index,
512 test.vpp_session.interface.sw_if_index,
513 "BFD interface index")
515 if test.vpp_session.af == AF_INET6:
517 test.assert_equal(e.is_ipv6, is_ipv6, "is_ipv6")
518 if test.vpp_session.af == AF_INET:
519 test.assert_equal(e.local_addr[:4], test.vpp_session.local_addr_n,
520 "Local IPv4 address")
521 test.assert_equal(e.peer_addr[:4], test.vpp_session.peer_addr_n,
524 test.assert_equal(e.local_addr, test.vpp_session.local_addr_n,
525 "Local IPv6 address")
526 test.assert_equal(e.peer_addr, test.vpp_session.peer_addr_n,
528 test.assert_equal(e.state, expected_state, BFDState)
531 def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None):
532 """ wait for BFD packet and verify its correctness
534 :param timeout: how long to wait
535 :param pcap_time_min: ignore packets with pcap timestamp lower than this
537 :returns: tuple (packet, time spent waiting for packet)
539 test.logger.info("BFD: Waiting for BFD packet")
540 deadline = time.time() + timeout
545 test.assert_in_range(counter, 0, 100, "number of packets ignored")
546 time_left = deadline - time.time()
548 raise CaptureTimeoutError("Packet did not arrive within timeout")
549 p = test.pg0.wait_for_packet(timeout=time_left)
550 test.logger.debug(ppp("BFD: Got packet:", p))
551 if pcap_time_min is not None and p.time < pcap_time_min:
552 test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
553 "pcap time min %s):" %
554 (p.time, pcap_time_min), p))
559 raise Exception(ppp("Unexpected or invalid BFD packet:", p))
561 raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
564 test.test_session.verify_bfd(p)
568 class BFD4TestCase(VppTestCase):
569 """Bidirectional Forwarding Detection (BFD)"""
572 vpp_clock_offset = None
578 super(BFD4TestCase, cls).setUpClass()
580 cls.create_pg_interfaces([0])
582 cls.pg0.configure_ipv4_neighbors()
584 cls.pg0.resolve_arp()
587 super(BFD4TestCase, cls).tearDownClass()
591 super(BFD4TestCase, self).setUp()
592 self.factory = AuthKeyFactory()
593 self.vapi.want_bfd_events()
594 self.pg0.enable_capture()
596 self.vpp_session = VppBFDUDPSession(self, self.pg0,
598 self.vpp_session.add_vpp_config()
599 self.vpp_session.admin_up()
600 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
602 self.vapi.want_bfd_events(enable_disable=0)
606 if not self.vpp_dead:
607 self.vapi.want_bfd_events(enable_disable=0)
608 self.vapi.collect_events() # clear the event queue
609 super(BFD4TestCase, self).tearDown()
611 def test_session_up(self):
612 """ bring BFD session up """
615 def test_session_down(self):
616 """ bring BFD session down """
618 bfd_session_down(self)
620 def test_hold_up(self):
621 """ hold BFD session up """
623 for dummy in range(self.test_session.detect_mult * 2):
624 wait_for_bfd_packet(self)
625 self.test_session.send_packet()
626 self.assert_equal(len(self.vapi.collect_events()), 0,
627 "number of bfd events")
629 def test_slow_timer(self):
630 """ verify slow periodic control frames while session down """
632 self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
633 prev_packet = wait_for_bfd_packet(self, 2)
634 for dummy in range(packet_count):
635 next_packet = wait_for_bfd_packet(self, 2)
636 time_diff = next_packet.time - prev_packet.time
637 # spec says the range should be <0.75, 1>, allow extra 0.05 margin
638 # to work around timing issues
639 self.assert_in_range(
640 time_diff, 0.70, 1.05, "time between slow packets")
641 prev_packet = next_packet
643 def test_zero_remote_min_rx(self):
644 """ no packets when zero remote required min rx interval """
646 self.test_session.update(required_min_rx=0)
647 self.test_session.send_packet()
648 cap = 2 * self.vpp_session.desired_min_tx *\
649 self.test_session.detect_mult
650 time_mark = time.time()
652 # busy wait here, trying to collect a packet or event, vpp is not
653 # allowed to send packets and the session will timeout first - so the
654 # Up->Down event must arrive before any packets do
655 while time.time() < time_mark + cap / USEC_IN_SEC:
657 p = wait_for_bfd_packet(
659 pcap_time_min=time_mark - self.vpp_clock_offset)
660 self.logger.error(ppp("Received unexpected packet:", p))
662 except CaptureTimeoutError:
664 events = self.vapi.collect_events()
666 verify_event(self, events[0], BFDState.down)
668 self.assert_equal(count, 0, "number of packets received")
670 def test_conn_down(self):
671 """ verify session goes down after inactivity """
673 for dummy in range(self.test_session.detect_mult):
674 wait_for_bfd_packet(self)
675 self.assert_equal(len(self.vapi.collect_events()), 0,
676 "number of bfd events")
677 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
678 verify_event(self, e, expected_state=BFDState.down)
680 def test_large_required_min_rx(self):
681 """ large remote required min rx interval """
683 p = wait_for_bfd_packet(self)
685 self.test_session.update(required_min_rx=interval)
686 self.test_session.send_packet()
687 time_mark = time.time()
689 # busy wait here, trying to collect a packet or event, vpp is not
690 # allowed to send packets and the session will timeout first - so the
691 # Up->Down event must arrive before any packets do
692 while time.time() < time_mark + interval / USEC_IN_SEC:
694 p = wait_for_bfd_packet(self, timeout=0)
695 # if vpp managed to send a packet before we did the session
696 # session update, then that's fine, ignore it
697 if p.time < time_mark - self.vpp_clock_offset:
699 self.logger.error(ppp("Received unexpected packet:", p))
701 except CaptureTimeoutError:
703 events = self.vapi.collect_events()
705 verify_event(self, events[0], BFDState.down)
707 self.assert_equal(count, 0, "number of packets received")
709 def test_immediate_remote_min_rx_reduction(self):
710 """ immediately honor remote required min rx reduction """
711 self.vpp_session.remove_vpp_config()
712 self.vpp_session = VppBFDUDPSession(
713 self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
714 self.pg0.enable_capture()
715 self.vpp_session.add_vpp_config()
716 self.test_session.update(desired_min_tx=1000000,
717 required_min_rx=1000000)
719 reference_packet = wait_for_bfd_packet(self)
720 time_mark = time.time()
722 self.test_session.update(required_min_rx=interval)
723 self.test_session.send_packet()
724 extra_time = time.time() - time_mark
725 p = wait_for_bfd_packet(self)
726 # first packet is allowed to be late by time we spent doing the update
727 # calculated in extra_time
728 self.assert_in_range(p.time - reference_packet.time,
729 .95 * 0.75 * interval / USEC_IN_SEC,
730 1.05 * interval / USEC_IN_SEC + extra_time,
731 "time between BFD packets")
733 for dummy in range(3):
734 p = wait_for_bfd_packet(self)
735 diff = p.time - reference_packet.time
736 self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
737 1.05 * interval / USEC_IN_SEC,
738 "time between BFD packets")
741 def test_modify_req_min_rx_double(self):
742 """ modify session - double required min rx """
744 p = wait_for_bfd_packet(self)
745 self.test_session.update(desired_min_tx=10000,
746 required_min_rx=10000)
747 self.test_session.send_packet()
748 # double required min rx
749 self.vpp_session.modify_parameters(
750 required_min_rx=2 * self.vpp_session.required_min_rx)
751 p = wait_for_bfd_packet(
752 self, pcap_time_min=time.time() - self.vpp_clock_offset)
753 # poll bit needs to be set
754 self.assertIn("P", p.sprintf("%BFD.flags%"),
755 "Poll bit not set in BFD packet")
756 # finish poll sequence with final packet
757 final = self.test_session.create_packet()
758 final[BFD].flags = "F"
759 timeout = self.test_session.detect_mult * \
760 max(self.test_session.desired_min_tx,
761 self.vpp_session.required_min_rx) / USEC_IN_SEC
762 self.test_session.send_packet(final)
763 time_mark = time.time()
764 e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
765 verify_event(self, e, expected_state=BFDState.down)
766 time_to_event = time.time() - time_mark
767 self.assert_in_range(time_to_event, .9 * timeout,
768 1.1 * timeout, "session timeout")
770 def test_modify_req_min_rx_halve(self):
771 """ modify session - halve required min rx """
772 self.vpp_session.modify_parameters(
773 required_min_rx=2 * self.vpp_session.required_min_rx)
775 p = wait_for_bfd_packet(self)
776 self.test_session.update(desired_min_tx=10000,
777 required_min_rx=10000)
778 self.test_session.send_packet()
779 p = wait_for_bfd_packet(
780 self, pcap_time_min=time.time() - self.vpp_clock_offset)
781 # halve required min rx
782 old_required_min_rx = self.vpp_session.required_min_rx
783 self.vpp_session.modify_parameters(
784 required_min_rx=0.5 * self.vpp_session.required_min_rx)
785 # now we wait 0.8*3*old-req-min-rx and the session should still be up
786 self.sleep(0.8 * self.vpp_session.detect_mult *
787 old_required_min_rx / USEC_IN_SEC)
788 self.assert_equal(len(self.vapi.collect_events()), 0,
789 "number of bfd events")
790 p = wait_for_bfd_packet(self)
791 # poll bit needs to be set
792 self.assertIn("P", p.sprintf("%BFD.flags%"),
793 "Poll bit not set in BFD packet")
794 # finish poll sequence with final packet
795 final = self.test_session.create_packet()
796 final[BFD].flags = "F"
797 self.test_session.send_packet(final)
798 # now the session should time out under new conditions
800 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
802 detection_time = self.vpp_session.detect_mult *\
803 self.vpp_session.required_min_rx / USEC_IN_SEC
804 self.assert_in_range(after - before,
805 0.9 * detection_time,
806 1.1 * detection_time,
807 "time before bfd session goes down")
808 verify_event(self, e, expected_state=BFDState.down)
810 def test_modify_des_min_tx(self):
811 """ modify desired min tx interval """
814 def test_modify_detect_mult(self):
815 """ modify detect multiplier """
817 p = wait_for_bfd_packet(self)
818 self.vpp_session.modify_parameters(detect_mult=1)
819 p = wait_for_bfd_packet(
820 self, pcap_time_min=time.time() - self.vpp_clock_offset)
821 self.assert_equal(self.vpp_session.detect_mult,
824 # poll bit must not be set
825 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
826 "Poll bit not set in BFD packet")
827 self.vpp_session.modify_parameters(detect_mult=10)
828 p = wait_for_bfd_packet(
829 self, pcap_time_min=time.time() - self.vpp_clock_offset)
830 self.assert_equal(self.vpp_session.detect_mult,
833 # poll bit must not be set
834 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
835 "Poll bit not set in BFD packet")
837 def test_no_periodic_if_remote_demand(self):
838 """ no periodic frames outside poll sequence if remote demand set """
839 self.test_session.update(detect_mult=10)
841 demand = self.test_session.create_packet()
842 demand[BFD].flags = "D"
843 self.test_session.send_packet(demand)
844 transmit_time = 0.9 \
845 * max(self.vpp_session.required_min_rx,
846 self.test_session.desired_min_tx) \
849 for dummy in range(self.test_session.detect_mult):
850 time.sleep(transmit_time)
851 self.test_session.send_packet(demand)
853 p = wait_for_bfd_packet(self, timeout=0)
854 self.logger.error(ppp("Received unexpected packet:", p))
856 except CaptureTimeoutError:
858 events = self.vapi.collect_events()
860 self.logger.error("Received unexpected event: %s", e)
861 self.assert_equal(count, 0, "number of packets received")
862 self.assert_equal(len(events), 0, "number of events received")
865 class BFD6TestCase(VppTestCase):
866 """Bidirectional Forwarding Detection (BFD) (IPv6) """
869 vpp_clock_offset = None
875 super(BFD6TestCase, cls).setUpClass()
877 cls.create_pg_interfaces([0])
879 cls.pg0.configure_ipv6_neighbors()
881 cls.pg0.resolve_ndp()
884 super(BFD6TestCase, cls).tearDownClass()
888 super(BFD6TestCase, self).setUp()
889 self.factory = AuthKeyFactory()
890 self.vapi.want_bfd_events()
891 self.pg0.enable_capture()
893 self.vpp_session = VppBFDUDPSession(self, self.pg0,
896 self.vpp_session.add_vpp_config()
897 self.vpp_session.admin_up()
898 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
899 self.logger.debug(self.vapi.cli("show adj nbr"))
901 self.vapi.want_bfd_events(enable_disable=0)
905 if not self.vpp_dead:
906 self.vapi.want_bfd_events(enable_disable=0)
907 self.vapi.collect_events() # clear the event queue
908 super(BFD6TestCase, self).tearDown()
910 def test_session_up(self):
911 """ bring BFD session up """
914 def test_hold_up(self):
915 """ hold BFD session up """
917 for dummy in range(self.test_session.detect_mult*2):
918 wait_for_bfd_packet(self)
919 self.test_session.send_packet()
920 self.assert_equal(len(self.vapi.collect_events()), 0,
921 "number of bfd events")
922 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
925 class BFDSHA1TestCase(VppTestCase):
926 """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
929 vpp_clock_offset = None
935 super(BFDSHA1TestCase, cls).setUpClass()
937 cls.create_pg_interfaces([0])
940 cls.pg0.resolve_arp()
943 super(BFDSHA1TestCase, cls).tearDownClass()
947 super(BFDSHA1TestCase, self).setUp()
948 self.factory = AuthKeyFactory()
949 self.vapi.want_bfd_events()
950 self.pg0.enable_capture()
953 if not self.vpp_dead:
954 self.vapi.want_bfd_events(enable_disable=0)
955 self.vapi.collect_events() # clear the event queue
956 super(BFDSHA1TestCase, self).tearDown()
958 def test_session_up(self):
959 """ bring BFD session up """
960 key = self.factory.create_random_key(self)
962 self.vpp_session = VppBFDUDPSession(self, self.pg0,
965 self.vpp_session.add_vpp_config()
966 self.vpp_session.admin_up()
967 self.test_session = BFDTestSession(
968 self, self.pg0, AF_INET, sha1_key=key,
969 bfd_key_id=self.vpp_session.bfd_key_id)
972 def test_hold_up(self):
973 """ hold BFD session up """
974 key = self.factory.create_random_key(self)
976 self.vpp_session = VppBFDUDPSession(self, self.pg0,
979 self.vpp_session.add_vpp_config()
980 self.vpp_session.admin_up()
981 self.test_session = BFDTestSession(
982 self, self.pg0, AF_INET, sha1_key=key,
983 bfd_key_id=self.vpp_session.bfd_key_id)
985 for dummy in range(self.test_session.detect_mult*2):
986 wait_for_bfd_packet(self)
987 self.test_session.send_packet()
988 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
990 def test_hold_up_meticulous(self):
991 """ hold BFD session up - meticulous auth """
992 key = self.factory.create_random_key(
993 self, BFDAuthType.meticulous_keyed_sha1)
995 self.vpp_session = VppBFDUDPSession(self, self.pg0,
996 self.pg0.remote_ip4, sha1_key=key)
997 self.vpp_session.add_vpp_config()
998 self.vpp_session.admin_up()
999 # specify sequence number so that it wraps
1000 self.test_session = BFDTestSession(
1001 self, self.pg0, AF_INET, sha1_key=key,
1002 bfd_key_id=self.vpp_session.bfd_key_id,
1003 our_seq_number=0xFFFFFFFF - 4)
1004 bfd_session_up(self)
1005 for dummy in range(30):
1006 wait_for_bfd_packet(self)
1007 self.test_session.inc_seq_num()
1008 self.test_session.send_packet()
1009 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1011 def test_send_bad_seq_number(self):
1012 """ session is not kept alive by msgs with bad seq numbers"""
1013 key = self.factory.create_random_key(
1014 self, BFDAuthType.meticulous_keyed_sha1)
1015 key.add_vpp_config()
1016 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1017 self.pg0.remote_ip4, sha1_key=key)
1018 self.vpp_session.add_vpp_config()
1019 self.vpp_session.admin_up()
1020 self.test_session = BFDTestSession(
1021 self, self.pg0, AF_INET, sha1_key=key,
1022 bfd_key_id=self.vpp_session.bfd_key_id)
1023 bfd_session_up(self)
1024 detection_time = self.vpp_session.detect_mult *\
1025 self.vpp_session.required_min_rx / USEC_IN_SEC
1026 session_timeout = time.time() + detection_time
1027 while time.time() < session_timeout:
1028 self.assert_equal(len(self.vapi.collect_events()), 0,
1029 "number of bfd events")
1030 wait_for_bfd_packet(self)
1031 self.test_session.send_packet()
1032 wait_for_bfd_packet(self)
1033 self.test_session.send_packet()
1034 e = self.vapi.collect_events()
1035 # session should be down now, because the sequence numbers weren't
1037 self.assert_equal(len(e), 1, "number of bfd events")
1038 verify_event(self, e[0], expected_state=BFDState.down)
1040 def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
1041 legitimate_test_session,
1043 rogue_bfd_values=None):
1044 """ execute a rogue session interaction scenario
1046 1. create vpp session, add config
1047 2. bring the legitimate session up
1048 3. copy the bfd values from legitimate session to rogue session
1049 4. apply rogue_bfd_values to rogue session
1050 5. set rogue session state to down
1051 6. send message to take the session down from the rogue session
1052 7. assert that the legitimate session is unaffected
1055 self.vpp_session = vpp_bfd_udp_session
1056 self.vpp_session.add_vpp_config()
1057 self.vpp_session.admin_up()
1058 self.test_session = legitimate_test_session
1059 # bring vpp session up
1060 bfd_session_up(self)
1061 # send packet from rogue session
1062 rogue_test_session.update(
1063 my_discriminator=self.test_session.my_discriminator,
1064 your_discriminator=self.test_session.your_discriminator,
1065 desired_min_tx=self.test_session.desired_min_tx,
1066 required_min_rx=self.test_session.required_min_rx,
1067 detect_mult=self.test_session.detect_mult,
1068 diag=self.test_session.diag,
1069 state=self.test_session.state,
1070 auth_type=self.test_session.auth_type)
1071 if rogue_bfd_values:
1072 rogue_test_session.update(**rogue_bfd_values)
1073 rogue_test_session.update(state=BFDState.down)
1074 rogue_test_session.send_packet()
1075 wait_for_bfd_packet(self)
1076 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1078 def test_mismatch_auth(self):
1079 """ session is not brought down by unauthenticated msg """
1080 key = self.factory.create_random_key(self)
1081 key.add_vpp_config()
1082 vpp_session = VppBFDUDPSession(
1083 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1084 legitimate_test_session = BFDTestSession(
1085 self, self.pg0, AF_INET, sha1_key=key,
1086 bfd_key_id=vpp_session.bfd_key_id)
1087 rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
1088 self.execute_rogue_session_scenario(vpp_session,
1089 legitimate_test_session,
1092 def test_mismatch_bfd_key_id(self):
1093 """ session is not brought down by msg with non-existent key-id """
1094 key = self.factory.create_random_key(self)
1095 key.add_vpp_config()
1096 vpp_session = VppBFDUDPSession(
1097 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1098 # pick a different random bfd key id
1100 while x == vpp_session.bfd_key_id:
1102 legitimate_test_session = BFDTestSession(
1103 self, self.pg0, AF_INET, sha1_key=key,
1104 bfd_key_id=vpp_session.bfd_key_id)
1105 rogue_test_session = BFDTestSession(
1106 self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
1107 self.execute_rogue_session_scenario(vpp_session,
1108 legitimate_test_session,
1111 def test_mismatched_auth_type(self):
1112 """ session is not brought down by msg with wrong auth type """
1113 key = self.factory.create_random_key(self)
1114 key.add_vpp_config()
1115 vpp_session = VppBFDUDPSession(
1116 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1117 legitimate_test_session = BFDTestSession(
1118 self, self.pg0, AF_INET, sha1_key=key,
1119 bfd_key_id=vpp_session.bfd_key_id)
1120 rogue_test_session = BFDTestSession(
1121 self, self.pg0, AF_INET, sha1_key=key,
1122 bfd_key_id=vpp_session.bfd_key_id)
1123 self.execute_rogue_session_scenario(
1124 vpp_session, legitimate_test_session, rogue_test_session,
1125 {'auth_type': BFDAuthType.keyed_md5})
1127 def test_restart(self):
1128 """ simulate remote peer restart and resynchronization """
1129 key = self.factory.create_random_key(
1130 self, BFDAuthType.meticulous_keyed_sha1)
1131 key.add_vpp_config()
1132 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1133 self.pg0.remote_ip4, sha1_key=key)
1134 self.vpp_session.add_vpp_config()
1135 self.vpp_session.admin_up()
1136 self.test_session = BFDTestSession(
1137 self, self.pg0, AF_INET, sha1_key=key,
1138 bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
1139 bfd_session_up(self)
1140 # don't send any packets for 2*detection_time
1141 detection_time = self.vpp_session.detect_mult *\
1142 self.vpp_session.required_min_rx / USEC_IN_SEC
1143 self.sleep(detection_time, "simulating peer restart")
1144 events = self.vapi.collect_events()
1145 self.assert_equal(len(events), 1, "number of bfd events")
1146 verify_event(self, events[0], expected_state=BFDState.down)
1147 self.test_session.update(state=BFDState.down)
1148 # reset sequence number
1149 self.test_session.our_seq_number = 0
1150 self.test_session.vpp_seq_number = None
1151 # now throw away any pending packets
1152 self.pg0.enable_capture()
1153 bfd_session_up(self)
1156 class BFDAuthOnOffTestCase(VppTestCase):
1157 """Bidirectional Forwarding Detection (BFD) (changing auth) """
1164 def setUpClass(cls):
1165 super(BFDAuthOnOffTestCase, cls).setUpClass()
1167 cls.create_pg_interfaces([0])
1168 cls.pg0.config_ip4()
1170 cls.pg0.resolve_arp()
1173 super(BFDAuthOnOffTestCase, cls).tearDownClass()
1177 super(BFDAuthOnOffTestCase, self).setUp()
1178 self.factory = AuthKeyFactory()
1179 self.vapi.want_bfd_events()
1180 self.pg0.enable_capture()
1183 if not self.vpp_dead:
1184 self.vapi.want_bfd_events(enable_disable=0)
1185 self.vapi.collect_events() # clear the event queue
1186 super(BFDAuthOnOffTestCase, self).tearDown()
1188 def test_auth_on_immediate(self):
1189 """ turn auth on without disturbing session state (immediate) """
1190 key = self.factory.create_random_key(self)
1191 key.add_vpp_config()
1192 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1193 self.pg0.remote_ip4)
1194 self.vpp_session.add_vpp_config()
1195 self.vpp_session.admin_up()
1196 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
1197 bfd_session_up(self)
1198 for dummy in range(self.test_session.detect_mult*2):
1199 p = wait_for_bfd_packet(self)
1200 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1201 self.test_session.send_packet()
1202 self.vpp_session.activate_auth(key)
1203 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
1204 self.test_session.sha1_key = key
1205 for dummy in range(self.test_session.detect_mult*2):
1206 p = wait_for_bfd_packet(self)
1207 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1208 self.test_session.send_packet()
1209 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1210 self.assert_equal(len(self.vapi.collect_events()), 0,
1211 "number of bfd events")
1213 def test_auth_off_immediate(self):
1214 """ turn auth off without disturbing session state (immediate) """
1215 key = self.factory.create_random_key(self)
1216 key.add_vpp_config()
1217 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1218 self.pg0.remote_ip4, sha1_key=key)
1219 self.vpp_session.add_vpp_config()
1220 self.vpp_session.admin_up()
1221 self.test_session = BFDTestSession(
1222 self, self.pg0, AF_INET, sha1_key=key,
1223 bfd_key_id=self.vpp_session.bfd_key_id)
1224 bfd_session_up(self)
1225 # self.vapi.want_bfd_events(enable_disable=0)
1226 for dummy in range(self.test_session.detect_mult*2):
1227 p = wait_for_bfd_packet(self)
1228 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1229 self.test_session.inc_seq_num()
1230 self.test_session.send_packet()
1231 self.vpp_session.deactivate_auth()
1232 self.test_session.bfd_key_id = None
1233 self.test_session.sha1_key = None
1234 for dummy in range(self.test_session.detect_mult*2):
1235 p = wait_for_bfd_packet(self)
1236 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1237 self.test_session.inc_seq_num()
1238 self.test_session.send_packet()
1239 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1240 self.assert_equal(len(self.vapi.collect_events()), 0,
1241 "number of bfd events")
1243 def test_auth_change_key_immediate(self):
1244 """ change auth key without disturbing session state (immediate) """
1245 key1 = self.factory.create_random_key(self)
1246 key1.add_vpp_config()
1247 key2 = self.factory.create_random_key(self)
1248 key2.add_vpp_config()
1249 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1250 self.pg0.remote_ip4, sha1_key=key1)
1251 self.vpp_session.add_vpp_config()
1252 self.vpp_session.admin_up()
1253 self.test_session = BFDTestSession(
1254 self, self.pg0, AF_INET, sha1_key=key1,
1255 bfd_key_id=self.vpp_session.bfd_key_id)
1256 bfd_session_up(self)
1257 for dummy in range(self.test_session.detect_mult*2):
1258 p = wait_for_bfd_packet(self)
1259 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1260 self.test_session.send_packet()
1261 self.vpp_session.activate_auth(key2)
1262 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
1263 self.test_session.sha1_key = key2
1264 for dummy in range(self.test_session.detect_mult*2):
1265 p = wait_for_bfd_packet(self)
1266 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1267 self.test_session.send_packet()
1268 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1269 self.assert_equal(len(self.vapi.collect_events()), 0,
1270 "number of bfd events")
1272 def test_auth_on_delayed(self):
1273 """ turn auth on without disturbing session state (delayed) """
1274 key = self.factory.create_random_key(self)
1275 key.add_vpp_config()
1276 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1277 self.pg0.remote_ip4)
1278 self.vpp_session.add_vpp_config()
1279 self.vpp_session.admin_up()
1280 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
1281 bfd_session_up(self)
1282 for dummy in range(self.test_session.detect_mult*2):
1283 wait_for_bfd_packet(self)
1284 self.test_session.send_packet()
1285 self.vpp_session.activate_auth(key, delayed=True)
1286 for dummy in range(self.test_session.detect_mult*2):
1287 p = wait_for_bfd_packet(self)
1288 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1289 self.test_session.send_packet()
1290 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
1291 self.test_session.sha1_key = key
1292 self.test_session.send_packet()
1293 for dummy in range(self.test_session.detect_mult*2):
1294 p = wait_for_bfd_packet(self)
1295 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1296 self.test_session.send_packet()
1297 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1298 self.assert_equal(len(self.vapi.collect_events()), 0,
1299 "number of bfd events")
1301 def test_auth_off_delayed(self):
1302 """ turn auth off without disturbing session state (delayed) """
1303 key = self.factory.create_random_key(self)
1304 key.add_vpp_config()
1305 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1306 self.pg0.remote_ip4, sha1_key=key)
1307 self.vpp_session.add_vpp_config()
1308 self.vpp_session.admin_up()
1309 self.test_session = BFDTestSession(
1310 self, self.pg0, AF_INET, sha1_key=key,
1311 bfd_key_id=self.vpp_session.bfd_key_id)
1312 bfd_session_up(self)
1313 for dummy in range(self.test_session.detect_mult*2):
1314 p = wait_for_bfd_packet(self)
1315 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1316 self.test_session.send_packet()
1317 self.vpp_session.deactivate_auth(delayed=True)
1318 for dummy in range(self.test_session.detect_mult*2):
1319 p = wait_for_bfd_packet(self)
1320 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1321 self.test_session.send_packet()
1322 self.test_session.bfd_key_id = None
1323 self.test_session.sha1_key = None
1324 self.test_session.send_packet()
1325 for dummy in range(self.test_session.detect_mult*2):
1326 p = wait_for_bfd_packet(self)
1327 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1328 self.test_session.send_packet()
1329 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1330 self.assert_equal(len(self.vapi.collect_events()), 0,
1331 "number of bfd events")
1333 def test_auth_change_key_delayed(self):
1334 """ change auth key without disturbing session state (delayed) """
1335 key1 = self.factory.create_random_key(self)
1336 key1.add_vpp_config()
1337 key2 = self.factory.create_random_key(self)
1338 key2.add_vpp_config()
1339 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1340 self.pg0.remote_ip4, sha1_key=key1)
1341 self.vpp_session.add_vpp_config()
1342 self.vpp_session.admin_up()
1343 self.test_session = BFDTestSession(
1344 self, self.pg0, AF_INET, sha1_key=key1,
1345 bfd_key_id=self.vpp_session.bfd_key_id)
1346 bfd_session_up(self)
1347 for dummy in range(self.test_session.detect_mult*2):
1348 p = wait_for_bfd_packet(self)
1349 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1350 self.test_session.send_packet()
1351 self.vpp_session.activate_auth(key2, delayed=True)
1352 for dummy in range(self.test_session.detect_mult*2):
1353 p = wait_for_bfd_packet(self)
1354 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1355 self.test_session.send_packet()
1356 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
1357 self.test_session.sha1_key = key2
1358 self.test_session.send_packet()
1359 for dummy in range(self.test_session.detect_mult*2):
1360 p = wait_for_bfd_packet(self)
1361 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1362 self.test_session.send_packet()
1363 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1364 self.assert_equal(len(self.vapi.collect_events()), 0,
1365 "number of bfd events")
1367 if __name__ == '__main__':
1368 unittest.main(testRunner=VppTestRunner)