4 from __future__ import division
9 from random import randint, shuffle
10 from socket import AF_INET, AF_INET6
11 from scapy.packet import Raw
12 from scapy.layers.l2 import Ether
13 from scapy.layers.inet import UDP, IP
14 from scapy.layers.inet6 import IPv6
15 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
17 from framework import VppTestCase, VppTestRunner
18 from vpp_pg_interface import CaptureTimeoutError
24 class AuthKeyFactory(object):
25 """Factory class for creating auth keys with unique conf key ID"""
28 self._conf_key_ids = {}
def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
    """ build a randomly generated auth key with an unused conf key id

    :param test: test case instance, passed through to VppBFDAuthKey
    :param auth_type: authentication type of the created key
    :returns: the newly constructed VppBFDAuthKey object
    """
    # keep drawing 32-bit ids until one shows up which this factory has
    # not handed out yet, then remember it
    while True:
        unique_id = randint(0, 0xFFFFFFFF)
        if unique_id not in self._conf_key_ids:
            break
    self._conf_key_ids[unique_id] = 1
    # the secret consists of 1 to 20 random byte values
    secret_len = randint(1, 20)
    secret = str(bytearray([randint(0, 255) for _ in range(secret_len)]))
    return VppBFDAuthKey(test=test, auth_type=auth_type,
                         conf_key_id=unique_id, key=secret)
41 class BFDAPITestCase(VppTestCase):
42 """Bidirectional Forwarding Detection (BFD) - API"""
49 super(BFDAPITestCase, cls).setUpClass()
52 cls.create_pg_interfaces(range(2))
53 for i in cls.pg_interfaces:
59 super(BFDAPITestCase, cls).tearDownClass()
63 super(BFDAPITestCase, self).setUp()
64 self.factory = AuthKeyFactory()
def test_add_bfd(self):
    """ create a BFD session """
    bfd_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    # two identical rounds of add/log/remove - the second round re-adds
    # the session which was removed at the end of the first one
    for _ in range(2):
        bfd_session.add_vpp_config()
        self.logger.debug("Session state is %s", bfd_session.state)
        bfd_session.remove_vpp_config()
def test_double_add(self):
    """ create the same BFD session twice (negative case) """
    duplicate = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    # the first add is done outside of the negative-retval context
    duplicate.add_vpp_config()
    # the identical second add runs with a negative API retval expected
    negative_retval = self.vapi.expect_negative_api_retval()
    with negative_retval:
        duplicate.add_vpp_config()
    # remove the session again
    duplicate.remove_vpp_config()
def test_add_bfd6(self):
    """ create IPv6 BFD session """
    peer_addr = self.pg0.remote_ip6
    bfd_session = VppBFDUDPSession(self, self.pg0, peer_addr, af=AF_INET6)
    # two identical rounds of add/log/remove - the second round re-adds
    # the session which was removed at the end of the first one
    for _ in range(2):
        bfd_session.add_vpp_config()
        self.logger.debug("Session state is %s", bfd_session.state)
        bfd_session.remove_vpp_config()
97 def test_mod_bfd(self):
98 """ modify BFD session parameters """
99 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
100 desired_min_tx=50000,
101 required_min_rx=10000,
103 session.add_vpp_config()
104 s = session.get_bfd_udp_session_dump_entry()
105 self.assert_equal(session.desired_min_tx,
107 "desired min transmit interval")
108 self.assert_equal(session.required_min_rx,
110 "required min receive interval")
111 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
112 session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
113 required_min_rx=session.required_min_rx * 2,
114 detect_mult=session.detect_mult * 2)
115 s = session.get_bfd_udp_session_dump_entry()
116 self.assert_equal(session.desired_min_tx,
118 "desired min transmit interval")
119 self.assert_equal(session.required_min_rx,
121 "required min receive interval")
122 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
124 def test_add_sha1_keys(self):
125 """ add SHA1 keys """
127 keys = [self.factory.create_random_key(
128 self) for i in range(0, key_count)]
130 self.assertFalse(key.query_vpp_config())
134 self.assertTrue(key.query_vpp_config())
136 indexes = range(key_count)
141 key.remove_vpp_config()
143 for j in range(key_count):
146 self.assertFalse(key.query_vpp_config())
148 self.assertTrue(key.query_vpp_config())
149 # should be removed now
151 self.assertFalse(key.query_vpp_config())
152 # add back and remove again
156 self.assertTrue(key.query_vpp_config())
158 key.remove_vpp_config()
160 self.assertFalse(key.query_vpp_config())
162 def test_add_bfd_sha1(self):
163 """ create a BFD session (SHA1) """
164 key = self.factory.create_random_key(self)
166 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
168 session.add_vpp_config()
169 self.logger.debug("Session state is %s", session.state)
170 session.remove_vpp_config()
171 session.add_vpp_config()
172 self.logger.debug("Session state is %s", session.state)
173 session.remove_vpp_config()
175 def test_double_add_sha1(self):
176 """ create the same BFD session twice (negative case) (SHA1) """
177 key = self.factory.create_random_key(self)
179 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
181 session.add_vpp_config()
182 with self.assertRaises(Exception):
183 session.add_vpp_config()
def test_add_auth_nonexistent_key(self):
    """ create BFD session using non-existent SHA1 (negative case) """
    interface = self.pg0
    peer_addr = interface.remote_ip4
    # the key object is only created here - add_vpp_config() is never
    # called on it within this test
    orphan_key = self.factory.create_random_key(self)
    bfd_session = VppBFDUDPSession(self, interface, peer_addr,
                                   sha1_key=orphan_key)
    # adding the session referencing that key is expected to raise
    with self.assertRaises(Exception):
        bfd_session.add_vpp_config()
