3 from __future__ import division
8 from random import randint
10 from framework import *
16 class AuthKeyFactory(object):
17 """Factory class for creating auth keys with unique conf key ID"""
20 self._conf_key_ids = {}
def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
    """ Create an auth key with random content and an unused conf key ID

    :param test: test case object, passed through to VppBFDAuthKey
    :param auth_type: authentication type, passed through to VppBFDAuthKey

    :returns: newly created VppBFDAuthKey object
    """
    # keep drawing until we get an ID this factory has not handed out yet
    while True:
        conf_key_id = randint(0, 0xFFFFFFFF)
        if conf_key_id not in self._conf_key_ids:
            break
    self._conf_key_ids[conf_key_id] = 1
    # key material consists of 1 to 20 random byte values
    key_length = randint(1, 20)
    key_bytes = bytearray([randint(0, 255) for j in range(key_length)])
    return VppBFDAuthKey(test=test, auth_type=auth_type,
                         conf_key_id=conf_key_id, key=str(key_bytes))
32 class BFDAPITestCase(VppTestCase):
33 """Bidirectional Forwarding Detection (BFD) - API"""
37 super(BFDAPITestCase, cls).setUpClass()
40 cls.create_pg_interfaces(range(2))
41 for i in cls.pg_interfaces:
47 super(BFDAPITestCase, cls).tearDownClass()
51 super(BFDAPITestCase, self).setUp()
52 self.factory = AuthKeyFactory()
def test_add_bfd(self):
    """ create a BFD session """
    session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    # two add/remove rounds, so re-adding a removed session is exercised too
    for _ in range(2):
        session.add_vpp_config()
        self.logger.debug("Session state is %s" % str(session.state))
        session.remove_vpp_config()
64 def test_double_add(self):
65 """ create the same BFD session twice (negative case) """
66 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
67 session.add_vpp_config()
69 with self.vapi.expect_negative_api_retval():
70 session.add_vpp_config()
72 session.remove_vpp_config()
def test_add_bfd6(self):
    """ create IPv6 BFD session """
    peer_address = self.pg0.remote_ip6
    session = VppBFDUDPSession(self, self.pg0, peer_address, af=AF_INET6)
    # two add/remove rounds, so re-adding a removed session is exercised too
    for _ in range(2):
        session.add_vpp_config()
        self.logger.debug("Session state is %s" % str(session.state))
        session.remove_vpp_config()
85 def test_mod_bfd(self):
86 """ modify BFD session parameters """
87 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
89 required_min_rx=10000,
91 session.add_vpp_config()
92 e = session.get_bfd_udp_session_dump_entry()
93 self.assert_equal(session.desired_min_tx,
95 "desired min transmit interval")
96 self.assert_equal(session.required_min_rx,
98 "required min receive interval")
99 self.assert_equal(session.detect_mult, e.detect_mult, "detect mult")
100 session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
101 required_min_rx=session.required_min_rx * 2,
102 detect_mult=session.detect_mult * 2)
103 e = session.get_bfd_udp_session_dump_entry()
104 self.assert_equal(session.desired_min_tx,
106 "desired min transmit interval")
107 self.assert_equal(session.required_min_rx,
109 "required min receive interval")
110 self.assert_equal(session.detect_mult, e.detect_mult, "detect mult")
112 def test_add_sha1_keys(self):
113 """ add SHA1 keys """
115 keys = [self.factory.create_random_key(
116 self) for i in range(0, key_count)]
118 self.assertFalse(key.query_vpp_config())
122 self.assertTrue(key.query_vpp_config())
124 indexes = range(key_count)
125 random.shuffle(indexes)
129 key.remove_vpp_config()
131 for j in range(key_count):
134 self.assertFalse(key.query_vpp_config())
136 self.assertTrue(key.query_vpp_config())
137 # should be removed now
139 self.assertFalse(key.query_vpp_config())
140 # add back and remove again
144 self.assertTrue(key.query_vpp_config())
146 key.remove_vpp_config()
148 self.assertFalse(key.query_vpp_config())
150 def test_add_bfd_sha1(self):
151 """ create a BFD session (SHA1) """
152 key = self.factory.create_random_key(self)
154 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
156 session.add_vpp_config()
157 self.logger.debug("Session state is %s" % str(session.state))
158 session.remove_vpp_config()
159 session.add_vpp_config()
160 self.logger.debug("Session state is %s" % str(session.state))
161 session.remove_vpp_config()
163 def test_double_add_sha1(self):
164 """ create the same BFD session twice (negative case) (SHA1) """
165 key = self.factory.create_random_key(self)
167 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
169 session.add_vpp_config()
170 with self.assertRaises(Exception):
171 session.add_vpp_config()
def test_add_authenticated_with_nonexistent_key(self):
    """ create BFD session using non-existent SHA1 (negative case) """
    # the key exists only on the test side, it is never added to vpp here
    unconfigured_key = self.factory.create_random_key(self)
    session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                               sha1_key=unconfigured_key)
    # adding a session which refers to such key is expected to fail
    with self.assertRaises(Exception):
        session.add_vpp_config()
