4 from __future__ import division
9 from random import randint, shuffle
10 from socket import AF_INET, AF_INET6
11 from scapy.packet import Raw
12 from scapy.layers.l2 import Ether
13 from scapy.layers.inet import UDP, IP
14 from scapy.layers.inet6 import IPv6
15 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
17 from framework import VppTestCase, VppTestRunner
18 from vpp_pg_interface import CaptureTimeoutError
24 class AuthKeyFactory(object):
25 """Factory class for creating auth keys with unique conf key ID"""
28 self._conf_key_ids = {}
def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
    """ create a random key with unique conf key id

    :param test: test case handed on to VppBFDAuthKey
    :param auth_type: authentication type handed on to VppBFDAuthKey

    :returns: VppBFDAuthKey built from the drawn conf key id and key
    """
    # keep drawing until the id is one this factory has not recorded yet
    while True:
        candidate_id = randint(0, 0xFFFFFFFF)
        if candidate_id not in self._conf_key_ids:
            break
    # remember the id so that later calls skip it
    self._conf_key_ids[candidate_id] = 1
    # key material - between 1 and 20 random byte values
    key_length = randint(1, 20)
    key_bytes = bytearray(randint(0, 255) for _ in range(key_length))
    return VppBFDAuthKey(test=test, auth_type=auth_type,
                         conf_key_id=candidate_id, key=str(key_bytes))
41 class BFDAPITestCase(VppTestCase):
42 """Bidirectional Forwarding Detection (BFD) - API"""
49 super(BFDAPITestCase, cls).setUpClass()
52 cls.create_pg_interfaces(range(2))
53 for i in cls.pg_interfaces:
59 super(BFDAPITestCase, cls).tearDownClass()
63 super(BFDAPITestCase, self).setUp()
64 self.factory = AuthKeyFactory()
def test_add_bfd(self):
    """ create a BFD session

    The session over pg0 (IPv4 peer address) is added and removed two
    times in a row, logging the session state after each add.
    """
    bfd_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    # two identical passes, so adding again after a removal is covered too
    for _ in range(2):
        bfd_session.add_vpp_config()
        self.logger.debug("Session state is %s", bfd_session.state)
        bfd_session.remove_vpp_config()
def test_double_add(self):
    """ create the same BFD session twice (negative case)

    The second add of the already added session is issued while a
    negative API return value is expected.
    """
    bfd_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    bfd_session.add_vpp_config()

    # adding the very same session again - negative retval expected
    negative_retval = self.vapi.expect_negative_api_retval()
    with negative_retval:
        bfd_session.add_vpp_config()

    bfd_session.remove_vpp_config()
def test_add_bfd6(self):
    """ create IPv6 BFD session

    The session over pg0 (IPv6 peer address, af=AF_INET6) is added and
    removed two times in a row, logging the session state after each add.
    """
    bfd_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
    # two identical passes, so adding again after a removal is covered too
    for _ in range(2):
        bfd_session.add_vpp_config()
        self.logger.debug("Session state is %s", bfd_session.state)
        bfd_session.remove_vpp_config()
97 def test_mod_bfd(self):
98 """ modify BFD session parameters """
99 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
100 desired_min_tx=50000,
101 required_min_rx=10000,
103 session.add_vpp_config()
104 s = session.get_bfd_udp_session_dump_entry()
105 self.assert_equal(session.desired_min_tx,
107 "desired min transmit interval")
108 self.assert_equal(session.required_min_rx,
110 "required min receive interval")
111 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
112 session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
113 required_min_rx=session.required_min_rx * 2,
114 detect_mult=session.detect_mult * 2)
115 s = session.get_bfd_udp_session_dump_entry()
116 self.assert_equal(session.desired_min_tx,
118 "desired min transmit interval")
119 self.assert_equal(session.required_min_rx,
121 "required min receive interval")
122 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
124 def test_add_sha1_keys(self):
125 """ add SHA1 keys """
127 keys = [self.factory.create_random_key(
128 self) for i in range(0, key_count)]
130 self.assertFalse(key.query_vpp_config())
134 self.assertTrue(key.query_vpp_config())
136 indexes = range(key_count)
141 key.remove_vpp_config()
143 for j in range(key_count):
146 self.assertFalse(key.query_vpp_config())
148 self.assertTrue(key.query_vpp_config())
149 # should be removed now
151 self.assertFalse(key.query_vpp_config())
152 # add back and remove again
156 self.assertTrue(key.query_vpp_config())
158 key.remove_vpp_config()
160 self.assertFalse(key.query_vpp_config())
162 def test_add_bfd_sha1(self):
163 """ create a BFD session (SHA1) """
164 key = self.factory.create_random_key(self)
166 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
168 session.add_vpp_config()
169 self.logger.debug("Session state is %s", session.state)
170 session.remove_vpp_config()
171 session.add_vpp_config()
172 self.logger.debug("Session state is %s", session.state)
173 session.remove_vpp_config()
175 def test_double_add_sha1(self):
176 """ create the same BFD session twice (negative case) (SHA1) """
177 key = self.factory.create_random_key(self)
179 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
181 session.add_vpp_config()
182 with self.assertRaises(Exception):
183 session.add_vpp_config()
def test_add_auth_nonexistent_key(self):
    """ create BFD session using non-existent SHA1 (negative case)

    The key object comes straight from the factory and this test never
    calls add_vpp_config() on it; adding the session that references it
    has to raise.
    """
    unconfigured_key = self.factory.create_random_key(self)
    bfd_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=unconfigured_key)
    # any exception raised by the add satisfies the check
    with self.assertRaises(Exception):
        bfd_session.add_vpp_config()