193 def test_shared_sha1_key(self):
194 """ share single SHA1 key between multiple BFD sessions """
195 key = self.factory.create_random_key(self)
198 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
200 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
201 sha1_key=key, af=AF_INET6),
202 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
204 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
205 sha1_key=key, af=AF_INET6)]
210 e = key.get_bfd_auth_keys_dump_entry()
211 self.assert_equal(e.use_count, len(sessions) - removed,
212 "Use count for shared key")
213 s.remove_vpp_config()
215 e = key.get_bfd_auth_keys_dump_entry()
216 self.assert_equal(e.use_count, len(sessions) - removed,
217 "Use count for shared key")
219 def test_activate_auth(self):
220 """ activate SHA1 authentication """
221 key = self.factory.create_random_key(self)
223 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
224 session.add_vpp_config()
225 session.activate_auth(key)
227 def test_deactivate_auth(self):
228 """ deactivate SHA1 authentication """
229 key = self.factory.create_random_key(self)
231 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
232 session.add_vpp_config()
233 session.activate_auth(key)
234 session.deactivate_auth()
236 def test_change_key(self):
237 """ change SHA1 key """
238 key1 = self.factory.create_random_key(self)
239 key2 = self.factory.create_random_key(self)
240 while key2.conf_key_id == key1.conf_key_id:
241 key2 = self.factory.create_random_key(self)
242 key1.add_vpp_config()
243 key2.add_vpp_config()
244 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
246 session.add_vpp_config()
247 session.activate_auth(key2)
250 class BFDTestSession(object):
251 """ BFD session as seen from test framework side """
253 def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
254 bfd_key_id=None, our_seq_number=None):
257 self.sha1_key = sha1_key
258 self.bfd_key_id = bfd_key_id
259 self.interface = interface
260 self.udp_sport = randint(49152, 65535)
261 if our_seq_number is None:
262 self.our_seq_number = randint(0, 40000000)
264 self.our_seq_number = our_seq_number
265 self.vpp_seq_number = None
266 self.my_discriminator = 0
267 self.desired_min_tx = 100000
268 self.required_min_rx = 100000
269 self.detect_mult = detect_mult
270 self.diag = BFDDiagCode.no_diagnostic
271 self.your_discriminator = None
272 self.state = BFDState.down
273 self.auth_type = BFDAuthType.no_auth
def inc_seq_num(self):
    """ increment sequence number, wrapping if needed

    The counter is treated as an unsigned 32-bit value: 0xFFFFFFFF is
    followed by 0, every other value is followed by value + 1.
    """
    # a single masked increment - the wrap and the regular step cannot
    # both be applied to the same call, so 0 is never skipped
    self.our_seq_number = (self.our_seq_number + 1) & 0xFFFFFFFF