181 def test_shared_sha1_key(self):
182 """ share single SHA1 key between multiple BFD sessions """
183 key = self.factory.create_random_key(self)
186 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
188 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
189 sha1_key=key, af=AF_INET6),
190 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
192 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
193 sha1_key=key, af=AF_INET6)]
198 e = key.get_bfd_auth_keys_dump_entry()
199 self.assert_equal(e.use_count, len(sessions) - removed,
200 "Use count for shared key")
201 s.remove_vpp_config()
203 e = key.get_bfd_auth_keys_dump_entry()
204 self.assert_equal(e.use_count, len(sessions) - removed,
205 "Use count for shared key")
207 def test_activate_auth(self):
208 """ activate SHA1 authentication """
209 key = self.factory.create_random_key(self)
211 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
212 session.add_vpp_config()
213 session.activate_auth(key)
215 def test_deactivate_auth(self):
216 """ deactivate SHA1 authentication """
217 key = self.factory.create_random_key(self)
219 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
220 session.add_vpp_config()
221 session.activate_auth(key)
222 session.deactivate_auth()
224 def test_change_key(self):
225 """ change SHA1 key """
226 key1 = self.factory.create_random_key(self)
227 key2 = self.factory.create_random_key(self)
228 while key2.conf_key_id == key1.conf_key_id:
229 key2 = self.factory.create_random_key(self)
230 key1.add_vpp_config()
231 key2.add_vpp_config()
232 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
234 session.add_vpp_config()
235 session.activate_auth(key2)
238 class BFDTestSession(object):
239 """ BFD session as seen from test framework side """
241 def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
242 bfd_key_id=None, our_seq_number=0xFFFFFFFF - 4):
245 self.sha1_key = sha1_key
246 self.bfd_key_id = bfd_key_id
247 self.interface = interface
248 self.udp_sport = 50000
249 self.our_seq_number = our_seq_number
250 self.vpp_seq_number = None
252 'my_discriminator': 0,
253 'desired_min_tx_interval': 100000,
254 'detect_mult': detect_mult,
255 'diag': BFDDiagCode.no_diagnostic,
258 def inc_seq_num(self):
259 if self.our_seq_number == 0xFFFFFFFF:
260 self.our_seq_number = 0
262 self.our_seq_number += 1
def update(self, **kwargs):
    """ Store the given keyword arguments in the BFD field values

    Each name/value pair is written to self.bfd_values, replacing the value
    stored under the same name, if there is one.
    """
    for name, value in kwargs.items():
        self.bfd_values[name] = value