193 def test_shared_sha1_key(self):
194 """ share single SHA1 key between multiple BFD sessions """
195 key = self.factory.create_random_key(self)
198 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
200 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
201 sha1_key=key, af=AF_INET6),
202 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
204 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
205 sha1_key=key, af=AF_INET6)]
210 e = key.get_bfd_auth_keys_dump_entry()
211 self.assert_equal(e.use_count, len(sessions) - removed,
212 "Use count for shared key")
213 s.remove_vpp_config()
215 e = key.get_bfd_auth_keys_dump_entry()
216 self.assert_equal(e.use_count, len(sessions) - removed,
217 "Use count for shared key")
219 def test_activate_auth(self):
220 """ activate SHA1 authentication """
221 key = self.factory.create_random_key(self)
223 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
224 session.add_vpp_config()
225 session.activate_auth(key)
227 def test_deactivate_auth(self):
228 """ deactivate SHA1 authentication """
229 key = self.factory.create_random_key(self)
231 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
232 session.add_vpp_config()
233 session.activate_auth(key)
234 session.deactivate_auth()
236 def test_change_key(self):
237 """ change SHA1 key """
238 key1 = self.factory.create_random_key(self)
239 key2 = self.factory.create_random_key(self)
240 while key2.conf_key_id == key1.conf_key_id:
241 key2 = self.factory.create_random_key(self)
242 key1.add_vpp_config()
243 key2.add_vpp_config()
244 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
246 session.add_vpp_config()
247 session.activate_auth(key2)
250 class BFDTestSession(object):
251 """ BFD session as seen from test framework side """
253 def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
254 bfd_key_id=None, our_seq_number=None):
257 self.sha1_key = sha1_key
258 self.bfd_key_id = bfd_key_id
259 self.interface = interface
260 self.udp_sport = randint(49152, 65535)
261 if our_seq_number is None:
262 self.our_seq_number = randint(0, 40000000)
264 self.our_seq_number = our_seq_number
265 self.vpp_seq_number = None
266 self.my_discriminator = 0
267 self.desired_min_tx = 100000
268 self.required_min_rx = 100000
269 self.detect_mult = detect_mult
270 self.diag = BFDDiagCode.no_diagnostic
271 self.your_discriminator = None
272 self.state = BFDState.down
273 self.auth_type = BFDAuthType.no_auth
275 def inc_seq_num(self):
276 """ increment sequence number, wrapping if needed """
277 if self.our_seq_number == 0xFFFFFFFF:
278 self.our_seq_number = 0
280 self.our_seq_number += 1
282 def update(self, my_discriminator=None, your_discriminator=None,
283 desired_min_tx=None, required_min_rx=None, detect_mult=None,
284 diag=None, state=None, auth_type=None):
285 """ update BFD parameters associated with session """
287 self.my_discriminator = my_discriminator
288 if your_discriminator:
289 self.your_discriminator = your_discriminator
291 self.required_min_rx = required_min_rx
293 self.desired_min_tx = desired_min_tx
295 self.detect_mult = detect_mult
301 self.auth_type = auth_type
303 def fill_packet_fields(self, packet):
304 """ set packet fields with known values in packet """
306 if self.my_discriminator:
307 self.test.logger.debug("BFD: setting packet.my_discriminator=%s",
308 self.my_discriminator)
309 bfd.my_discriminator = self.my_discriminator
310 if self.your_discriminator:
311 self.test.logger.debug("BFD: setting packet.your_discriminator=%s",
312 self.your_discriminator)
313 bfd.your_discriminator = self.your_discriminator
314 if self.required_min_rx:
315 self.test.logger.debug(
316 "BFD: setting packet.required_min_rx_interval=%s",
317 self.required_min_rx)
318 bfd.required_min_rx_interval = self.required_min_rx
319 if self.desired_min_tx:
320 self.test.logger.debug(
321 "BFD: setting packet.desired_min_tx_interval=%s",
323 bfd.desired_min_tx_interval = self.desired_min_tx
325 self.test.logger.debug(
326 "BFD: setting packet.detect_mult=%s", self.detect_mult)
327 bfd.detect_mult = self.detect_mult
329 self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
332 self.test.logger.debug("BFD: setting packet.state=%s", self.state)
333 bfd.state = self.state
335 # this is used by a negative test-case
336 self.test.logger.debug("BFD: setting packet.auth_type=%s",
338 bfd.auth_type = self.auth_type
340 def create_packet(self):
341 """ create a BFD packet, reflecting the current state of session """
344 bfd.auth_type = self.sha1_key.auth_type
345 bfd.auth_len = BFD.sha1_auth_len
346 bfd.auth_key_id = self.bfd_key_id
347 bfd.auth_seq_num = self.our_seq_number
348 bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
351 if self.af == AF_INET6:
352 packet = (Ether(src=self.interface.remote_mac,
353 dst=self.interface.local_mac) /
354 IPv6(src=self.interface.remote_ip6,
355 dst=self.interface.local_ip6,
357 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
360 packet = (Ether(src=self.interface.remote_mac,
361 dst=self.interface.local_mac) /
362 IP(src=self.interface.remote_ip4,
363 dst=self.interface.local_ip4,
365 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
367 self.test.logger.debug("BFD: Creating packet")
368 self.fill_packet_fields(packet)
370 hash_material = str(packet[BFD])[:32] + self.sha1_key.key + \
371 "\0" * (20 - len(self.sha1_key.key))
372 self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
373 hashlib.sha1(hash_material).hexdigest())
374 packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
377 def send_packet(self, packet=None, interface=None):
378 """ send packet on interface, creating the packet if needed """
380 packet = self.create_packet()
381 if interface is None:
382 interface = self.test.pg0
383 self.test.logger.debug(ppp("Sending packet:", packet))
384 interface.add_stream(packet)
387 def verify_sha1_auth(self, packet):
388 """ Verify correctness of authentication in BFD layer. """
390 self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
391 self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
393 self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
394 self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
395 if self.vpp_seq_number is None:
396 self.vpp_seq_number = bfd.auth_seq_num
397 self.test.logger.debug("Received initial sequence number: %s" %
400 recvd_seq_num = bfd.auth_seq_num
401 self.test.logger.debug("Received followup sequence number: %s" %
403 if self.vpp_seq_number < 0xffffffff:
404 if self.sha1_key.auth_type == \
405 BFDAuthType.meticulous_keyed_sha1:
406 self.test.assert_equal(recvd_seq_num,
407 self.vpp_seq_number + 1,
408 "BFD sequence number")
410 self.test.assert_in_range(recvd_seq_num,
412 self.vpp_seq_number + 1,
413 "BFD sequence number")
415 if self.sha1_key.auth_type == \
416 BFDAuthType.meticulous_keyed_sha1:
417 self.test.assert_equal(recvd_seq_num, 0,
418 "BFD sequence number")
420 self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
421 "BFD sequence number not one of "
422 "(%s, 0)" % self.vpp_seq_number)
423 self.vpp_seq_number = recvd_seq_num
424 # last 20 bytes represent the hash - so replace them with the key,
425 # pad the result with zeros and hash the result
426 hash_material = bfd.original[:-20] + self.sha1_key.key + \
427 "\0" * (20 - len(self.sha1_key.key))
428 expected_hash = hashlib.sha1(hash_material).hexdigest()
429 self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
430 expected_hash, "Auth key hash")
432 def verify_bfd(self, packet):
433 """ Verify correctness of BFD layer. """
435 self.test.assert_equal(bfd.version, 1, "BFD version")
436 self.test.assert_equal(bfd.your_discriminator,
437 self.my_discriminator,
438 "BFD - your discriminator")
440 self.verify_sha1_auth(packet)
443 def bfd_session_up(test):
444 """ Bring BFD session up """
445 test.logger.info("BFD: Waiting for slow hello")
446 p = wait_for_bfd_packet(test, 2)
448 if hasattr(test, 'vpp_clock_offset'):
449 old_offset = test.vpp_clock_offset
450 test.vpp_clock_offset = time.time() - p.time
451 test.logger.debug("BFD: Calculated vpp clock offset: %s",
452 test.vpp_clock_offset)
454 test.assertAlmostEqual(
455 old_offset, test.vpp_clock_offset, delta=0.1,
456 msg="vpp clock offset not stable (new: %s, old: %s)" %
457 (test.vpp_clock_offset, old_offset))
458 test.logger.info("BFD: Sending Init")
459 test.test_session.update(my_discriminator=randint(0, 40000000),
460 your_discriminator=p[BFD].my_discriminator,
462 test.test_session.send_packet()
463 test.logger.info("BFD: Waiting for event")
464 e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
465 verify_event(test, e, expected_state=BFDState.up)
466 test.logger.info("BFD: Session is Up")
467 test.test_session.update(state=BFDState.up)
468 test.test_session.send_packet()
469 test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
def bfd_session_down(test):
    """ Bring BFD session down

    Asserts the vpp session is up, sends a packet from the test session
    with state set to down, then checks that a bfd_udp_session_details
    event carrying state down is received and that the vpp session
    reports state down afterwards.

    :param test: test case object; its vpp_session, test_session, vapi,
        logger and assert_equal members are used
    """
    log = test.logger
    peer = test.test_session

    # precondition - the session must be up before taking it down
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
    peer.update(state=BFDState.down)
    peer.send_packet()
    log.info("BFD: Waiting for event")
    # first argument is presumably the timeout - confirm against vapi
    event = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, event, expected_state=BFDState.down)
    log.info("BFD: Session is Down")
    test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
484 def verify_ip(test, packet):
485 """ Verify correctness of IP layer. """
486 if test.vpp_session.af == AF_INET6:
488 local_ip = test.pg0.local_ip6
489 remote_ip = test.pg0.remote_ip6
490 test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
493 local_ip = test.pg0.local_ip4
494 remote_ip = test.pg0.remote_ip4
495 test.assert_equal(ip.ttl, 255, "IPv4 TTL")
496 test.assert_equal(ip.src, local_ip, "IP source address")
497 test.assert_equal(ip.dst, remote_ip, "IP destination address")
500 def verify_udp(test, packet):
501 """ Verify correctness of UDP layer. """
503 test.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
504 test.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
508 def verify_event(test, event, expected_state):
509 """ Verify correctness of event values. """
511 test.logger.debug("BFD: Event: %s" % repr(e))
512 test.assert_equal(e.sw_if_index,
513 test.vpp_session.interface.sw_if_index,
514 "BFD interface index")
516 if test.vpp_session.af == AF_INET6:
518 test.assert_equal(e.is_ipv6, is_ipv6, "is_ipv6")
519 if test.vpp_session.af == AF_INET:
520 test.assert_equal(e.local_addr[:4], test.vpp_session.local_addr_n,
521 "Local IPv4 address")
522 test.assert_equal(e.peer_addr[:4], test.vpp_session.peer_addr_n,
525 test.assert_equal(e.local_addr, test.vpp_session.local_addr_n,
526 "Local IPv6 address")
527 test.assert_equal(e.peer_addr, test.vpp_session.peer_addr_n,
529 test.assert_equal(e.state, expected_state, BFDState)
532 def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None):
533 """ wait for BFD packet and verify its correctness
535 :param timeout: how long to wait
536 :param pcap_time_min: ignore packets with pcap timestamp lower than this
538 :returns: tuple (packet, time spent waiting for packet)
540 test.logger.info("BFD: Waiting for BFD packet")
541 deadline = time.time() + timeout
546 test.assert_in_range(counter, 0, 100, "number of packets ignored")
547 time_left = deadline - time.time()
549 raise CaptureTimeoutError("Packet did not arrive within timeout")
550 p = test.pg0.wait_for_packet(timeout=time_left)
551 test.logger.debug(ppp("BFD: Got packet:", p))
552 if pcap_time_min is not None and p.time < pcap_time_min:
553 test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
554 "pcap time min %s):" %
555 (p.time, pcap_time_min), p))
560 raise Exception(ppp("Unexpected or invalid BFD packet:", p))
562 raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
565 test.test_session.verify_bfd(p)
569 class BFD4TestCase(VppTestCase):
570 """Bidirectional Forwarding Detection (BFD)"""
573 vpp_clock_offset = None
579 super(BFD4TestCase, cls).setUpClass()
581 cls.create_pg_interfaces([0])
583 cls.pg0.configure_ipv4_neighbors()
585 cls.pg0.resolve_arp()
588 super(BFD4TestCase, cls).tearDownClass()
592 super(BFD4TestCase, self).setUp()
593 self.factory = AuthKeyFactory()
594 self.vapi.want_bfd_events()
595 self.pg0.enable_capture()
597 self.vpp_session = VppBFDUDPSession(self, self.pg0,
599 self.vpp_session.add_vpp_config()
600 self.vpp_session.admin_up()
601 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
603 self.vapi.want_bfd_events(enable_disable=0)
607 if not self.vpp_dead:
608 self.vapi.want_bfd_events(enable_disable=0)
609 self.vapi.collect_events() # clear the event queue
610 super(BFD4TestCase, self).tearDown()
612 def test_session_up(self):
613 """ bring BFD session up """
616 def test_session_down(self):
617 """ bring BFD session down """
619 bfd_session_down(self)
621 def test_hold_up(self):
622 """ hold BFD session up """
624 for dummy in range(self.test_session.detect_mult * 2):
625 wait_for_bfd_packet(self)
626 self.test_session.send_packet()
627 self.assert_equal(len(self.vapi.collect_events()), 0,
628 "number of bfd events")
630 def test_slow_timer(self):
631 """ verify slow periodic control frames while session down """
633 self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
634 prev_packet = wait_for_bfd_packet(self, 2)
635 for dummy in range(packet_count):
636 next_packet = wait_for_bfd_packet(self, 2)
637 time_diff = next_packet.time - prev_packet.time
638 # spec says the range should be <0.75, 1>, allow extra 0.05 margin
639 # to work around timing issues
640 self.assert_in_range(
641 time_diff, 0.70, 1.05, "time between slow packets")
642 prev_packet = next_packet
644 def test_zero_remote_min_rx(self):
645 """ no packets when zero remote required min rx interval """
647 self.test_session.update(required_min_rx=0)
648 self.test_session.send_packet()
649 cap = 2 * self.vpp_session.desired_min_tx *\
650 self.test_session.detect_mult
651 time_mark = time.time()
653 # busy wait here, trying to collect a packet or event, vpp is not
654 # allowed to send packets and the session will timeout first - so the
655 # Up->Down event must arrive before any packets do
656 while time.time() < time_mark + cap / USEC_IN_SEC:
658 p = wait_for_bfd_packet(
660 pcap_time_min=time_mark - self.vpp_clock_offset)
661 self.logger.error(ppp("Received unexpected packet:", p))
663 except CaptureTimeoutError:
665 events = self.vapi.collect_events()
667 verify_event(self, events[0], BFDState.down)
669 self.assert_equal(count, 0, "number of packets received")
671 def test_conn_down(self):
672 """ verify session goes down after inactivity """
674 detection_time = self.vpp_session.detect_mult *\
675 self.vpp_session.required_min_rx / USEC_IN_SEC
676 self.sleep(detection_time, "waiting for BFD session time-out")
677 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
678 verify_event(self, e, expected_state=BFDState.down)
680 def test_large_required_min_rx(self):
681 """ large remote required min rx interval """
683 p = wait_for_bfd_packet(self)
685 self.test_session.update(required_min_rx=interval)
686 self.test_session.send_packet()
687 time_mark = time.time()
689 # busy wait here, trying to collect a packet or event, vpp is not
690 # allowed to send packets and the session will timeout first - so the
691 # Up->Down event must arrive before any packets do
692 while time.time() < time_mark + interval / USEC_IN_SEC:
694 p = wait_for_bfd_packet(self, timeout=0)
695 # if vpp managed to send a packet before we did the session
696 # session update, then that's fine, ignore it
697 if p.time < time_mark - self.vpp_clock_offset:
699 self.logger.error(ppp("Received unexpected packet:", p))
701 except CaptureTimeoutError:
703 events = self.vapi.collect_events()
705 verify_event(self, events[0], BFDState.down)
707 self.assert_equal(count, 0, "number of packets received")
709 def test_immediate_remote_min_rx_reduction(self):
710 """ immediately honor remote required min rx reduction """
711 self.vpp_session.remove_vpp_config()
712 self.vpp_session = VppBFDUDPSession(
713 self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
714 self.pg0.enable_capture()
715 self.vpp_session.add_vpp_config()
716 self.test_session.update(desired_min_tx=1000000,
717 required_min_rx=1000000)
719 reference_packet = wait_for_bfd_packet(self)
720 time_mark = time.time()
722 self.test_session.update(required_min_rx=interval)
723 self.test_session.send_packet()
724 extra_time = time.time() - time_mark
725 p = wait_for_bfd_packet(self)
726 # first packet is allowed to be late by time we spent doing the update
727 # calculated in extra_time
728 self.assert_in_range(p.time - reference_packet.time,
729 .95 * 0.75 * interval / USEC_IN_SEC,
730 1.05 * interval / USEC_IN_SEC + extra_time,
731 "time between BFD packets")
733 for dummy in range(3):
734 p = wait_for_bfd_packet(self)
735 diff = p.time - reference_packet.time
736 self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
737 1.05 * interval / USEC_IN_SEC,
738 "time between BFD packets")
741 def test_modify_req_min_rx_double(self):
742 """ modify session - double required min rx """
744 p = wait_for_bfd_packet(self)
745 self.test_session.update(desired_min_tx=10000,
746 required_min_rx=10000)
747 self.test_session.send_packet()
748 # double required min rx
749 self.vpp_session.modify_parameters(
750 required_min_rx=2 * self.vpp_session.required_min_rx)
751 p = wait_for_bfd_packet(
752 self, pcap_time_min=time.time() - self.vpp_clock_offset)
753 # poll bit needs to be set
754 self.assertIn("P", p.sprintf("%BFD.flags%"),
755 "Poll bit not set in BFD packet")
756 # finish poll sequence with final packet
757 final = self.test_session.create_packet()
758 final[BFD].flags = "F"
759 timeout = self.test_session.detect_mult * \
760 max(self.test_session.desired_min_tx,
761 self.vpp_session.required_min_rx) / USEC_IN_SEC
762 self.test_session.send_packet(final)
763 time_mark = time.time()
764 e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
765 verify_event(self, e, expected_state=BFDState.down)
766 time_to_event = time.time() - time_mark
767 self.assert_in_range(time_to_event, .9 * timeout,
768 1.1 * timeout, "session timeout")
770 def test_modify_req_min_rx_halve(self):
771 """ modify session - halve required min rx """
772 self.vpp_session.modify_parameters(
773 required_min_rx=2 * self.vpp_session.required_min_rx)
775 p = wait_for_bfd_packet(self)
776 self.test_session.update(desired_min_tx=10000,
777 required_min_rx=10000)
778 self.test_session.send_packet()
779 p = wait_for_bfd_packet(
780 self, pcap_time_min=time.time() - self.vpp_clock_offset)
781 # halve required min rx
782 old_required_min_rx = self.vpp_session.required_min_rx
783 self.vpp_session.modify_parameters(
784 required_min_rx=0.5 * self.vpp_session.required_min_rx)
785 # now we wait 0.8*3*old-req-min-rx and the session should still be up
786 self.sleep(0.8 * self.vpp_session.detect_mult *
787 old_required_min_rx / USEC_IN_SEC)
788 self.assert_equal(len(self.vapi.collect_events()), 0,
789 "number of bfd events")
790 p = wait_for_bfd_packet(self)
791 # poll bit needs to be set
792 self.assertIn("P", p.sprintf("%BFD.flags%"),
793 "Poll bit not set in BFD packet")
794 # finish poll sequence with final packet
795 final = self.test_session.create_packet()
796 final[BFD].flags = "F"
797 self.test_session.send_packet(final)
798 # now the session should time out under new conditions
800 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
802 detection_time = self.vpp_session.detect_mult *\
803 self.vpp_session.required_min_rx / USEC_IN_SEC
804 self.assert_in_range(after - before,
805 0.9 * detection_time,
806 1.1 * detection_time,
807 "time before bfd session goes down")
808 verify_event(self, e, expected_state=BFDState.down)
810 def test_modify_detect_mult(self):
811 """ modify detect multiplier """
813 p = wait_for_bfd_packet(self)
814 self.vpp_session.modify_parameters(detect_mult=1)
815 p = wait_for_bfd_packet(
816 self, pcap_time_min=time.time() - self.vpp_clock_offset)
817 self.assert_equal(self.vpp_session.detect_mult,
820 # poll bit must not be set
821 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
822 "Poll bit not set in BFD packet")
823 self.vpp_session.modify_parameters(detect_mult=10)
824 p = wait_for_bfd_packet(
825 self, pcap_time_min=time.time() - self.vpp_clock_offset)
826 self.assert_equal(self.vpp_session.detect_mult,
829 # poll bit must not be set
830 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
831 "Poll bit not set in BFD packet")
833 def test_no_periodic_if_remote_demand(self):
834 """ no periodic frames outside poll sequence if remote demand set """
836 demand = self.test_session.create_packet()
837 demand[BFD].flags = "D"
838 self.test_session.send_packet(demand)
839 transmit_time = 0.9 \
840 * max(self.vpp_session.required_min_rx,
841 self.test_session.desired_min_tx) \
844 for dummy in range(self.test_session.detect_mult * 2):
845 time.sleep(transmit_time)
846 self.test_session.send_packet(demand)
848 p = wait_for_bfd_packet(self, timeout=0)
849 self.logger.error(ppp("Received unexpected packet:", p))
851 except CaptureTimeoutError:
853 events = self.vapi.collect_events()
855 self.logger.error("Received unexpected event: %s", e)
856 self.assert_equal(count, 0, "number of packets received")
857 self.assert_equal(len(events), 0, "number of events received")
859 def test_echo_looped_back(self):
860 """ echo packets looped back """
861 # don't need a session in this case..
862 self.vpp_session.remove_vpp_config()
863 self.pg0.enable_capture()
864 echo_packet_count = 10
865 # random source port low enough to increment a few times..
866 udp_sport_tx = randint(1, 50000)
867 udp_sport_rx = udp_sport_tx
868 echo_packet = (Ether(src=self.pg0.remote_mac,
869 dst=self.pg0.local_mac) /
870 IP(src=self.pg0.remote_ip4,
871 dst=self.pg0.local_ip4) /
872 UDP(dport=BFD.udp_dport_echo) /
873 Raw("this should be looped back"))
874 for dummy in range(echo_packet_count):
875 self.sleep(.01, "delay between echo packets")
876 echo_packet[UDP].sport = udp_sport_tx
878 self.logger.debug(ppp("Sending packet:", echo_packet))
879 self.pg0.add_stream(echo_packet)
881 for dummy in range(echo_packet_count):
882 p = self.pg0.wait_for_packet(1)
883 self.logger.debug(ppp("Got packet:", p))
885 self.assert_equal(self.pg0.remote_mac,
886 ether.dst, "Destination MAC")
887 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
889 self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
890 self.assert_equal(self.pg0.local_ip4, ip.src, "Destination IP")
892 self.assert_equal(udp.dport, BFD.udp_dport_echo,
893 "UDP destination port")
894 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
896 self.assertTrue(p.haslayer(Raw) and p[Raw] == echo_packet[Raw],
897 "Received packet is not the echo packet sent")
898 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
899 "ECHO packet identifier for test purposes)")
901 def test_admin_up_down(self):
903 self.vpp_session.admin_down()
904 self.pg0.enable_capture()
905 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
906 verify_event(self, e, expected_state=BFDState.admin_down)
907 for dummy in range(2):
908 p = wait_for_bfd_packet(self)
909 self.assert_equal(BFDState.admin_down, p[BFD].state, BFDState)
910 # try to bring session up - shouldn't be possible
911 self.test_session.update(state=BFDState.init)
912 self.test_session.send_packet()
913 for dummy in range(2):
914 p = wait_for_bfd_packet(self)
915 self.assert_equal(BFDState.admin_down, p[BFD].state, BFDState)
916 self.vpp_session.admin_up()
917 self.test_session.update(state=BFDState.down)
918 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
919 verify_event(self, e, expected_state=BFDState.down)
920 p = wait_for_bfd_packet(self)
921 self.assert_equal(BFDState.down, p[BFD].state, BFDState)
922 self.test_session.send_packet()
923 p = wait_for_bfd_packet(self)
924 self.assert_equal(BFDState.init, p[BFD].state, BFDState)
925 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
926 verify_event(self, e, expected_state=BFDState.init)
927 self.test_session.update(state=BFDState.up)
928 self.test_session.send_packet()
929 p = wait_for_bfd_packet(self)
930 self.assert_equal(BFDState.up, p[BFD].state, BFDState)
931 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
932 verify_event(self, e, expected_state=BFDState.up)
935 class BFD6TestCase(VppTestCase):
936 """Bidirectional Forwarding Detection (BFD) (IPv6) """
939 vpp_clock_offset = None
945 super(BFD6TestCase, cls).setUpClass()
947 cls.create_pg_interfaces([0])
949 cls.pg0.configure_ipv6_neighbors()
951 cls.pg0.resolve_ndp()
954 super(BFD6TestCase, cls).tearDownClass()
958 super(BFD6TestCase, self).setUp()
959 self.factory = AuthKeyFactory()
960 self.vapi.want_bfd_events()
961 self.pg0.enable_capture()
963 self.vpp_session = VppBFDUDPSession(self, self.pg0,
966 self.vpp_session.add_vpp_config()
967 self.vpp_session.admin_up()
968 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
969 self.logger.debug(self.vapi.cli("show adj nbr"))
971 self.vapi.want_bfd_events(enable_disable=0)
975 if not self.vpp_dead:
976 self.vapi.want_bfd_events(enable_disable=0)
977 self.vapi.collect_events() # clear the event queue
978 super(BFD6TestCase, self).tearDown()
980 def test_session_up(self):
981 """ bring BFD session up """
984 def test_hold_up(self):
985 """ hold BFD session up """
987 for dummy in range(self.test_session.detect_mult * 2):
988 wait_for_bfd_packet(self)
989 self.test_session.send_packet()
990 self.assert_equal(len(self.vapi.collect_events()), 0,
991 "number of bfd events")
992 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
def test_echo_looped_back(self):
    """ echo packets looped back """
    # don't need a session in this case..
    self.vpp_session.remove_vpp_config()
    self.pg0.enable_capture()
    echo_packet_count = 10
    # random source port low enough to increment a few times..
    udp_sport_tx = randint(1, 50000)
    udp_sport_rx = udp_sport_tx
    echo_packet = (Ether(src=self.pg0.remote_mac,
                         dst=self.pg0.local_mac) /
                   IPv6(src=self.pg0.remote_ip6,
                        dst=self.pg0.local_ip6) /
                   UDP(dport=BFD.udp_dport_echo) /
                   Raw("this should be looped back"))
    # Send the burst; each packet gets its own source port, which serves
    # as the packet identifier when the looped-back copies are checked.
    for _ in range(echo_packet_count):
        self.sleep(.01, "delay between echo packets")
        echo_packet[UDP].sport = udp_sport_tx
        udp_sport_tx += 1
        self.logger.debug(ppp("Sending packet:", echo_packet))
        self.pg0.add_stream(echo_packet)
        self.pg_start()
    # Verify the packets come back in order with the expected addressing.
    for _ in range(echo_packet_count):
        p = self.pg0.wait_for_packet(1)
        self.logger.debug(ppp("Got packet:", p))
        ether = p[Ether]
        self.assert_equal(self.pg0.remote_mac,
                          ether.dst, "Destination MAC")
        self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
        ip = p[IPv6]
        self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
        self.assert_equal(self.pg0.local_ip6, ip.src, "Source IP")
        udp = p[UDP]
        self.assert_equal(udp.dport, BFD.udp_dport_echo,
                          "UDP destination port")
        self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
        udp_sport_rx += 1
        self.assertTrue(p.haslayer(Raw) and p[Raw] == echo_packet[Raw],
                        "Received packet is not the echo packet sent")
    # Both counters advanced once per packet, so they must match again.
    self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
                      "ECHO packet identifier for test purposes)")
class BFDSHA1TestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """

    # Class-level defaults; populated by setUpClass and the tests.
    pg0 = None
    vpp_clock_offset = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        super(BFDSHA1TestCase, cls).setUpClass()
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip4()
            cls.pg0.admin_up()
            cls.pg0.resolve_arp()
        except Exception:
            # Undo the class-level setup before propagating the failure.
            super(BFDSHA1TestCase, cls).tearDownClass()
            raise

    def setUp(self):
        super(BFDSHA1TestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

    def tearDown(self):
        # Unsubscribe from BFD events only while VPP is still alive.
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFDSHA1TestCase, self).tearDown()

    def _start_authenticated_session(self,
                                     auth_type=BFDAuthType.keyed_sha1,
                                     **test_session_args):
        """Create a key, both session ends using it, and bring them up.

        :param auth_type: auth type of the randomly generated key
        :param test_session_args: extra keyword arguments forwarded to
            BFDTestSession (e.g. our_seq_number)
        :returns: the key that was created and added to VPP

        Stores the sessions in self.vpp_session and self.test_session.
        """
        key = self.factory.create_random_key(self, auth_type)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4,
                                            sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id,
            **test_session_args)
        bfd_session_up(self)
        return key

    def test_session_up(self):
        """ bring BFD session up """
        self._start_authenticated_session()

    def test_hold_up(self):
        """ hold BFD session up """
        self._start_authenticated_session()
        # Answer every packet from VPP for twice the detect multiplier.
        for _ in range(self.test_session.detect_mult * 2):
            wait_for_bfd_packet(self)
            self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_hold_up_meticulous(self):
        """ hold BFD session up - meticulous auth """
        # specify sequence number so that it wraps
        self._start_authenticated_session(
            BFDAuthType.meticulous_keyed_sha1,
            our_seq_number=0xFFFFFFFF - 4)
        for _ in range(30):
            wait_for_bfd_packet(self)
            self.test_session.inc_seq_num()
            self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_send_bad_seq_number(self):
        """ session is not kept alive by msgs with bad seq numbers"""
        self._start_authenticated_session(
            BFDAuthType.meticulous_keyed_sha1)
        detection_time = self.vpp_session.detect_mult *\
            self.vpp_session.required_min_rx / USEC_IN_SEC
        session_timeout = time.time() + detection_time
        # Keep replying without ever incrementing our sequence number;
        # no event is expected until the detection time has elapsed.
        while time.time() < session_timeout:
            self.assert_equal(len(self.vapi.collect_events()), 0,
                              "number of bfd events")
            wait_for_bfd_packet(self)
            self.test_session.send_packet()
        wait_for_bfd_packet(self)
        self.test_session.send_packet()
        e = self.vapi.collect_events()
        # session should be down now, because the sequence numbers weren't
        # updated
        self.assert_equal(len(e), 1, "number of bfd events")
        verify_event(self, e[0], expected_state=BFDState.down)

    def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
                                       legitimate_test_session,
                                       rogue_test_session,
                                       rogue_bfd_values=None):
        """ execute a rogue session interaction scenario

        1. create vpp session, add config
        2. bring the legitimate session up
        3. copy the bfd values from legitimate session to rogue session
        4. apply rogue_bfd_values to rogue session
        5. set rogue session state to down
        6. send message to take the session down from the rogue session
        7. assert that the legitimate session is unaffected
        """
        self.vpp_session = vpp_bfd_udp_session
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = legitimate_test_session
        # bring vpp session up
        bfd_session_up(self)
        # send packet from rogue session
        rogue_test_session.update(
            my_discriminator=self.test_session.my_discriminator,
            your_discriminator=self.test_session.your_discriminator,
            desired_min_tx=self.test_session.desired_min_tx,
            required_min_rx=self.test_session.required_min_rx,
            detect_mult=self.test_session.detect_mult,
            diag=self.test_session.diag,
            state=self.test_session.state,
            auth_type=self.test_session.auth_type)
        if rogue_bfd_values:
            rogue_test_session.update(**rogue_bfd_values)
        rogue_test_session.update(state=BFDState.down)
        rogue_test_session.send_packet()
        wait_for_bfd_packet(self)
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_mismatch_auth(self):
        """ session is not brought down by unauthenticated msg """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
        legitimate_test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=vpp_session.bfd_key_id)
        # The rogue session carries no authentication at all.
        rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
        self.execute_rogue_session_scenario(vpp_session,
                                            legitimate_test_session,
                                            rogue_test_session)

    def test_mismatch_bfd_key_id(self):
        """ session is not brought down by msg with non-existent key-id """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
        # pick a different random bfd key id
        x = randint(0, 255)
        while x == vpp_session.bfd_key_id:
            x = randint(0, 255)
        legitimate_test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=vpp_session.bfd_key_id)
        rogue_test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
        self.execute_rogue_session_scenario(vpp_session,
                                            legitimate_test_session,
                                            rogue_test_session)

    def test_mismatched_auth_type(self):
        """ session is not brought down by msg with wrong auth type """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
        legitimate_test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=vpp_session.bfd_key_id)
        rogue_test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=vpp_session.bfd_key_id)
        # Same key and key id, but the rogue claims a different auth type.
        self.execute_rogue_session_scenario(
            vpp_session, legitimate_test_session, rogue_test_session,
            {'auth_type': BFDAuthType.keyed_md5})

    def test_restart(self):
        """ simulate remote peer restart and resynchronization """
        self._start_authenticated_session(
            BFDAuthType.meticulous_keyed_sha1, our_seq_number=0)
        # don't send any packets for 2*detection_time; waiting only one
        # detection time would race with the down event being reported
        detection_time = self.vpp_session.detect_mult *\
            self.vpp_session.required_min_rx / USEC_IN_SEC
        self.sleep(2 * detection_time, "simulating peer restart")
        events = self.vapi.collect_events()
        self.assert_equal(len(events), 1, "number of bfd events")
        verify_event(self, events[0], expected_state=BFDState.down)
        self.test_session.update(state=BFDState.down)
        # reset sequence number
        self.test_session.our_seq_number = 0
        self.test_session.vpp_seq_number = None
        # now throw away any pending packets
        self.pg0.enable_capture()
        bfd_session_up(self)
class BFDAuthOnOffTestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (changing auth) """

    # Class-level defaults; populated by setUpClass and the tests.
    pg0 = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        super(BFDAuthOnOffTestCase, cls).setUpClass()
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip4()
            cls.pg0.admin_up()
            cls.pg0.resolve_arp()
        except Exception:
            # Undo the class-level setup before propagating the failure.
            super(BFDAuthOnOffTestCase, cls).tearDownClass()
            raise

    def setUp(self):
        super(BFDAuthOnOffTestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

    def tearDown(self):
        # Unsubscribe from BFD events only while VPP is still alive.
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFDAuthOnOffTestCase, self).tearDown()

    def _new_key(self):
        """Create a random auth key, add it to VPP and return it."""
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        return key

    def _start_session(self, key=None):
        """Create both session ends and bring the session up.

        :param key: auth key used by both ends; None creates the sessions
            without passing any authentication arguments
        """
        auth_args = {} if key is None else {"sha1_key": key}
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4,
                                            **auth_args)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        if key is not None:
            # the key id is read from the VPP session once it is configured
            auth_args["bfd_key_id"] = self.vpp_session.bfd_key_id
        self.test_session = BFDTestSession(self, self.pg0, AF_INET,
                                           **auth_args)
        bfd_session_up(self)

    def _set_test_session_key(self, key):
        """Switch the test session to key (None turns auth off)."""
        self.test_session.bfd_key_id = \
            None if key is None else self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key

    def _exchange_packets(self, check_state=True, inc_seq_num=False):
        """Answer detect_mult * 2 packets received from VPP.

        :param check_state: assert each received packet reports state up
        :param inc_seq_num: increment our sequence number before replying
        """
        for _ in range(self.test_session.detect_mult * 2):
            p = wait_for_bfd_packet(self)
            if check_state:
                self.assert_equal(p[BFD].state, BFDState.up, BFDState)
            if inc_seq_num:
                self.test_session.inc_seq_num()
            self.test_session.send_packet()

    def _assert_session_undisturbed(self):
        """Assert the VPP session is up and no BFD event was reported."""
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")

    def test_auth_on_immediate(self):
        """ turn auth on without disturbing session state (immediate) """
        key = self._new_key()
        self._start_session()
        self._exchange_packets()
        self.vpp_session.activate_auth(key)
        self._set_test_session_key(key)
        self._exchange_packets()
        self._assert_session_undisturbed()

    def test_auth_off_immediate(self):
        """ turn auth off without disturbing session state (immediate) """
        key = self._new_key()
        self._start_session(key)
        self._exchange_packets(inc_seq_num=True)
        self.vpp_session.deactivate_auth()
        self._set_test_session_key(None)
        self._exchange_packets(inc_seq_num=True)
        self._assert_session_undisturbed()

    def test_auth_change_key_immediate(self):
        """ change auth key without disturbing session state (immediate) """
        key1 = self._new_key()
        key2 = self._new_key()
        self._start_session(key1)
        self._exchange_packets()
        self.vpp_session.activate_auth(key2)
        self._set_test_session_key(key2)
        self._exchange_packets()
        self._assert_session_undisturbed()

    def test_auth_on_delayed(self):
        """ turn auth on without disturbing session state (delayed) """
        key = self._new_key()
        self._start_session()
        self._exchange_packets(check_state=False)
        self.vpp_session.activate_auth(key, delayed=True)
        # keep replying without auth; the session must stay up
        self._exchange_packets()
        # now switch our side over to the new key
        self._set_test_session_key(key)
        self.test_session.send_packet()
        self._exchange_packets()
        self._assert_session_undisturbed()

    def test_auth_off_delayed(self):
        """ turn auth off without disturbing session state (delayed) """
        key = self._new_key()
        self._start_session(key)
        self._exchange_packets()
        self.vpp_session.deactivate_auth(delayed=True)
        # keep replying with auth; the session must stay up
        self._exchange_packets()
        # now drop authentication on our side as well
        self._set_test_session_key(None)
        self.test_session.send_packet()
        self._exchange_packets()
        self._assert_session_undisturbed()

    def test_auth_change_key_delayed(self):
        """ change auth key without disturbing session state (delayed) """
        key1 = self._new_key()
        key2 = self._new_key()
        self._start_session(key1)
        self._exchange_packets()
        self.vpp_session.activate_auth(key2, delayed=True)
        # keep replying with the old key; the session must stay up
        self._exchange_packets()
        # now switch our side over to the new key
        self._set_test_session_key(key2)
        self.test_session.send_packet()
        self._exchange_packets()
        self._assert_session_undisturbed()
# Run the tests with the VPP test runner when executed as a script.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)