282 def update(self, my_discriminator=None, your_discriminator=None,
283 desired_min_tx=None, required_min_rx=None, detect_mult=None,
284 diag=None, state=None, auth_type=None):
285 """ update BFD parameters associated with session """
287 self.my_discriminator = my_discriminator
288 if your_discriminator:
289 self.your_discriminator = your_discriminator
291 self.required_min_rx = required_min_rx
293 self.desired_min_tx = desired_min_tx
295 self.detect_mult = detect_mult
301 self.auth_type = auth_type
303 def fill_packet_fields(self, packet):
304 """ set packet fields with known values in packet """
306 if self.my_discriminator:
307 self.test.logger.debug("BFD: setting packet.my_discriminator=%s",
308 self.my_discriminator)
309 bfd.my_discriminator = self.my_discriminator
310 if self.your_discriminator:
311 self.test.logger.debug("BFD: setting packet.your_discriminator=%s",
312 self.your_discriminator)
313 bfd.your_discriminator = self.your_discriminator
314 if self.required_min_rx:
315 self.test.logger.debug(
316 "BFD: setting packet.required_min_rx_interval=%s",
317 self.required_min_rx)
318 bfd.required_min_rx_interval = self.required_min_rx
319 if self.desired_min_tx:
320 self.test.logger.debug(
321 "BFD: setting packet.desired_min_tx_interval=%s",
323 bfd.desired_min_tx_interval = self.desired_min_tx
325 self.test.logger.debug(
326 "BFD: setting packet.detect_mult=%s", self.detect_mult)
327 bfd.detect_mult = self.detect_mult
329 self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
332 self.test.logger.debug("BFD: setting packet.state=%s", self.state)
333 bfd.state = self.state
335 # this is used by a negative test-case
336 self.test.logger.debug("BFD: setting packet.auth_type=%s",
338 bfd.auth_type = self.auth_type
340 def create_packet(self):
341 """ create a BFD packet, reflecting the current state of session """
344 bfd.auth_type = self.sha1_key.auth_type
345 bfd.auth_len = BFD.sha1_auth_len
346 bfd.auth_key_id = self.bfd_key_id
347 bfd.auth_seq_num = self.our_seq_number
348 bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
351 if self.af == AF_INET6:
352 packet = (Ether(src=self.interface.remote_mac,
353 dst=self.interface.local_mac) /
354 IPv6(src=self.interface.remote_ip6,
355 dst=self.interface.local_ip6,
357 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
360 packet = (Ether(src=self.interface.remote_mac,
361 dst=self.interface.local_mac) /
362 IP(src=self.interface.remote_ip4,
363 dst=self.interface.local_ip4,
365 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
367 self.test.logger.debug("BFD: Creating packet")
368 self.fill_packet_fields(packet)
370 hash_material = str(packet[BFD])[:32] + self.sha1_key.key + \
371 "\0" * (20 - len(self.sha1_key.key))
372 self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
373 hashlib.sha1(hash_material).hexdigest())
374 packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
377 def send_packet(self, packet=None, interface=None):
378 """ send packet on interface, creating the packet if needed """
380 packet = self.create_packet()
381 if interface is None:
382 interface = self.test.pg0
383 self.test.logger.debug(ppp("Sending packet:", packet))
384 interface.add_stream(packet)
387 def verify_sha1_auth(self, packet):
388 """ Verify correctness of authentication in BFD layer. """
390 self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
391 self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
393 self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
394 self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
395 if self.vpp_seq_number is None:
396 self.vpp_seq_number = bfd.auth_seq_num
397 self.test.logger.debug("Received initial sequence number: %s" %
400 recvd_seq_num = bfd.auth_seq_num
401 self.test.logger.debug("Received followup sequence number: %s" %
403 if self.vpp_seq_number < 0xffffffff:
404 if self.sha1_key.auth_type == \
405 BFDAuthType.meticulous_keyed_sha1:
406 self.test.assert_equal(recvd_seq_num,
407 self.vpp_seq_number + 1,
408 "BFD sequence number")
410 self.test.assert_in_range(recvd_seq_num,
412 self.vpp_seq_number + 1,
413 "BFD sequence number")
415 if self.sha1_key.auth_type == \
416 BFDAuthType.meticulous_keyed_sha1:
417 self.test.assert_equal(recvd_seq_num, 0,
418 "BFD sequence number")
420 self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
421 "BFD sequence number not one of "
422 "(%s, 0)" % self.vpp_seq_number)
423 self.vpp_seq_number = recvd_seq_num
424 # last 20 bytes represent the hash - so replace them with the key,
425 # pad the result with zeros and hash the result
426 hash_material = bfd.original[:-20] + self.sha1_key.key + \
427 "\0" * (20 - len(self.sha1_key.key))
428 expected_hash = hashlib.sha1(hash_material).hexdigest()
429 self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
430 expected_hash, "Auth key hash")
432 def verify_bfd(self, packet):
433 """ Verify correctness of BFD layer. """
435 self.test.assert_equal(bfd.version, 1, "BFD version")
436 self.test.assert_equal(bfd.your_discriminator,
437 self.my_discriminator,
438 "BFD - your discriminator")
440 self.verify_sha1_auth(packet)
443 def bfd_session_up(test):
444 """ Bring BFD session up """
445 test.logger.info("BFD: Waiting for slow hello")
446 p = wait_for_bfd_packet(test, 2)
448 if hasattr(test, 'vpp_clock_offset'):
449 old_offset = test.vpp_clock_offset
450 test.vpp_clock_offset = time.time() - p.time
451 test.logger.debug("BFD: Calculated vpp clock offset: %s",
452 test.vpp_clock_offset)
454 test.assertAlmostEqual(
455 old_offset, test.vpp_clock_offset, delta=0.1,
456 msg="vpp clock offset not stable (new: %s, old: %s)" %
457 (test.vpp_clock_offset, old_offset))
458 test.logger.info("BFD: Sending Init")
459 test.test_session.update(my_discriminator=randint(0, 40000000),
460 your_discriminator=p[BFD].my_discriminator,
462 test.test_session.send_packet()
463 test.logger.info("BFD: Waiting for event")
464 e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
465 verify_event(test, e, expected_state=BFDState.up)
466 test.logger.info("BFD: Session is Up")
467 test.test_session.update(state=BFDState.up)
468 test.test_session.send_packet()
469 test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
def bfd_session_down(test):
    """ Bring BFD session down

    :param test: test case instance; its vpp_session, test_session, vapi
                 and logger attributes are used
    """
    # the vpp session has to be up when this helper is entered
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
    # switch the test-side session to Down and send a packet
    peer = test.test_session
    peer.update(state=BFDState.down)
    peer.send_packet()
    test.logger.info("BFD: Waiting for event")
    down_event = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, down_event, expected_state=BFDState.down)
    test.logger.info("BFD: Session is Down")
    # the state seen through the vpp session must be Down as well
    test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
484 def verify_ip(test, packet):
485 """ Verify correctness of IP layer. """
486 if test.vpp_session.af == AF_INET6:
488 local_ip = test.pg0.local_ip6
489 remote_ip = test.pg0.remote_ip6
490 test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
493 local_ip = test.pg0.local_ip4
494 remote_ip = test.pg0.remote_ip4
495 test.assert_equal(ip.ttl, 255, "IPv4 TTL")
496 test.assert_equal(ip.src, local_ip, "IP source address")
497 test.assert_equal(ip.dst, remote_ip, "IP destination address")
500 def verify_udp(test, packet):
501 """ Verify correctness of UDP layer. """
503 test.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
504 test.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
508 def verify_event(test, event, expected_state):
509 """ Verify correctness of event values. """
511 test.logger.debug("BFD: Event: %s" % repr(e))
512 test.assert_equal(e.sw_if_index,
513 test.vpp_session.interface.sw_if_index,
514 "BFD interface index")
516 if test.vpp_session.af == AF_INET6:
518 test.assert_equal(e.is_ipv6, is_ipv6, "is_ipv6")
519 if test.vpp_session.af == AF_INET:
520 test.assert_equal(e.local_addr[:4], test.vpp_session.local_addr_n,
521 "Local IPv4 address")
522 test.assert_equal(e.peer_addr[:4], test.vpp_session.peer_addr_n,
525 test.assert_equal(e.local_addr, test.vpp_session.local_addr_n,
526 "Local IPv6 address")
527 test.assert_equal(e.peer_addr, test.vpp_session.peer_addr_n,
529 test.assert_equal(e.state, expected_state, BFDState)
532 def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None):
533 """ wait for BFD packet and verify its correctness
535 :param timeout: how long to wait
536 :param pcap_time_min: ignore packets with pcap timestamp lower than this
538 :returns: tuple (packet, time spent waiting for packet)
540 test.logger.info("BFD: Waiting for BFD packet")
541 deadline = time.time() + timeout
546 test.assert_in_range(counter, 0, 100, "number of packets ignored")
547 time_left = deadline - time.time()
549 raise CaptureTimeoutError("Packet did not arrive within timeout")
550 p = test.pg0.wait_for_packet(timeout=time_left)
551 test.logger.debug(ppp("BFD: Got packet:", p))
552 if pcap_time_min is not None and p.time < pcap_time_min:
553 test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
554 "pcap time min %s):" %
555 (p.time, pcap_time_min), p))
560 raise Exception(ppp("Unexpected or invalid BFD packet:", p))
562 raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
565 test.test_session.verify_bfd(p)
569 class BFD4TestCase(VppTestCase):
570 """Bidirectional Forwarding Detection (BFD)"""
573 vpp_clock_offset = None
579 super(BFD4TestCase, cls).setUpClass()
581 cls.create_pg_interfaces([0])
583 cls.pg0.configure_ipv4_neighbors()
585 cls.pg0.resolve_arp()
588 super(BFD4TestCase, cls).tearDownClass()
592 super(BFD4TestCase, self).setUp()
593 self.factory = AuthKeyFactory()
594 self.vapi.want_bfd_events()
595 self.pg0.enable_capture()
597 self.vpp_session = VppBFDUDPSession(self, self.pg0,
599 self.vpp_session.add_vpp_config()
600 self.vpp_session.admin_up()
601 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
603 self.vapi.want_bfd_events(enable_disable=0)
607 if not self.vpp_dead:
608 self.vapi.want_bfd_events(enable_disable=0)
609 self.vapi.collect_events() # clear the event queue
610 super(BFD4TestCase, self).tearDown()
612 def test_session_up(self):
613 """ bring BFD session up """
616 def test_session_down(self):
617 """ bring BFD session down """
619 bfd_session_down(self)
621 def test_hold_up(self):
622 """ hold BFD session up """
624 for dummy in range(self.test_session.detect_mult * 2):
625 wait_for_bfd_packet(self)
626 self.test_session.send_packet()
627 self.assert_equal(len(self.vapi.collect_events()), 0,
628 "number of bfd events")
630 def test_slow_timer(self):
631 """ verify slow periodic control frames while session down """
633 self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
634 prev_packet = wait_for_bfd_packet(self, 2)
635 for dummy in range(packet_count):
636 next_packet = wait_for_bfd_packet(self, 2)
637 time_diff = next_packet.time - prev_packet.time
638 # spec says the range should be <0.75, 1>, allow extra 0.05 margin
639 # to work around timing issues
640 self.assert_in_range(
641 time_diff, 0.70, 1.05, "time between slow packets")
642 prev_packet = next_packet
644 def test_zero_remote_min_rx(self):
645 """ no packets when zero remote required min rx interval """
647 self.test_session.update(required_min_rx=0)
648 self.test_session.send_packet()
649 cap = 2 * self.vpp_session.desired_min_tx *\
650 self.test_session.detect_mult
651 time_mark = time.time()
653 # busy wait here, trying to collect a packet or event, vpp is not
654 # allowed to send packets and the session will timeout first - so the
655 # Up->Down event must arrive before any packets do
656 while time.time() < time_mark + cap / USEC_IN_SEC:
658 p = wait_for_bfd_packet(
660 pcap_time_min=time_mark - self.vpp_clock_offset)
661 self.logger.error(ppp("Received unexpected packet:", p))
663 except CaptureTimeoutError:
665 events = self.vapi.collect_events()
667 verify_event(self, events[0], BFDState.down)
669 self.assert_equal(count, 0, "number of packets received")
671 def test_conn_down(self):
672 """ verify session goes down after inactivity """
674 for dummy in range(self.test_session.detect_mult):
675 wait_for_bfd_packet(self)
676 self.assert_equal(len(self.vapi.collect_events()), 0,
677 "number of bfd events")
678 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
679 verify_event(self, e, expected_state=BFDState.down)
681 def test_large_required_min_rx(self):
682 """ large remote required min rx interval """
684 p = wait_for_bfd_packet(self)
686 self.test_session.update(required_min_rx=interval)
687 self.test_session.send_packet()
688 time_mark = time.time()
690 # busy wait here, trying to collect a packet or event, vpp is not
691 # allowed to send packets and the session will timeout first - so the
692 # Up->Down event must arrive before any packets do
693 while time.time() < time_mark + interval / USEC_IN_SEC:
695 p = wait_for_bfd_packet(self, timeout=0)
# if vpp managed to send a packet before we did the session update,
# then that's fine, ignore it
698 if p.time < time_mark - self.vpp_clock_offset:
700 self.logger.error(ppp("Received unexpected packet:", p))
702 except CaptureTimeoutError:
704 events = self.vapi.collect_events()
706 verify_event(self, events[0], BFDState.down)
708 self.assert_equal(count, 0, "number of packets received")
710 def test_immediate_remote_min_rx_reduction(self):
711 """ immediately honor remote required min rx reduction """
712 self.vpp_session.remove_vpp_config()
713 self.vpp_session = VppBFDUDPSession(
714 self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
715 self.pg0.enable_capture()
716 self.vpp_session.add_vpp_config()
717 self.test_session.update(desired_min_tx=1000000,
718 required_min_rx=1000000)
720 reference_packet = wait_for_bfd_packet(self)
721 time_mark = time.time()
723 self.test_session.update(required_min_rx=interval)
724 self.test_session.send_packet()
725 extra_time = time.time() - time_mark
726 p = wait_for_bfd_packet(self)
727 # first packet is allowed to be late by time we spent doing the update
728 # calculated in extra_time
729 self.assert_in_range(p.time - reference_packet.time,
730 .95 * 0.75 * interval / USEC_IN_SEC,
731 1.05 * interval / USEC_IN_SEC + extra_time,
732 "time between BFD packets")
734 for dummy in range(3):
735 p = wait_for_bfd_packet(self)
736 diff = p.time - reference_packet.time
737 self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
738 1.05 * interval / USEC_IN_SEC,
739 "time between BFD packets")
742 def test_modify_req_min_rx_double(self):
743 """ modify session - double required min rx """
745 p = wait_for_bfd_packet(self)
746 self.test_session.update(desired_min_tx=10000,
747 required_min_rx=10000)
748 self.test_session.send_packet()
749 # double required min rx
750 self.vpp_session.modify_parameters(
751 required_min_rx=2 * self.vpp_session.required_min_rx)
752 p = wait_for_bfd_packet(
753 self, pcap_time_min=time.time() - self.vpp_clock_offset)
754 # poll bit needs to be set
755 self.assertIn("P", p.sprintf("%BFD.flags%"),
756 "Poll bit not set in BFD packet")
757 # finish poll sequence with final packet
758 final = self.test_session.create_packet()
759 final[BFD].flags = "F"
760 timeout = self.test_session.detect_mult * \
761 max(self.test_session.desired_min_tx,
762 self.vpp_session.required_min_rx) / USEC_IN_SEC
763 self.test_session.send_packet(final)
764 time_mark = time.time()
765 e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
766 verify_event(self, e, expected_state=BFDState.down)
767 time_to_event = time.time() - time_mark
768 self.assert_in_range(time_to_event, .9 * timeout,
769 1.1 * timeout, "session timeout")
771 def test_modify_req_min_rx_halve(self):
772 """ modify session - halve required min rx """
773 self.vpp_session.modify_parameters(
774 required_min_rx=2 * self.vpp_session.required_min_rx)
776 p = wait_for_bfd_packet(self)
777 self.test_session.update(desired_min_tx=10000,
778 required_min_rx=10000)
779 self.test_session.send_packet()
780 p = wait_for_bfd_packet(
781 self, pcap_time_min=time.time() - self.vpp_clock_offset)
782 # halve required min rx
783 old_required_min_rx = self.vpp_session.required_min_rx
784 self.vpp_session.modify_parameters(
785 required_min_rx=0.5 * self.vpp_session.required_min_rx)
786 # now we wait 0.8*3*old-req-min-rx and the session should still be up
787 self.sleep(0.8 * self.vpp_session.detect_mult *
788 old_required_min_rx / USEC_IN_SEC)
789 self.assert_equal(len(self.vapi.collect_events()), 0,
790 "number of bfd events")
791 p = wait_for_bfd_packet(self)
792 # poll bit needs to be set
793 self.assertIn("P", p.sprintf("%BFD.flags%"),
794 "Poll bit not set in BFD packet")
795 # finish poll sequence with final packet
796 final = self.test_session.create_packet()
797 final[BFD].flags = "F"
798 self.test_session.send_packet(final)
799 # now the session should time out under new conditions
801 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
803 detection_time = self.vpp_session.detect_mult *\
804 self.vpp_session.required_min_rx / USEC_IN_SEC
805 self.assert_in_range(after - before,
806 0.9 * detection_time,
807 1.1 * detection_time,
808 "time before bfd session goes down")
809 verify_event(self, e, expected_state=BFDState.down)
811 def test_modify_des_min_tx(self):
812 """ modify desired min tx interval """
815 def test_modify_detect_mult(self):
816 """ modify detect multiplier """
818 p = wait_for_bfd_packet(self)
819 self.vpp_session.modify_parameters(detect_mult=1)
820 p = wait_for_bfd_packet(
821 self, pcap_time_min=time.time() - self.vpp_clock_offset)
822 self.assert_equal(self.vpp_session.detect_mult,
825 # poll bit must not be set
826 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
827 "Poll bit not set in BFD packet")
828 self.vpp_session.modify_parameters(detect_mult=10)
829 p = wait_for_bfd_packet(
830 self, pcap_time_min=time.time() - self.vpp_clock_offset)
831 self.assert_equal(self.vpp_session.detect_mult,
834 # poll bit must not be set
835 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
836 "Poll bit not set in BFD packet")
838 def test_no_periodic_if_remote_demand(self):
839 """ no periodic frames outside poll sequence if remote demand set """
841 demand = self.test_session.create_packet()
842 demand[BFD].flags = "D"
843 self.test_session.send_packet(demand)
844 transmit_time = 0.9 \
845 * max(self.vpp_session.required_min_rx,
846 self.test_session.desired_min_tx) \
849 for dummy in range(self.test_session.detect_mult * 2):
850 time.sleep(transmit_time)
851 self.test_session.send_packet(demand)
853 p = wait_for_bfd_packet(self, timeout=0)
854 self.logger.error(ppp("Received unexpected packet:", p))
856 except CaptureTimeoutError:
858 events = self.vapi.collect_events()
860 self.logger.error("Received unexpected event: %s", e)
861 self.assert_equal(count, 0, "number of packets received")
862 self.assert_equal(len(events), 0, "number of events received")
864 def test_echo_looped_back(self):
865 """ echo packets looped back """
866 # don't need a session in this case..
867 self.vpp_session.remove_vpp_config()
868 self.pg0.enable_capture()
869 echo_packet_count = 10
870 # random source port low enough to increment a few times..
871 udp_sport_tx = randint(1, 50000)
872 udp_sport_rx = udp_sport_tx
873 echo_packet = (Ether(src=self.pg0.remote_mac,
874 dst=self.pg0.local_mac) /
875 IP(src=self.pg0.remote_ip4,
876 dst=self.pg0.local_ip4) /
877 UDP(dport=BFD.udp_dport_echo) /
878 Raw("this should be looped back"))
879 for dummy in range(echo_packet_count):
880 self.sleep(.01, "delay between echo packets")
881 echo_packet[UDP].sport = udp_sport_tx
883 self.logger.debug(ppp("Sending packet:", echo_packet))
884 self.pg0.add_stream(echo_packet)
886 for dummy in range(echo_packet_count):
887 p = self.pg0.wait_for_packet(1)
888 self.logger.debug(ppp("Got packet:", p))
890 self.assert_equal(self.pg0.remote_mac,
891 ether.dst, "Destination MAC")
892 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
894 self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
895 self.assert_equal(self.pg0.local_ip4, ip.src, "Destination IP")
897 self.assert_equal(udp.dport, BFD.udp_dport_echo,
898 "UDP destination port")
899 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
901 self.assertTrue(p.haslayer(Raw) and p[Raw] == echo_packet[Raw],
902 "Received packet is not the echo packet sent")
903 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
904 "ECHO packet identifier for test purposes)")
907 class BFD6TestCase(VppTestCase):
908 """Bidirectional Forwarding Detection (BFD) (IPv6) """
911 vpp_clock_offset = None
917 super(BFD6TestCase, cls).setUpClass()
919 cls.create_pg_interfaces([0])
921 cls.pg0.configure_ipv6_neighbors()
923 cls.pg0.resolve_ndp()
926 super(BFD6TestCase, cls).tearDownClass()
930 super(BFD6TestCase, self).setUp()
931 self.factory = AuthKeyFactory()
932 self.vapi.want_bfd_events()
933 self.pg0.enable_capture()
935 self.vpp_session = VppBFDUDPSession(self, self.pg0,
938 self.vpp_session.add_vpp_config()
939 self.vpp_session.admin_up()
940 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
941 self.logger.debug(self.vapi.cli("show adj nbr"))
943 self.vapi.want_bfd_events(enable_disable=0)
947 if not self.vpp_dead:
948 self.vapi.want_bfd_events(enable_disable=0)
949 self.vapi.collect_events() # clear the event queue
950 super(BFD6TestCase, self).tearDown()
952 def test_session_up(self):
953 """ bring BFD session up """
956 def test_hold_up(self):
957 """ hold BFD session up """
959 for dummy in range(self.test_session.detect_mult * 2):
960 wait_for_bfd_packet(self)
961 self.test_session.send_packet()
962 self.assert_equal(len(self.vapi.collect_events()), 0,
963 "number of bfd events")
964 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
966 def test_echo_looped_back(self):
967 """ echo packets looped back """
968 # don't need a session in this case..
969 self.vpp_session.remove_vpp_config()
970 self.pg0.enable_capture()
971 echo_packet_count = 10
972 # random source port low enough to increment a few times..
973 udp_sport_tx = randint(1, 50000)
974 udp_sport_rx = udp_sport_tx
975 echo_packet = (Ether(src=self.pg0.remote_mac,
976 dst=self.pg0.local_mac) /
977 IPv6(src=self.pg0.remote_ip6,
978 dst=self.pg0.local_ip6) /
979 UDP(dport=BFD.udp_dport_echo) /
980 Raw("this should be looped back"))
981 for dummy in range(echo_packet_count):
982 self.sleep(.01, "delay between echo packets")
983 echo_packet[UDP].sport = udp_sport_tx
985 self.logger.debug(ppp("Sending packet:", echo_packet))
986 self.pg0.add_stream(echo_packet)
988 for dummy in range(echo_packet_count):
989 p = self.pg0.wait_for_packet(1)
990 self.logger.debug(ppp("Got packet:", p))
992 self.assert_equal(self.pg0.remote_mac,
993 ether.dst, "Destination MAC")
994 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
996 self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
997 self.assert_equal(self.pg0.local_ip6, ip.src, "Destination IP")
999 self.assert_equal(udp.dport, BFD.udp_dport_echo,
1000 "UDP destination port")
1001 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1003 self.assertTrue(p.haslayer(Raw) and p[Raw] == echo_packet[Raw],
1004 "Received packet is not the echo packet sent")
1005 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1006 "ECHO packet identifier for test purposes)")
1009 class BFDSHA1TestCase(VppTestCase):
1010 """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1013 vpp_clock_offset = None
1018 def setUpClass(cls):
1019 super(BFDSHA1TestCase, cls).setUpClass()
1021 cls.create_pg_interfaces([0])
1022 cls.pg0.config_ip4()
1024 cls.pg0.resolve_arp()
1027 super(BFDSHA1TestCase, cls).tearDownClass()
1031 super(BFDSHA1TestCase, self).setUp()
1032 self.factory = AuthKeyFactory()
1033 self.vapi.want_bfd_events()
1034 self.pg0.enable_capture()
1037 if not self.vpp_dead:
1038 self.vapi.want_bfd_events(enable_disable=0)
1039 self.vapi.collect_events() # clear the event queue
1040 super(BFDSHA1TestCase, self).tearDown()
1042 def test_session_up(self):
1043 """ bring BFD session up """
1044 key = self.factory.create_random_key(self)
1045 key.add_vpp_config()
1046 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1047 self.pg0.remote_ip4,
1049 self.vpp_session.add_vpp_config()
1050 self.vpp_session.admin_up()
1051 self.test_session = BFDTestSession(
1052 self, self.pg0, AF_INET, sha1_key=key,
1053 bfd_key_id=self.vpp_session.bfd_key_id)
1054 bfd_session_up(self)
1056 def test_hold_up(self):
1057 """ hold BFD session up """
1058 key = self.factory.create_random_key(self)
1059 key.add_vpp_config()
1060 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1061 self.pg0.remote_ip4,
1063 self.vpp_session.add_vpp_config()
1064 self.vpp_session.admin_up()
1065 self.test_session = BFDTestSession(
1066 self, self.pg0, AF_INET, sha1_key=key,
1067 bfd_key_id=self.vpp_session.bfd_key_id)
1068 bfd_session_up(self)
1069 for dummy in range(self.test_session.detect_mult * 2):
1070 wait_for_bfd_packet(self)
1071 self.test_session.send_packet()
1072 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
def test_hold_up_meticulous(self):
    """ hold BFD session up - meticulous auth """
    meticulous_key = self.factory.create_random_key(
        self, auth_type=BFDAuthType.meticulous_keyed_sha1)
    meticulous_key.add_vpp_config()
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=meticulous_key)
    self.vpp_session.add_vpp_config()
    self.vpp_session.admin_up()
    # start a few steps below the 32-bit maximum so that the sequence
    # number wraps while the session is being held up
    initial_seq_number = 0xFFFFFFFF - 4
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=meticulous_key,
        bfd_key_id=self.vpp_session.bfd_key_id,
        our_seq_number=initial_seq_number)
    bfd_session_up(self)
    for _ in range(30):
        wait_for_bfd_packet(self)
        # the sequence number is bumped explicitly before every packet
        self.test_session.inc_seq_num()
        self.test_session.send_packet()
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
def test_send_bad_seq_number(self):
    """ session is not kept alive by msgs with bad seq numbers"""
    meticulous_key = self.factory.create_random_key(
        self, auth_type=BFDAuthType.meticulous_keyed_sha1)
    meticulous_key.add_vpp_config()
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=meticulous_key)
    self.vpp_session.add_vpp_config()
    self.vpp_session.admin_up()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=meticulous_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)
    # detect_mult * required_min_rx, scaled down by USEC_IN_SEC
    detection_time = (self.vpp_session.detect_mult *
                      self.vpp_session.required_min_rx / USEC_IN_SEC)
    deadline = time.time() + detection_time
    # until the deadline passes no event may be reported; packets are sent
    # without ever calling inc_seq_num()
    while time.time() < deadline:
        pending = self.vapi.collect_events()
        self.assert_equal(len(pending), 0, "number of bfd events")
        wait_for_bfd_packet(self)
        self.test_session.send_packet()
    # one more exchange after the deadline
    wait_for_bfd_packet(self)
    self.test_session.send_packet()
    events = self.vapi.collect_events()
    # exactly one event, reporting the down state, is expected because the
    # sequence number was never incremented
    self.assert_equal(len(events), 1, "number of bfd events")
    verify_event(self, events[0], expected_state=BFDState.down)
def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
                                   legitimate_test_session,
                                   rogue_test_session,
                                   rogue_bfd_values=None):
    """ execute a rogue session interaction scenario

    1. create vpp session, add config
    2. bring the legitimate session up
    3. copy the bfd values from legitimate session to rogue session
    4. apply rogue_bfd_values to rogue session
    5. set rogue session state to down
    6. send message to take the session down from the rogue session
    7. assert that the legitimate session is unaffected

    :param vpp_bfd_udp_session: VPP-side session; stored as
        self.vpp_session, then configured and set admin-up
    :param legitimate_test_session: test session stored as
        self.test_session and used to bring the VPP session up
    :param rogue_test_session: test session which receives a copy of the
        legitimate session's values and sends the "down" packet
    :param rogue_bfd_values: optional dict of keyword overrides passed to
        rogue_test_session.update() after the copy
    """
    # NOTE(review): the rogue_test_session parameter line is not visible in
    # the reviewed excerpt; its name and position are taken from the body
    # below and from the positional call in test_mismatched_auth_type.
    self.vpp_session = vpp_bfd_udp_session
    self.vpp_session.add_vpp_config()
    self.vpp_session.admin_up()
    self.test_session = legitimate_test_session
    # bring vpp session up
    bfd_session_up(self)
    # clone the legitimate session's values into the rogue session
    cloned_fields = (
        "my_discriminator",
        "your_discriminator",
        "desired_min_tx",
        "required_min_rx",
        "detect_mult",
        "diag",
        "state",
        "auth_type",
    )
    cloned_values = {}
    for field_name in cloned_fields:
        cloned_values[field_name] = getattr(self.test_session, field_name)
    rogue_test_session.update(**cloned_values)
    if rogue_bfd_values:
        rogue_test_session.update(**rogue_bfd_values)
    # send packet from rogue session, claiming the down state
    rogue_test_session.update(state=BFDState.down)
    rogue_test_session.send_packet()
    wait_for_bfd_packet(self)
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
def test_mismatch_auth(self):
    """ session is not brought down by unauthenticated msg """
    sha1_key = self.factory.create_random_key(self)
    sha1_key.add_vpp_config()
    vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=sha1_key)
    legitimate = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=sha1_key,
        bfd_key_id=vpp_session.bfd_key_id)
    # the rogue session is created without any key
    rogue = BFDTestSession(self, self.pg0, AF_INET)
    # NOTE(review): the last argument line of this call is not visible in
    # the reviewed excerpt; the rogue session created above is the only
    # candidate for the scenario's third parameter.
    self.execute_rogue_session_scenario(vpp_session, legitimate, rogue)