267 def create_packet(self):
270 bfd.auth_type = self.sha1_key.auth_type
271 bfd.auth_len = BFD.sha1_auth_len
272 bfd.auth_key_id = self.bfd_key_id
273 bfd.auth_seq_num = self.our_seq_number
274 bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
277 if self.af == AF_INET6:
278 packet = (Ether(src=self.interface.remote_mac,
279 dst=self.interface.local_mac) /
280 IPv6(src=self.interface.remote_ip6,
281 dst=self.interface.local_ip6,
283 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
286 packet = (Ether(src=self.interface.remote_mac,
287 dst=self.interface.local_mac) /
288 IP(src=self.interface.remote_ip4,
289 dst=self.interface.local_ip4,
291 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
293 self.test.logger.debug("BFD: Creating packet")
294 for name, value in self.bfd_values.iteritems():
295 self.test.logger.debug("BFD: setting packet.%s=%s", name, value)
296 packet[BFD].setfieldval(name, value)
298 hash_material = str(packet[BFD])[:32] + self.sha1_key.key + \
299 "\0" * (20 - len(self.sha1_key.key))
300 self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
301 hashlib.sha1(hash_material).hexdigest())
302 packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
305 def send_packet(self, packet=None):
307 packet = self.create_packet()
308 self.test.logger.debug(ppp("Sending packet:", packet))
309 self.test.pg0.add_stream([packet])
312 def verify_sha1_auth(self, packet):
313 """ Verify correctness of authentication in BFD layer. """
315 self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
316 self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
318 self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
319 self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
320 if self.vpp_seq_number is None:
321 self.vpp_seq_number = bfd.auth_seq_num
322 self.test.logger.debug("Received initial sequence number: %s" %
325 recvd_seq_num = bfd.auth_seq_num
326 self.test.logger.debug("Received followup sequence number: %s" %
328 if self.vpp_seq_number < 0xffffffff:
329 if self.sha1_key.auth_type == \
330 BFDAuthType.meticulous_keyed_sha1:
331 self.test.assert_equal(recvd_seq_num,
332 self.vpp_seq_number + 1,
333 "BFD sequence number")
335 self.test.assert_in_range(recvd_seq_num,
337 self.vpp_seq_number + 1,
338 "BFD sequence number")
340 if self.sha1_key.auth_type == \
341 BFDAuthType.meticulous_keyed_sha1:
342 self.test.assert_equal(recvd_seq_num, 0,
343 "BFD sequence number")
345 self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
346 "BFD sequence number not one of "
347 "(%s, 0)" % self.vpp_seq_number)
348 self.vpp_seq_number = recvd_seq_num
349 # last 20 bytes represent the hash - so replace them with the key,
350 # pad the result with zeros and hash the result
351 hash_material = bfd.original[:-20] + self.sha1_key.key + \
352 "\0" * (20 - len(self.sha1_key.key))
353 expected_hash = hashlib.sha1(hash_material).hexdigest()
354 self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
355 expected_hash, "Auth key hash")
357 def verify_bfd(self, packet):
358 """ Verify correctness of BFD layer. """
360 self.test.assert_equal(bfd.version, 1, "BFD version")
361 self.test.assert_equal(bfd.your_discriminator,
362 self.bfd_values['my_discriminator'],
363 "BFD - your discriminator")
365 self.verify_sha1_auth(packet)
369 """Common code used by both IPv4 and IPv6 Test Cases"""
372 self.vapi.collect_events() # clear the event queue
373 if not self.vpp_dead:
374 self.vapi.want_bfd_events(enable_disable=0)
376 def bfd_session_up(self):
377 """ Bring BFD session up """
378 self.pg_enable_capture([self.pg0])
379 self.logger.info("BFD: Waiting for slow hello")
380 p, timeout = self.wait_for_bfd_packet(2)
381 self.logger.info("BFD: Sending Init")
382 self.test_session.update(my_discriminator=randint(0, 40000000),
383 your_discriminator=p[BFD].my_discriminator,
385 required_min_rx_interval=100000)
386 self.test_session.send_packet()
387 self.logger.info("BFD: Waiting for event")
388 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
389 self.verify_event(e, expected_state=BFDState.up)
390 self.logger.info("BFD: Session is Up")
391 self.test_session.update(state=BFDState.up)
392 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
def bfd_session_down(self):
    """ Take the BFD session down from the test side

    Asserts that the vpp session is up, sends a packet with state set to
    down and verifies that vpp reports the session going down.
    """
    log = self.logger.info
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    # tell vpp that the test side of the session is down
    self.test_session.update(state=BFDState.down)
    self.test_session.send_packet()
    log("BFD: Waiting for event")
    event = self.vapi.wait_for_event(1, "bfd_udp_session_details")
    self.verify_event(event, expected_state=BFDState.down)
    log("BFD: Session is Down")
    self.assert_equal(self.vpp_session.state, BFDState.down, BFDState)
405 def verify_ip(self, packet):
406 """ Verify correctness of IP layer. """
407 if self.vpp_session.af == AF_INET6:
409 local_ip = self.pg0.local_ip6
410 remote_ip = self.pg0.remote_ip6
411 self.assert_equal(ip.hlim, 255, "IPv6 hop limit")
414 local_ip = self.pg0.local_ip4
415 remote_ip = self.pg0.remote_ip4
416 self.assert_equal(ip.ttl, 255, "IPv4 TTL")
417 self.assert_equal(ip.src, local_ip, "IP source address")
418 self.assert_equal(ip.dst, remote_ip, "IP destination address")
420 def verify_udp(self, packet):
421 """ Verify correctness of UDP layer. """
423 self.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
424 self.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
427 def verify_event(self, event, expected_state):
428 """ Verify correctness of event values. """
430 self.logger.debug("BFD: Event: %s" % repr(e))
431 self.assert_equal(e.sw_if_index,
432 self.vpp_session.interface.sw_if_index,
433 "BFD interface index")
435 if self.vpp_session.af == AF_INET6:
437 self.assert_equal(e.is_ipv6, is_ipv6, "is_ipv6")
438 if self.vpp_session.af == AF_INET:
439 self.assert_equal(e.local_addr[:4], self.vpp_session.local_addr_n,
440 "Local IPv4 address")
441 self.assert_equal(e.peer_addr[:4], self.vpp_session.peer_addr_n,
444 self.assert_equal(e.local_addr, self.vpp_session.local_addr_n,
445 "Local IPv6 address")
446 self.assert_equal(e.peer_addr, self.vpp_session.peer_addr_n,
448 self.assert_equal(e.state, expected_state, BFDState)
450 def wait_for_bfd_packet(self, timeout=1):
451 """ wait for BFD packet
453 :param timeout: how long to wait max
455 :returns: tuple (packet, time spent waiting for packet)
457 self.logger.info("BFD: Waiting for BFD packet")
459 p = self.pg0.wait_for_packet(timeout=timeout)
461 self.logger.debug(ppp("BFD: Got packet:", p))
464 raise Exception(ppp("Unexpected or invalid BFD packet:", p))
466 raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
469 self.test_session.verify_bfd(p)
470 return p, after - before
473 class BFD4TestCase(VppTestCase, BFDCommonCode):
474 """Bidirectional Forwarding Detection (BFD)"""
478 super(BFD4TestCase, cls).setUpClass()
480 cls.create_pg_interfaces([0])
482 cls.pg0.configure_ipv4_neighbors()
484 cls.pg0.resolve_arp()
487 super(BFD4TestCase, cls).tearDownClass()
491 super(BFD4TestCase, self).setUp()
492 self.factory = AuthKeyFactory()
493 self.vapi.want_bfd_events()
495 self.vpp_session = VppBFDUDPSession(self, self.pg0,
497 self.vpp_session.add_vpp_config()
498 self.vpp_session.admin_up()
499 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
501 self.vapi.want_bfd_events(enable_disable=0)
505 BFDCommonCode.tearDown(self)
506 VppTestCase.tearDown(self)
def test_session_up(self):
    """ bring BFD session up """
    # the whole scenario, including its assertions, is in bfd_session_up()
    self.bfd_session_up()
def test_session_down(self):
    """ bring BFD session down """
    # the session has to be up first - bfd_session_down() asserts that
    self.bfd_session_up()
    self.bfd_session_down()
517 def test_hold_up(self):
518 """ hold BFD session up """
519 self.bfd_session_up()
521 self.wait_for_bfd_packet()
522 self.test_session.send_packet()
523 self.assert_equal(len(self.vapi.collect_events()), 0,
524 "number of bfd events")
526 def test_slow_timer(self):
527 """ verify slow periodic control frames while session down """
528 self.pg_enable_capture([self.pg0])
530 self.logger.info("BFD: Waiting for %d BFD packets" % expected_packets)
531 self.wait_for_bfd_packet(2)
532 for i in range(expected_packets):
534 self.wait_for_bfd_packet(2)
536 # spec says the range should be <0.75, 1>, allow extra 0.05 margin
537 # to work around timing issues
538 self.assert_in_range(
539 after - before, 0.70, 1.05, "time between slow packets")
542 def test_zero_remote_min_rx(self):
543 """ no packets when zero BFD RemoteMinRxInterval """
544 self.pg_enable_capture([self.pg0])
545 p, timeout = self.wait_for_bfd_packet(2)
546 self.test_session.update(my_discriminator=randint(0, 40000000),
547 your_discriminator=p[BFD].my_discriminator,
549 required_min_rx_interval=0)
550 self.test_session.send_packet()
551 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
552 self.verify_event(e, expected_state=BFDState.up)
554 cap = 2 * self.vpp_session.desired_min_tx *\
555 self.vpp_session.detect_mult
558 # busy wait here, trying to collect a packet or event, vpp is not
559 # allowed to send packets and the session will timeout first - so the
560 # Up->Down event must arrive before any packets do
561 while time.time() < now + cap / us_in_sec:
563 p, ttp = self.wait_for_bfd_packet(timeout=0)
564 self.logger.error(ppp("Received unexpected packet:", p))
568 events = self.vapi.collect_events()
570 self.verify_event(events[0], BFDState.down)
572 self.assert_equal(count, 0, "number of packets received")
def test_conn_down(self):
    """ verify session goes down after inactivity """
    self.bfd_session_up()
    # receive two packets from vpp without sending anything back - no event
    # may be reported while these are arriving
    for _ in range(2):
        self.wait_for_bfd_packet()
        pending = self.vapi.collect_events()
        self.assert_equal(len(pending), 0, "number of bfd events")
    # the next thing to arrive has to be the event reporting the down state
    event = self.vapi.wait_for_event(1, "bfd_udp_session_details")
    self.verify_event(event, expected_state=BFDState.down)
586 def test_large_required_min_rx(self):
587 """ large remote RequiredMinRxInterval """
588 self.bfd_session_up()
589 self.wait_for_bfd_packet()
591 self.test_session.update(required_min_rx_interval=interval)
592 self.test_session.send_packet()
595 # busy wait here, trying to collect a packet or event, vpp is not
596 # allowed to send packets and the session will timeout first - so the
597 # Up->Down event must arrive before any packets do
598 while time.time() < now + interval / us_in_sec:
600 p, ttp = self.wait_for_bfd_packet(timeout=0)
601 self.logger.error(ppp("Received unexpected packet:", p))
605 events = self.vapi.collect_events()
607 self.verify_event(events[0], BFDState.down)
609 self.assert_equal(count, 0, "number of packets received")
611 def test_immediate_remote_min_rx_reduce(self):
612 """ immediately honor remote min rx reduction """
613 self.vpp_session.remove_vpp_config()
614 self.vpp_session = VppBFDUDPSession(
615 self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
616 self.vpp_session.add_vpp_config()
617 self.test_session.update(desired_min_tx_interval=1000000,
618 required_min_rx_interval=1000000)
619 self.bfd_session_up()
620 self.wait_for_bfd_packet()
622 self.test_session.update(required_min_rx_interval=interval)
623 self.test_session.send_packet()
624 p, ttp = self.wait_for_bfd_packet()
625 # allow extra 10% to work around timing issues, first packet is special
626 self.assert_in_range(ttp, 0, 1.10 * interval / us_in_sec,
627 "time between BFD packets")
628 p, ttp = self.wait_for_bfd_packet()
629 self.assert_in_range(ttp, .9 * .75 * interval / us_in_sec,
630 1.10 * interval / us_in_sec,
631 "time between BFD packets")
632 p, ttp = self.wait_for_bfd_packet()
633 self.assert_in_range(ttp, .9 * .75 * interval / us_in_sec,
634 1.10 * interval / us_in_sec,
635 "time between BFD packets")
def test_modify_req_min_rx_double(self):
    """ modify session - double required min rx """
    self.bfd_session_up()
    self.wait_for_bfd_packet()
    self.test_session.update(desired_min_tx_interval=10000,
                             required_min_rx_interval=10000)
    self.test_session.send_packet()
    # first double required min rx
    doubled_min_rx = 2 * self.vpp_session.required_min_rx
    self.vpp_session.modify_parameters(required_min_rx=doubled_min_rx)
    reply, _ = self.wait_for_bfd_packet()
    # poll bit needs to be set
    flags = reply.sprintf("%BFD.flags%")
    self.assertIn("P", flags, "Poll bit not set in BFD packet")
    # finish poll sequence with final packet
    final = self.test_session.create_packet()
    final[BFD].flags = "F"
    self.test_session.send_packet(final)
    # now we can wait 0.9*3*req-min-rx and the session should still be up
    hold_time = 0.9 * self.vpp_session.detect_mult * \
        self.vpp_session.required_min_rx / us_in_sec
    self.sleep(hold_time)
    events = self.vapi.collect_events()
    self.assert_equal(len(events), 0, "number of bfd events")
661 def test_modify_req_min_rx_halve(self):
662 """ modify session - halve required min rx """
663 self.vpp_session.modify_parameters(
664 required_min_rx=2 * self.vpp_session.required_min_rx)
665 self.bfd_session_up()
666 self.wait_for_bfd_packet()
667 self.test_session.update(desired_min_tx_interval=10000,
668 required_min_rx_interval=10000)
669 self.test_session.send_packet()
670 p, ttp = self.wait_for_bfd_packet()
671 # halve required min rx
672 old_required_min_rx = self.vpp_session.required_min_rx
673 self.vpp_session.modify_parameters(
674 required_min_rx=0.5 * self.vpp_session.required_min_rx)
675 # now we wait 0.8*3*old-req-min-rx and the session should still be up
676 self.sleep(0.8 * self.vpp_session.detect_mult *
677 old_required_min_rx / us_in_sec)
678 self.assert_equal(len(self.vapi.collect_events()), 0,
679 "number of bfd events")
680 p, ttp = self.wait_for_bfd_packet()
681 # poll bit needs to be set
682 self.assertIn("P", p.sprintf("%BFD.flags%"),
683 "Poll bit not set in BFD packet")
684 # finish poll sequence with final packet
685 final = self.test_session.create_packet()
686 final[BFD].flags = "F"
687 self.test_session.send_packet(final)
688 # now the session should time out under new conditions
690 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
692 detection_time = self.vpp_session.detect_mult *\
693 self.vpp_session.required_min_rx / us_in_sec
694 self.assert_in_range(after - before,
695 0.9 * detection_time,
696 1.1 * detection_time,
697 "time before bfd session goes down")
698 self.verify_event(e, expected_state=BFDState.down)
700 def test_modify_des_min_tx(self):
701 """ modify desired min tx interval """
704 def test_modify_detect_mult(self):
705 """ modify detect multiplier """
706 self.bfd_session_up()
707 self.vpp_session.modify_parameters(detect_mult=1)
708 p, ttp = self.wait_for_bfd_packet()
709 self.assert_equal(self.vpp_session.detect_mult,
712 # poll bit must not be set
713 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
714 "Poll bit not set in BFD packet")
715 self.vpp_session.modify_parameters(detect_mult=10)
716 p, ttp = self.wait_for_bfd_packet()
717 self.assert_equal(self.vpp_session.detect_mult,
720 # poll bit must not be set
721 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
722 "Poll bit not set in BFD packet")
725 class BFD6TestCase(VppTestCase, BFDCommonCode):
726 """Bidirectional Forwarding Detection (BFD) (IPv6) """
730 super(BFD6TestCase, cls).setUpClass()
732 cls.create_pg_interfaces([0])
734 cls.pg0.configure_ipv6_neighbors()
736 cls.pg0.resolve_ndp()
739 super(BFD6TestCase, cls).tearDownClass()
743 super(BFD6TestCase, self).setUp()
744 self.factory = AuthKeyFactory()
745 self.vapi.want_bfd_events()
747 self.vpp_session = VppBFDUDPSession(self, self.pg0,
750 self.vpp_session.add_vpp_config()
751 self.vpp_session.admin_up()
752 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
753 self.logger.debug(self.vapi.cli("show adj nbr"))
755 self.vapi.want_bfd_events(enable_disable=0)
759 BFDCommonCode.tearDown(self)
760 VppTestCase.tearDown(self)
def test_session_up(self):
    """ bring BFD session up """
    # the whole scenario, including its assertions, is in bfd_session_up()
    self.bfd_session_up()
766 def test_hold_up(self):
767 """ hold BFD session up """
768 self.bfd_session_up()
770 self.wait_for_bfd_packet()
771 self.test_session.send_packet()
772 self.assert_equal(len(self.vapi.collect_events()), 0,
773 "number of bfd events")
774 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
777 class BFDSHA1TestCase(VppTestCase, BFDCommonCode):
778 """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
782 super(BFDSHA1TestCase, cls).setUpClass()
784 cls.create_pg_interfaces([0])
787 cls.pg0.resolve_arp()
790 super(BFDSHA1TestCase, cls).tearDownClass()
794 super(BFDSHA1TestCase, self).setUp()
795 self.factory = AuthKeyFactory()
796 self.vapi.want_bfd_events()
799 BFDCommonCode.tearDown(self)
800 VppTestCase.tearDown(self)
802 def test_session_up(self):
803 """ bring BFD session up """
804 key = self.factory.create_random_key(self)
806 self.vpp_session = VppBFDUDPSession(self, self.pg0,
809 self.vpp_session.add_vpp_config()
810 self.vpp_session.admin_up()
811 self.test_session = BFDTestSession(
812 self, self.pg0, AF_INET, sha1_key=key,
813 bfd_key_id=self.vpp_session.bfd_key_id)
814 self.bfd_session_up()
816 def test_hold_up(self):
817 """ hold BFD session up """
818 key = self.factory.create_random_key(self)
820 self.vpp_session = VppBFDUDPSession(self, self.pg0,
823 self.vpp_session.add_vpp_config()
824 self.vpp_session.admin_up()
825 self.test_session = BFDTestSession(
826 self, self.pg0, AF_INET, sha1_key=key,
827 bfd_key_id=self.vpp_session.bfd_key_id)
828 self.bfd_session_up()
830 self.wait_for_bfd_packet()
831 self.test_session.send_packet()
832 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
834 def test_hold_up_meticulous(self):
835 """ hold BFD session up - meticulous auth """
836 key = self.factory.create_random_key(
837 self, BFDAuthType.meticulous_keyed_sha1)
839 self.vpp_session = VppBFDUDPSession(self, self.pg0,
840 self.pg0.remote_ip4, sha1_key=key)
841 self.vpp_session.add_vpp_config()
842 self.vpp_session.admin_up()
843 self.test_session = BFDTestSession(
844 self, self.pg0, AF_INET, sha1_key=key,
845 bfd_key_id=self.vpp_session.bfd_key_id)
846 self.bfd_session_up()
848 self.wait_for_bfd_packet()
849 self.test_session.inc_seq_num()
850 self.test_session.send_packet()
851 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
853 def test_send_bad_seq_number(self):
854 """ session is not kept alive by msgs with bad seq numbers"""
855 key = self.factory.create_random_key(
856 self, BFDAuthType.meticulous_keyed_sha1)
858 self.vpp_session = VppBFDUDPSession(self, self.pg0,
859 self.pg0.remote_ip4, sha1_key=key)
860 self.vpp_session.add_vpp_config()
861 self.vpp_session.admin_up()
862 self.test_session = BFDTestSession(
863 self, self.pg0, AF_INET, sha1_key=key,
864 bfd_key_id=self.vpp_session.bfd_key_id)
865 self.bfd_session_up()
866 self.wait_for_bfd_packet()
867 self.test_session.send_packet()
868 self.assert_equal(len(self.vapi.collect_events()), 0,
869 "number of bfd events")
870 self.wait_for_bfd_packet()
871 self.test_session.send_packet()
872 self.assert_equal(len(self.vapi.collect_events()), 0,
873 "number of bfd events")
874 self.wait_for_bfd_packet()
875 self.test_session.send_packet()
876 self.wait_for_bfd_packet()
877 self.test_session.send_packet()
878 e = self.vapi.collect_events()
879 # session should be down now, because the sequence numbers weren't
881 self.assert_equal(len(e), 1, "number of bfd events")
882 self.verify_event(e[0], expected_state=BFDState.down)
884 def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
885 legitimate_test_session,
887 rogue_bfd_values=None):
888 """ execute a rogue session interaction scenario
890 1. create vpp session, add config
891 2. bring the legitimate session up
892 3. copy the bfd values from legitimate session to rogue session
893 4. apply rogue_bfd_values to rogue session
894 5. set rogue session state to down
895 6. send message to take the session down from the rogue session
896 7. assert that the legitimate session is unaffected
899 self.vpp_session = vpp_bfd_udp_session
900 self.vpp_session.add_vpp_config()
901 self.vpp_session.admin_up()
902 self.test_session = legitimate_test_session
903 # bring vpp session up
904 self.bfd_session_up()
905 # send packet from rogue session
906 rogue_test_session.bfd_values = self.test_session.bfd_values.copy()
908 rogue_test_session.update(**rogue_bfd_values)
909 rogue_test_session.update(state=BFDState.down)
910 rogue_test_session.send_packet()
911 self.wait_for_bfd_packet()
912 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
914 def test_mismatch_auth(self):
915 """ session is not brought down by unauthenticated msg """
916 key = self.factory.create_random_key(self)
918 vpp_session = VppBFDUDPSession(
919 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
920 legitimate_test_session = BFDTestSession(
921 self, self.pg0, AF_INET, sha1_key=key,
922 bfd_key_id=vpp_session.bfd_key_id)
923 rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
924 self.execute_rogue_session_scenario(vpp_session,
925 legitimate_test_session,
928 def test_mismatch_bfd_key_id(self):
929 """ session is not brought down by msg with non-existent key-id """
930 key = self.factory.create_random_key(self)
932 vpp_session = VppBFDUDPSession(
933 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
934 # pick a different random bfd key id
936 while x == vpp_session.bfd_key_id:
938 legitimate_test_session = BFDTestSession(
939 self, self.pg0, AF_INET, sha1_key=key,
940 bfd_key_id=vpp_session.bfd_key_id)
941 rogue_test_session = BFDTestSession(
942 self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
943 self.execute_rogue_session_scenario(vpp_session,
944 legitimate_test_session,
947 def test_mismatched_auth_type(self):
948 """ session is not brought down by msg with wrong auth type """
949 key = self.factory.create_random_key(self)
951 vpp_session = VppBFDUDPSession(
952 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
953 legitimate_test_session = BFDTestSession(
954 self, self.pg0, AF_INET, sha1_key=key,
955 bfd_key_id=vpp_session.bfd_key_id)
956 rogue_test_session = BFDTestSession(
957 self, self.pg0, AF_INET, sha1_key=key,
958 bfd_key_id=vpp_session.bfd_key_id)
959 self.execute_rogue_session_scenario(
960 vpp_session, legitimate_test_session, rogue_test_session,
961 {'auth_type': BFDAuthType.keyed_md5})
963 def test_restart(self):
964 """ simulate remote peer restart and resynchronization """
965 key = self.factory.create_random_key(
966 self, BFDAuthType.meticulous_keyed_sha1)
968 self.vpp_session = VppBFDUDPSession(self, self.pg0,
969 self.pg0.remote_ip4, sha1_key=key)
970 self.vpp_session.add_vpp_config()
971 self.vpp_session.admin_up()
972 self.test_session = BFDTestSession(
973 self, self.pg0, AF_INET, sha1_key=key,
974 bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
975 self.bfd_session_up()
976 # now we need to not respond for 2*detection_time (4 packets)
977 self.wait_for_bfd_packet()
978 self.assert_equal(len(self.vapi.collect_events()), 0,
979 "number of bfd events")
980 self.wait_for_bfd_packet()
981 self.assert_equal(len(self.vapi.collect_events()), 0,
982 "number of bfd events")
983 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
984 self.verify_event(e, expected_state=BFDState.down)
985 self.test_session.update(state=BFDState.down)
986 self.wait_for_bfd_packet()
987 self.assert_equal(len(self.vapi.collect_events()), 0,
988 "number of bfd events")
989 self.wait_for_bfd_packet()
990 self.assert_equal(len(self.vapi.collect_events()), 0,
991 "number of bfd events")
992 # reset sequence number
993 self.test_session.our_seq_number = 0
994 self.bfd_session_up()
997 class BFDAuthOnOffTestCase(VppTestCase, BFDCommonCode):
998 """Bidirectional Forwarding Detection (BFD) (changing auth) """
1001 def setUpClass(cls):
1002 super(BFDAuthOnOffTestCase, cls).setUpClass()
1004 cls.create_pg_interfaces([0])
1005 cls.pg0.config_ip4()
1007 cls.pg0.resolve_arp()
1010 super(BFDAuthOnOffTestCase, cls).tearDownClass()
1014 super(BFDAuthOnOffTestCase, self).setUp()
1015 self.factory = AuthKeyFactory()
1016 self.vapi.want_bfd_events()
1019 BFDCommonCode.tearDown(self)
1020 VppTestCase.tearDown(self)
def test_auth_on_immediate(self):
    """ turn auth on without disturbing session state (immediate) """
    # Register a random auth key with VPP; the session created below
    # starts out without any key attached.
    auth_key = self.factory.create_random_key(self)
    auth_key.add_vpp_config()

    # Create the VPP session and the test-side session, both without
    # authentication, and bring the session up.
    iface = self.pg0
    self.vpp_session = VppBFDUDPSession(self, iface, iface.remote_ip4)
    self.vpp_session.add_vpp_config()
    self.vpp_session.admin_up()
    self.test_session = BFDTestSession(self, iface, AF_INET)
    self.bfd_session_up()

    # One packet exchange before authentication is switched on.
    self.wait_for_bfd_packet()
    self.test_session.send_packet()

    # Activate the key on the VPP session and mirror the same key ID and
    # key on the test session.
    self.vpp_session.activate_auth(auth_key)
    self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
    self.test_session.sha1_key = auth_key

    # One packet exchange after the change.
    self.wait_for_bfd_packet()
    self.test_session.send_packet()

    # The VPP session must still report "up" and no events may be queued.
    vpp_state = self.vpp_session.state
    self.assert_equal(vpp_state, BFDState.up, BFDState)
    pending_events = self.vapi.collect_events()
    self.assert_equal(len(pending_events), 0, "number of bfd events")
def test_auth_off_immediate(self):
    """ turn auth off without disturbing session state (immediate) """
    # Register a random auth key with VPP.
    auth_key = self.factory.create_random_key(self)
    auth_key.add_vpp_config()

    # Create the VPP session with the key attached, then a test-side
    # session that uses the same key and the VPP session's key ID.
    iface = self.pg0
    self.vpp_session = VppBFDUDPSession(
        self, iface, iface.remote_ip4, sha1_key=auth_key)
    self.vpp_session.add_vpp_config()
    self.vpp_session.admin_up()
    self.test_session = BFDTestSession(
        self,
        iface,
        AF_INET,
        sha1_key=auth_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    self.bfd_session_up()

    # One packet exchange before authentication is switched off.
    self.wait_for_bfd_packet()
    self.test_session.send_packet()

    # Deactivate authentication on the VPP session and clear the key ID
    # and key on the test session.
    self.vpp_session.deactivate_auth()
    self.test_session.bfd_key_id = None
    self.test_session.sha1_key = None

    # One packet exchange after the change.
    self.wait_for_bfd_packet()
    self.test_session.send_packet()

    # The VPP session must still report "up" and no events may be queued.
    vpp_state = self.vpp_session.state
    self.assert_equal(vpp_state, BFDState.up, BFDState)
    pending_events = self.vapi.collect_events()
    self.assert_equal(len(pending_events), 0, "number of bfd events")
def test_auth_change_key_immediate(self):
    """ change auth key without disturbing session state (immediate) """
    # Register two random auth keys with VPP: the one the session starts
    # with and the one it is switched to.
    old_key = self.factory.create_random_key(self)
    old_key.add_vpp_config()
    new_key = self.factory.create_random_key(self)
    new_key.add_vpp_config()

    # Create both session ends using the first key and bring the
    # session up.
    iface = self.pg0
    self.vpp_session = VppBFDUDPSession(
        self, iface, iface.remote_ip4, sha1_key=old_key)
    self.vpp_session.add_vpp_config()
    self.vpp_session.admin_up()
    self.test_session = BFDTestSession(
        self,
        iface,
        AF_INET,
        sha1_key=old_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    self.bfd_session_up()

    # One packet exchange before the key change.
    self.wait_for_bfd_packet()
    self.test_session.send_packet()

    # Activate the second key on the VPP session and mirror the
    # resulting key ID and the new key on the test session.
    self.vpp_session.activate_auth(new_key)
    self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
    self.test_session.sha1_key = new_key

    # One packet exchange after the change.
    self.wait_for_bfd_packet()
    self.test_session.send_packet()

    # The VPP session must still report "up" and no events may be queued.
    vpp_state = self.vpp_session.state
    self.assert_equal(vpp_state, BFDState.up, BFDState)
    pending_events = self.vapi.collect_events()
    self.assert_equal(len(pending_events), 0, "number of bfd events")
def test_auth_on_delayed(self):
    """ turn auth on without disturbing session state (delayed) """
    # Register a random auth key with VPP; the session created below
    # starts out without any key attached.
    auth_key = self.factory.create_random_key(self)
    auth_key.add_vpp_config()

    # Create the VPP session and the test-side session, both without
    # authentication, and bring the session up.
    iface = self.pg0
    self.vpp_session = VppBFDUDPSession(self, iface, iface.remote_ip4)
    self.vpp_session.add_vpp_config()
    self.vpp_session.admin_up()
    self.test_session = BFDTestSession(self, iface, AF_INET)
    self.bfd_session_up()

    # One packet exchange before the change is requested.
    self.wait_for_bfd_packet()
    self.test_session.send_packet()

    # Request key activation on the VPP session in delayed mode.
    self.vpp_session.activate_auth(auth_key, delayed=True)

    # Exchange one more packet while the test session still has no key,
    # then set the key ID and key on the test session and send again.
    self.wait_for_bfd_packet()
    self.test_session.send_packet()
    self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
    self.test_session.sha1_key = auth_key
    self.test_session.send_packet()

    # One packet exchange with the key set on the test session.
    self.wait_for_bfd_packet()
    self.test_session.send_packet()

    # The VPP session must still report "up" and no events may be queued.
    vpp_state = self.vpp_session.state
    self.assert_equal(vpp_state, BFDState.up, BFDState)
    pending_events = self.vapi.collect_events()
    self.assert_equal(len(pending_events), 0, "number of bfd events")
def test_auth_off_delayed(self):
    """ turn auth off without disturbing session state (delayed) """
    # Register a random auth key with VPP.
    auth_key = self.factory.create_random_key(self)
    auth_key.add_vpp_config()

    # Create the VPP session with the key attached, then a test-side
    # session that uses the same key and the VPP session's key ID.
    iface = self.pg0
    self.vpp_session = VppBFDUDPSession(
        self, iface, iface.remote_ip4, sha1_key=auth_key)
    self.vpp_session.add_vpp_config()
    self.vpp_session.admin_up()
    self.test_session = BFDTestSession(
        self,
        iface,
        AF_INET,
        sha1_key=auth_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    self.bfd_session_up()

    # One packet exchange before the change is requested.
    self.wait_for_bfd_packet()
    self.test_session.send_packet()

    # Request deactivation of authentication on the VPP session in
    # delayed mode.
    self.vpp_session.deactivate_auth(delayed=True)

    # Exchange one more packet while the test session still has the key,
    # then clear the key ID and key on the test session and send again.
    self.wait_for_bfd_packet()
    self.test_session.send_packet()
    self.test_session.bfd_key_id = None
    self.test_session.sha1_key = None
    self.test_session.send_packet()

    # One packet exchange with the key cleared on the test session.
    self.wait_for_bfd_packet()
    self.test_session.send_packet()

    # The VPP session must still report "up" and no events may be queued.
    vpp_state = self.vpp_session.state
    self.assert_equal(vpp_state, BFDState.up, BFDState)
    pending_events = self.vapi.collect_events()
    self.assert_equal(len(pending_events), 0, "number of bfd events")
def test_auth_change_key_delayed(self):
    """ change auth key without disturbing session state (delayed) """
    # Register two random auth keys with VPP: the one the session starts
    # with and the one it is switched to.
    old_key = self.factory.create_random_key(self)
    old_key.add_vpp_config()
    new_key = self.factory.create_random_key(self)
    new_key.add_vpp_config()

    # Create both session ends using the first key and bring the
    # session up.
    iface = self.pg0
    self.vpp_session = VppBFDUDPSession(
        self, iface, iface.remote_ip4, sha1_key=old_key)
    self.vpp_session.add_vpp_config()
    self.vpp_session.admin_up()
    self.test_session = BFDTestSession(
        self,
        iface,
        AF_INET,
        sha1_key=old_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    self.bfd_session_up()

    # One packet exchange before the change is requested.
    self.wait_for_bfd_packet()
    self.test_session.send_packet()

    # Request activation of the second key on the VPP session in
    # delayed mode.
    self.vpp_session.activate_auth(new_key, delayed=True)

    # Exchange one more packet while the test session still has the
    # first key, then set the new key ID and key on it and send again.
    self.wait_for_bfd_packet()
    self.test_session.send_packet()
    self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
    self.test_session.sha1_key = new_key
    self.test_session.send_packet()

    # One packet exchange with the second key set on the test session.
    self.wait_for_bfd_packet()
    self.test_session.send_packet()

    # The VPP session must still report "up" and no events may be queued.
    vpp_state = self.vpp_session.state
    self.assert_equal(vpp_state, BFDState.up, BFDState)
    pending_events = self.vapi.collect_events()
    self.assert_equal(len(pending_events), 0, "number of bfd events")
# When this file is executed directly as a script, hand control to unittest
# with VppTestRunner as the test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)