1176 def test_mismatch_bfd_key_id(self):
1177 """ session is not brought down by msg with non-existent key-id """
1178 key = self.factory.create_random_key(self)
1179 key.add_vpp_config()
1180 vpp_session = VppBFDUDPSession(
1181 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1182 # pick a different random bfd key id
1184 while x == vpp_session.bfd_key_id:
1186 legitimate_test_session = BFDTestSession(
1187 self, self.pg0, AF_INET, sha1_key=key,
1188 bfd_key_id=vpp_session.bfd_key_id)
1189 rogue_test_session = BFDTestSession(
1190 self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
1191 self.execute_rogue_session_scenario(vpp_session,
1192 legitimate_test_session,
def test_mismatched_auth_type(self):
    """ session is not brought down by msg with wrong auth type """
    sha1_key = self.factory.create_random_key(self)
    sha1_key.add_vpp_config()
    vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=sha1_key)
    legitimate = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=sha1_key,
        bfd_key_id=vpp_session.bfd_key_id)
    # the rogue session is built with the same key and key id; only the
    # auth type override below distinguishes it
    rogue = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=sha1_key,
        bfd_key_id=vpp_session.bfd_key_id)
    wrong_auth_type = {'auth_type': BFDAuthType.keyed_md5}
    self.execute_rogue_session_scenario(
        vpp_session, legitimate, rogue, wrong_auth_type)
def test_restart(self):
    """ simulate remote peer restart and resynchronization """
    key = self.factory.create_random_key(
        self, BFDAuthType.meticulous_keyed_sha1)
    key.add_vpp_config()
    self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                        self.pg0.remote_ip4, sha1_key=key)
    self.vpp_session.add_vpp_config()
    self.vpp_session.admin_up()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=key,
        bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
    bfd_session_up(self)
    # don't send any packets for 2*detection_time
    detection_time = self.vpp_session.detect_mult *\
        self.vpp_session.required_min_rx / USEC_IN_SEC
    # Sleep for twice the detection time, as the comment above intends.
    # Sleeping for exactly one detection time races with the expiry: the
    # "down" event may not have been delivered yet when the events are
    # collected below.
    self.sleep(2 * detection_time, "simulating peer restart")
    events = self.vapi.collect_events()
    self.assert_equal(len(events), 1, "number of bfd events")
    verify_event(self, events[0], expected_state=BFDState.down)
    self.test_session.update(state=BFDState.down)
    # reset sequence number
    self.test_session.our_seq_number = 0
    self.test_session.vpp_seq_number = None
    # now throw away any pending packets
    self.pg0.enable_capture()
    # the session must come up again from the reset state
    bfd_session_up(self)
1240 class BFDAuthOnOffTestCase(VppTestCase):
1241 """Bidirectional Forwarding Detection (BFD) (changing auth) """
1248 def setUpClass(cls):
1249 super(BFDAuthOnOffTestCase, cls).setUpClass()
1251 cls.create_pg_interfaces([0])
1252 cls.pg0.config_ip4()
1254 cls.pg0.resolve_arp()
1257 super(BFDAuthOnOffTestCase, cls).tearDownClass()
1261 super(BFDAuthOnOffTestCase, self).setUp()
1262 self.factory = AuthKeyFactory()
1263 self.vapi.want_bfd_events()
1264 self.pg0.enable_capture()
1267 if not self.vpp_dead:
1268 self.vapi.want_bfd_events(enable_disable=0)
1269 self.vapi.collect_events() # clear the event queue
1270 super(BFDAuthOnOffTestCase, self).tearDown()
def test_auth_on_immediate(self):
    """ turn auth on without disturbing session state (immediate) """
    def exchange_while_up():
        # each packet returned by wait_for_bfd_packet must carry the up
        # state before the test session answers
        for _ in range(self.test_session.detect_mult * 2):
            pkt = wait_for_bfd_packet(self)
            self.assert_equal(pkt[BFD].state, BFDState.up, BFDState)
            self.test_session.send_packet()

    sha1_key = self.factory.create_random_key(self)
    sha1_key.add_vpp_config()
    # both sides start without authentication
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4)
    self.vpp_session.add_vpp_config()
    self.vpp_session.admin_up()
    self.test_session = BFDTestSession(self, self.pg0, AF_INET)
    bfd_session_up(self)
    exchange_while_up()
    # activate the key on the VPP session and mirror it on the test session
    self.vpp_session.activate_auth(sha1_key)
    self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
    self.test_session.sha1_key = sha1_key
    exchange_while_up()
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    reported = self.vapi.collect_events()
    self.assert_equal(len(reported), 0, "number of bfd events")
def test_auth_off_immediate(self):
    """ turn auth off without disturbing session state (immediate) """
    def exchange_while_up():
        # each packet returned by wait_for_bfd_packet must carry the up
        # state; the sequence number is bumped before every reply
        for _ in range(self.test_session.detect_mult * 2):
            pkt = wait_for_bfd_packet(self)
            self.assert_equal(pkt[BFD].state, BFDState.up, BFDState)
            self.test_session.inc_seq_num()
            self.test_session.send_packet()

    sha1_key = self.factory.create_random_key(self)
    sha1_key.add_vpp_config()
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=sha1_key)
    self.vpp_session.add_vpp_config()
    self.vpp_session.admin_up()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=sha1_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)
    exchange_while_up()
    # drop authentication on the VPP session and on the test session
    self.vpp_session.deactivate_auth()
    self.test_session.bfd_key_id = None
    self.test_session.sha1_key = None
    exchange_while_up()
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    reported = self.vapi.collect_events()
    self.assert_equal(len(reported), 0, "number of bfd events")
def test_auth_change_key_immediate(self):
    """ change auth key without disturbing session state (immediate) """
    def exchange_while_up():
        # each packet returned by wait_for_bfd_packet must carry the up
        # state before the test session answers
        for _ in range(self.test_session.detect_mult * 2):
            pkt = wait_for_bfd_packet(self)
            self.assert_equal(pkt[BFD].state, BFDState.up, BFDState)
            self.test_session.send_packet()

    old_key = self.factory.create_random_key(self)
    old_key.add_vpp_config()
    new_key = self.factory.create_random_key(self)
    new_key.add_vpp_config()
    # the session starts out authenticated with the first key
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=old_key)
    self.vpp_session.add_vpp_config()
    self.vpp_session.admin_up()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=old_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)
    exchange_while_up()
    # switch both sides over to the second key
    self.vpp_session.activate_auth(new_key)
    self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
    self.test_session.sha1_key = new_key
    exchange_while_up()
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    reported = self.vapi.collect_events()
    self.assert_equal(len(reported), 0, "number of bfd events")
def test_auth_on_delayed(self):
    """ turn auth on without disturbing session state (delayed) """
    def exchange_while_up():
        # each packet returned by wait_for_bfd_packet must carry the up
        # state before the test session answers
        for _ in range(self.test_session.detect_mult * 2):
            pkt = wait_for_bfd_packet(self)
            self.assert_equal(pkt[BFD].state, BFDState.up, BFDState)
            self.test_session.send_packet()

    sha1_key = self.factory.create_random_key(self)
    sha1_key.add_vpp_config()
    # both sides start without authentication
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4)
    self.vpp_session.add_vpp_config()
    self.vpp_session.admin_up()
    self.test_session = BFDTestSession(self, self.pg0, AF_INET)
    bfd_session_up(self)
    # initial exchange: received packets are not inspected here
    for _ in range(self.test_session.detect_mult * 2):
        wait_for_bfd_packet(self)
        self.test_session.send_packet()
    # request the delayed switch on the VPP session; the test session keeps
    # sending unauthenticated packets for one more round
    self.vpp_session.activate_auth(sha1_key, delayed=True)
    exchange_while_up()
    # now the test session starts using the key as well
    self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
    self.test_session.sha1_key = sha1_key
    self.test_session.send_packet()
    exchange_while_up()
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    reported = self.vapi.collect_events()
    self.assert_equal(len(reported), 0, "number of bfd events")
def test_auth_off_delayed(self):
    """ turn auth off without disturbing session state (delayed) """
    def exchange_while_up():
        # each packet returned by wait_for_bfd_packet must carry the up
        # state before the test session answers
        for _ in range(self.test_session.detect_mult * 2):
            pkt = wait_for_bfd_packet(self)
            self.assert_equal(pkt[BFD].state, BFDState.up, BFDState)
            self.test_session.send_packet()

    sha1_key = self.factory.create_random_key(self)
    sha1_key.add_vpp_config()
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=sha1_key)
    self.vpp_session.add_vpp_config()
    self.vpp_session.admin_up()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=sha1_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)
    exchange_while_up()
    # request the delayed switch-off on the VPP session; the test session
    # keeps sending authenticated packets for one more round
    self.vpp_session.deactivate_auth(delayed=True)
    exchange_while_up()
    # now the test session stops using the key as well
    self.test_session.bfd_key_id = None
    self.test_session.sha1_key = None
    self.test_session.send_packet()
    exchange_while_up()
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    reported = self.vapi.collect_events()
    self.assert_equal(len(reported), 0, "number of bfd events")
def test_auth_change_key_delayed(self):
    """ change auth key without disturbing session state (delayed) """
    def exchange_while_up():
        # each packet returned by wait_for_bfd_packet must carry the up
        # state before the test session answers
        for _ in range(self.test_session.detect_mult * 2):
            pkt = wait_for_bfd_packet(self)
            self.assert_equal(pkt[BFD].state, BFDState.up, BFDState)
            self.test_session.send_packet()

    old_key = self.factory.create_random_key(self)
    old_key.add_vpp_config()
    new_key = self.factory.create_random_key(self)
    new_key.add_vpp_config()
    # the session starts out authenticated with the first key
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=old_key)
    self.vpp_session.add_vpp_config()
    self.vpp_session.admin_up()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=old_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)
    exchange_while_up()
    # request the delayed key change on the VPP session; the test session
    # keeps using the first key for one more round
    self.vpp_session.activate_auth(new_key, delayed=True)
    exchange_while_up()
    # now the test session switches to the second key as well
    self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
    self.test_session.sha1_key = new_key
    self.test_session.send_packet()
    exchange_while_up()
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    reported = self.vapi.collect_events()
    self.assert_equal(len(reported), 0, "number of bfd events")
# when executed as a script, hand the module's tests to unittest using the
# VppTestRunner imported from the framework module
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)