3 from __future__ import division
8 from random import randint
10 from framework import *
16 class AuthKeyFactory(object):
17 """Factory class for creating auth keys with unique conf key ID"""
20 self._conf_key_ids = {}
22 def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
23 conf_key_id = randint(0, 0xFFFFFFFF)
24 while conf_key_id in self._conf_key_ids:
25 conf_key_id = randint(0, 0xFFFFFFFF)
26 self._conf_key_ids[conf_key_id] = 1
27 key = str(bytearray([randint(0, 255) for j in range(randint(1, 20))]))
28 return VppBFDAuthKey(test=test, auth_type=auth_type,
29 conf_key_id=conf_key_id, key=key)
32 class BFDAPITestCase(VppTestCase):
33 """Bidirectional Forwarding Detection (BFD) - API"""
37 super(BFDAPITestCase, cls).setUpClass()
40 cls.create_pg_interfaces(range(2))
41 for i in cls.pg_interfaces:
47 super(BFDAPITestCase, cls).tearDownClass()
51 super(BFDAPITestCase, self).setUp()
52 self.factory = AuthKeyFactory()
54 def test_add_bfd(self):
55 """ create a BFD session """
56 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
57 session.add_vpp_config()
58 self.logger.debug("Session state is %s" % str(session.state))
59 session.remove_vpp_config()
60 session.add_vpp_config()
61 self.logger.debug("Session state is %s" % str(session.state))
62 session.remove_vpp_config()
64 def test_double_add(self):
65 """ create the same BFD session twice (negative case) """
66 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
67 session.add_vpp_config()
69 with self.vapi.expect_negative_api_retval():
70 session.add_vpp_config()
72 session.remove_vpp_config()
74 def test_add_bfd6(self):
75 """ create IPv6 BFD session """
76 session = VppBFDUDPSession(
77 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
78 session.add_vpp_config()
79 self.logger.debug("Session state is %s" % str(session.state))
80 session.remove_vpp_config()
81 session.add_vpp_config()
82 self.logger.debug("Session state is %s" % str(session.state))
83 session.remove_vpp_config()
85 def test_mod_bfd(self):
86 """ modify BFD session parameters """
87 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
89 required_min_rx=10000,
91 session.add_vpp_config()
92 e = session.get_bfd_udp_session_dump_entry()
93 self.assert_equal(session.desired_min_tx,
95 "desired min transmit interval")
96 self.assert_equal(session.required_min_rx,
98 "required min receive interval")
99 self.assert_equal(session.detect_mult, e.detect_mult, "detect mult")
100 session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
101 required_min_rx=session.required_min_rx * 2,
102 detect_mult=session.detect_mult * 2)
103 e = session.get_bfd_udp_session_dump_entry()
104 self.assert_equal(session.desired_min_tx,
106 "desired min transmit interval")
107 self.assert_equal(session.required_min_rx,
109 "required min receive interval")
110 self.assert_equal(session.detect_mult, e.detect_mult, "detect mult")
112 def test_add_sha1_keys(self):
113 """ add SHA1 keys """
115 keys = [self.factory.create_random_key(
116 self) for i in range(0, key_count)]
118 self.assertFalse(key.query_vpp_config())
122 self.assertTrue(key.query_vpp_config())
124 indexes = range(key_count)
125 random.shuffle(indexes)
129 key.remove_vpp_config()
131 for j in range(key_count):
134 self.assertFalse(key.query_vpp_config())
136 self.assertTrue(key.query_vpp_config())
137 # should be removed now
139 self.assertFalse(key.query_vpp_config())
140 # add back and remove again
144 self.assertTrue(key.query_vpp_config())
146 key.remove_vpp_config()
148 self.assertFalse(key.query_vpp_config())
150 def test_add_bfd_sha1(self):
151 """ create a BFD session (SHA1) """
152 key = self.factory.create_random_key(self)
154 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
156 session.add_vpp_config()
157 self.logger.debug("Session state is %s" % str(session.state))
158 session.remove_vpp_config()
159 session.add_vpp_config()
160 self.logger.debug("Session state is %s" % str(session.state))
161 session.remove_vpp_config()
163 def test_double_add_sha1(self):
164 """ create the same BFD session twice (negative case) (SHA1) """
165 key = self.factory.create_random_key(self)
167 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
169 session.add_vpp_config()
170 with self.assertRaises(Exception):
171 session.add_vpp_config()
173 def test_add_authenticated_with_nonexistent_key(self):
174 """ create BFD session using non-existent SHA1 (negative case) """
175 session = VppBFDUDPSession(
176 self, self.pg0, self.pg0.remote_ip4,
177 sha1_key=self.factory.create_random_key(self))
178 with self.assertRaises(Exception):
179 session.add_vpp_config()
181 def test_shared_sha1_key(self):
182 """ share single SHA1 key between multiple BFD sessions """
183 key = self.factory.create_random_key(self)
186 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
188 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
189 sha1_key=key, af=AF_INET6),
190 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
192 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
193 sha1_key=key, af=AF_INET6)]
198 e = key.get_bfd_auth_keys_dump_entry()
199 self.assert_equal(e.use_count, len(sessions) - removed,
200 "Use count for shared key")
201 s.remove_vpp_config()
203 e = key.get_bfd_auth_keys_dump_entry()
204 self.assert_equal(e.use_count, len(sessions) - removed,
205 "Use count for shared key")
207 def test_activate_auth(self):
208 """ activate SHA1 authentication """
209 key = self.factory.create_random_key(self)
211 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
212 session.add_vpp_config()
213 session.activate_auth(key)
215 def test_deactivate_auth(self):
216 """ deactivate SHA1 authentication """
217 key = self.factory.create_random_key(self)
219 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
220 session.add_vpp_config()
221 session.activate_auth(key)
222 session.deactivate_auth()
224 def test_change_key(self):
225 """ change SHA1 key """
226 key1 = self.factory.create_random_key(self)
227 key2 = self.factory.create_random_key(self)
228 while key2.conf_key_id == key1.conf_key_id:
229 key2 = self.factory.create_random_key(self)
230 key1.add_vpp_config()
231 key2.add_vpp_config()
232 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
234 session.add_vpp_config()
235 session.activate_auth(key2)
238 class BFDTestSession(object):
239 """ BFD session as seen from test framework side """
241 def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
242 bfd_key_id=None, our_seq_number=0xFFFFFFFF - 4):
245 self.sha1_key = sha1_key
246 self.bfd_key_id = bfd_key_id
247 self.interface = interface
248 self.udp_sport = 50000
249 self.our_seq_number = our_seq_number
250 self.vpp_seq_number = None
252 'my_discriminator': 0,
253 'desired_min_tx_interval': 100000,
254 'detect_mult': detect_mult,
255 'diag': BFDDiagCode.no_diagnostic,
258 def inc_seq_num(self):
259 if self.our_seq_number == 0xFFFFFFFF:
260 self.our_seq_number = 0
262 self.our_seq_number += 1
264 def update(self, **kwargs):
265 self.bfd_values.update(kwargs)
267 def create_packet(self):
270 bfd.auth_type = self.sha1_key.auth_type
271 bfd.auth_len = BFD.sha1_auth_len
272 bfd.auth_key_id = self.bfd_key_id
273 bfd.auth_seq_num = self.our_seq_number
274 bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
277 if self.af == AF_INET6:
278 packet = (Ether(src=self.interface.remote_mac,
279 dst=self.interface.local_mac) /
280 IPv6(src=self.interface.remote_ip6,
281 dst=self.interface.local_ip6,
283 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
286 packet = (Ether(src=self.interface.remote_mac,
287 dst=self.interface.local_mac) /
288 IP(src=self.interface.remote_ip4,
289 dst=self.interface.local_ip4,
291 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
293 self.test.logger.debug("BFD: Creating packet")
294 for name, value in self.bfd_values.iteritems():
295 self.test.logger.debug("BFD: setting packet.%s=%s", name, value)
296 packet[BFD].setfieldval(name, value)
298 hash_material = str(packet[BFD])[:32] + self.sha1_key.key + \
299 "\0" * (20 - len(self.sha1_key.key))
300 self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
301 hashlib.sha1(hash_material).hexdigest())
302 packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
305 def send_packet(self, packet=None):
307 packet = self.create_packet()
308 self.test.logger.debug(ppp("Sending packet:", packet))
309 self.test.pg0.add_stream([packet])
312 def verify_sha1_auth(self, packet):
313 """ Verify correctness of authentication in BFD layer. """
315 self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
316 self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
318 self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
319 self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
320 if self.vpp_seq_number is None:
321 self.vpp_seq_number = bfd.auth_seq_num
322 self.test.logger.debug("Received initial sequence number: %s" %
325 recvd_seq_num = bfd.auth_seq_num
326 self.test.logger.debug("Received followup sequence number: %s" %
328 if self.vpp_seq_number < 0xffffffff:
329 if self.sha1_key.auth_type == \
330 BFDAuthType.meticulous_keyed_sha1:
331 self.test.assert_equal(recvd_seq_num,
332 self.vpp_seq_number + 1,
333 "BFD sequence number")
335 self.test.assert_in_range(recvd_seq_num,
337 self.vpp_seq_number + 1,
338 "BFD sequence number")
340 if self.sha1_key.auth_type == \
341 BFDAuthType.meticulous_keyed_sha1:
342 self.test.assert_equal(recvd_seq_num, 0,
343 "BFD sequence number")
345 self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
346 "BFD sequence number not one of "
347 "(%s, 0)" % self.vpp_seq_number)
348 self.vpp_seq_number = recvd_seq_num
349 # last 20 bytes represent the hash - so replace them with the key,
350 # pad the result with zeros and hash the result
351 hash_material = bfd.original[:-20] + self.sha1_key.key + \
352 "\0" * (20 - len(self.sha1_key.key))
353 expected_hash = hashlib.sha1(hash_material).hexdigest()
354 self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
355 expected_hash, "Auth key hash")
357 def verify_bfd(self, packet):
358 """ Verify correctness of BFD layer. """
360 self.test.assert_equal(bfd.version, 1, "BFD version")
361 self.test.assert_equal(bfd.your_discriminator,
362 self.bfd_values['my_discriminator'],
363 "BFD - your discriminator")
365 self.verify_sha1_auth(packet)
369 """Common code used by both IPv4 and IPv6 Test Cases"""
372 self.vapi.collect_events() # clear the event queue
373 if not self.vpp_dead:
374 self.vapi.want_bfd_events(enable_disable=0)
376 def bfd_session_up(self):
377 """ Bring BFD session up """
378 self.logger.info("BFD: Waiting for slow hello")
379 p, timeout = self.wait_for_bfd_packet(2)
380 self.logger.info("BFD: Sending Init")
381 self.test_session.update(my_discriminator=randint(0, 40000000),
382 your_discriminator=p[BFD].my_discriminator,
384 required_min_rx_interval=100000)
385 self.test_session.send_packet()
386 self.logger.info("BFD: Waiting for event")
387 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
388 self.verify_event(e, expected_state=BFDState.up)
389 self.logger.info("BFD: Session is Up")
390 self.test_session.update(state=BFDState.up)
391 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
393 def bfd_session_down(self):
394 """ Bring BFD session down """
395 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
396 self.test_session.update(state=BFDState.down)
397 self.test_session.send_packet()
398 self.logger.info("BFD: Waiting for event")
399 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
400 self.verify_event(e, expected_state=BFDState.down)
401 self.logger.info("BFD: Session is Down")
402 self.assert_equal(self.vpp_session.state, BFDState.down, BFDState)
404 def verify_ip(self, packet):
405 """ Verify correctness of IP layer. """
406 if self.vpp_session.af == AF_INET6:
408 local_ip = self.pg0.local_ip6
409 remote_ip = self.pg0.remote_ip6
410 self.assert_equal(ip.hlim, 255, "IPv6 hop limit")
413 local_ip = self.pg0.local_ip4
414 remote_ip = self.pg0.remote_ip4
415 self.assert_equal(ip.ttl, 255, "IPv4 TTL")
416 self.assert_equal(ip.src, local_ip, "IP source address")
417 self.assert_equal(ip.dst, remote_ip, "IP destination address")
419 def verify_udp(self, packet):
420 """ Verify correctness of UDP layer. """
422 self.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
423 self.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
426 def verify_event(self, event, expected_state):
427 """ Verify correctness of event values. """
429 self.logger.debug("BFD: Event: %s" % repr(e))
430 self.assert_equal(e.sw_if_index,
431 self.vpp_session.interface.sw_if_index,
432 "BFD interface index")
434 if self.vpp_session.af == AF_INET6:
436 self.assert_equal(e.is_ipv6, is_ipv6, "is_ipv6")
437 if self.vpp_session.af == AF_INET:
438 self.assert_equal(e.local_addr[:4], self.vpp_session.local_addr_n,
439 "Local IPv4 address")
440 self.assert_equal(e.peer_addr[:4], self.vpp_session.peer_addr_n,
443 self.assert_equal(e.local_addr, self.vpp_session.local_addr_n,
444 "Local IPv6 address")
445 self.assert_equal(e.peer_addr, self.vpp_session.peer_addr_n,
447 self.assert_equal(e.state, expected_state, BFDState)
449 def wait_for_bfd_packet(self, timeout=1):
450 """ wait for BFD packet
452 :param timeout: how long to wait max
454 :returns: tuple (packet, time spent waiting for packet)
456 self.logger.info("BFD: Waiting for BFD packet")
458 p = self.pg0.wait_for_packet(timeout=timeout)
460 self.logger.debug(ppp("BFD: Got packet:", p))
463 raise Exception(ppp("Unexpected or invalid BFD packet:", p))
465 raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
468 self.test_session.verify_bfd(p)
469 return p, after - before
472 class BFD4TestCase(VppTestCase, BFDCommonCode):
473 """Bidirectional Forwarding Detection (BFD)"""
477 super(BFD4TestCase, cls).setUpClass()
479 cls.create_pg_interfaces([0])
481 cls.pg0.configure_ipv4_neighbors()
483 cls.pg0.resolve_arp()
486 super(BFD4TestCase, cls).tearDownClass()
490 super(BFD4TestCase, self).setUp()
491 self.factory = AuthKeyFactory()
492 self.vapi.want_bfd_events()
493 self.pg_enable_capture([self.pg0])
495 self.vpp_session = VppBFDUDPSession(self, self.pg0,
497 self.vpp_session.add_vpp_config()
498 self.vpp_session.admin_up()
499 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
501 self.vapi.want_bfd_events(enable_disable=0)
505 BFDCommonCode.tearDown(self)
506 VppTestCase.tearDown(self)
508 def test_session_up(self):
509 """ bring BFD session up """
510 self.bfd_session_up()
512 def test_session_down(self):
513 """ bring BFD session down """
514 self.bfd_session_up()
515 self.bfd_session_down()
517 def test_hold_up(self):
518 """ hold BFD session up """
519 self.bfd_session_up()
521 self.wait_for_bfd_packet()
522 self.test_session.send_packet()
523 self.assert_equal(len(self.vapi.collect_events()), 0,
524 "number of bfd events")
526 def test_slow_timer(self):
527 """ verify slow periodic control frames while session down """
529 self.logger.info("BFD: Waiting for %d BFD packets" % expected_packets)
530 self.wait_for_bfd_packet(2)
531 for i in range(expected_packets):
533 self.wait_for_bfd_packet(2)
535 # spec says the range should be <0.75, 1>, allow extra 0.05 margin
536 # to work around timing issues
537 self.assert_in_range(
538 after - before, 0.70, 1.05, "time between slow packets")
541 def test_zero_remote_min_rx(self):
542 """ no packets when zero BFD RemoteMinRxInterval """
543 self.pg_enable_capture([self.pg0])
544 p, timeout = self.wait_for_bfd_packet(2)
545 self.test_session.update(my_discriminator=randint(0, 40000000),
546 your_discriminator=p[BFD].my_discriminator,
548 required_min_rx_interval=0)
549 self.test_session.send_packet()
550 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
551 self.verify_event(e, expected_state=BFDState.up)
553 cap = 2 * self.vpp_session.desired_min_tx *\
554 self.vpp_session.detect_mult
557 # busy wait here, trying to collect a packet or event, vpp is not
558 # allowed to send packets and the session will timeout first - so the
559 # Up->Down event must arrive before any packets do
560 while time.time() < now + cap / us_in_sec:
562 p, ttp = self.wait_for_bfd_packet(timeout=0)
563 self.logger.error(ppp("Received unexpected packet:", p))
567 events = self.vapi.collect_events()
569 self.verify_event(events[0], BFDState.down)
571 self.assert_equal(count, 0, "number of packets received")
573 def test_conn_down(self):
574 """ verify session goes down after inactivity """
575 self.bfd_session_up()
576 self.wait_for_bfd_packet()
577 self.assert_equal(len(self.vapi.collect_events()), 0,
578 "number of bfd events")
579 self.wait_for_bfd_packet()
580 self.assert_equal(len(self.vapi.collect_events()), 0,
581 "number of bfd events")
582 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
583 self.verify_event(e, expected_state=BFDState.down)
585 def test_large_required_min_rx(self):
586 """ large remote RequiredMinRxInterval """
587 self.bfd_session_up()
588 self.wait_for_bfd_packet()
590 self.test_session.update(required_min_rx_interval=interval)
591 self.test_session.send_packet()
594 # busy wait here, trying to collect a packet or event, vpp is not
595 # allowed to send packets and the session will timeout first - so the
596 # Up->Down event must arrive before any packets do
597 while time.time() < now + interval / us_in_sec:
599 p, ttp = self.wait_for_bfd_packet(timeout=0)
600 self.logger.error(ppp("Received unexpected packet:", p))
604 events = self.vapi.collect_events()
606 self.verify_event(events[0], BFDState.down)
608 self.assert_equal(count, 0, "number of packets received")
610 def test_immediate_remote_min_rx_reduce(self):
611 """ immediately honor remote min rx reduction """
612 self.vpp_session.remove_vpp_config()
613 self.vpp_session = VppBFDUDPSession(
614 self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
615 self.pg_enable_capture([self.pg0])
616 self.vpp_session.add_vpp_config()
617 self.test_session.update(desired_min_tx_interval=1000000,
618 required_min_rx_interval=1000000)
619 self.bfd_session_up()
620 self.wait_for_bfd_packet()
622 self.test_session.update(required_min_rx_interval=interval)
623 self.test_session.send_packet()
624 p, ttp = self.wait_for_bfd_packet()
625 # allow extra 10% to work around timing issues, first packet is special
626 self.assert_in_range(ttp, 0, 1.10 * interval / us_in_sec,
627 "time between BFD packets")
628 p, ttp = self.wait_for_bfd_packet()
629 self.assert_in_range(ttp, .9 * .75 * interval / us_in_sec,
630 1.10 * interval / us_in_sec,
631 "time between BFD packets")
632 p, ttp = self.wait_for_bfd_packet()
633 self.assert_in_range(ttp, .9 * .75 * interval / us_in_sec,
634 1.10 * interval / us_in_sec,
635 "time between BFD packets")
637 def test_modify_req_min_rx_double(self):
638 """ modify session - double required min rx """
639 self.bfd_session_up()
640 self.wait_for_bfd_packet()
641 self.test_session.update(desired_min_tx_interval=10000,
642 required_min_rx_interval=10000)
643 self.test_session.send_packet()
644 # first double required min rx
645 self.vpp_session.modify_parameters(
646 required_min_rx=2 * self.vpp_session.required_min_rx)
647 p, ttp = self.wait_for_bfd_packet()
648 # poll bit needs to be set
649 self.assertIn("P", p.sprintf("%BFD.flags%"),
650 "Poll bit not set in BFD packet")
651 # finish poll sequence with final packet
652 final = self.test_session.create_packet()
653 final[BFD].flags = "F"
654 self.test_session.send_packet(final)
655 # now we can wait 0.9*3*req-min-rx and the session should still be up
656 self.sleep(0.9 * self.vpp_session.detect_mult *
657 self.vpp_session.required_min_rx / us_in_sec)
658 self.assert_equal(len(self.vapi.collect_events()), 0,
659 "number of bfd events")
661 def test_modify_req_min_rx_halve(self):
662 """ modify session - halve required min rx """
663 self.vpp_session.modify_parameters(
664 required_min_rx=2 * self.vpp_session.required_min_rx)
665 self.bfd_session_up()
666 self.wait_for_bfd_packet()
667 self.test_session.update(desired_min_tx_interval=10000,
668 required_min_rx_interval=10000)
669 self.test_session.send_packet()
670 p, ttp = self.wait_for_bfd_packet()
671 # halve required min rx
672 old_required_min_rx = self.vpp_session.required_min_rx
673 self.vpp_session.modify_parameters(
674 required_min_rx=0.5 * self.vpp_session.required_min_rx)
675 # now we wait 0.8*3*old-req-min-rx and the session should still be up
676 self.sleep(0.8 * self.vpp_session.detect_mult *
677 old_required_min_rx / us_in_sec)
678 self.assert_equal(len(self.vapi.collect_events()), 0,
679 "number of bfd events")
680 p, ttp = self.wait_for_bfd_packet()
681 # poll bit needs to be set
682 self.assertIn("P", p.sprintf("%BFD.flags%"),
683 "Poll bit not set in BFD packet")
684 # finish poll sequence with final packet
685 final = self.test_session.create_packet()
686 final[BFD].flags = "F"
687 self.test_session.send_packet(final)
688 # now the session should time out under new conditions
690 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
692 detection_time = self.vpp_session.detect_mult *\
693 self.vpp_session.required_min_rx / us_in_sec
694 self.assert_in_range(after - before,
695 0.9 * detection_time,
696 1.1 * detection_time,
697 "time before bfd session goes down")
698 self.verify_event(e, expected_state=BFDState.down)
700 def test_modify_des_min_tx(self):
701 """ modify desired min tx interval """
704 def test_modify_detect_mult(self):
705 """ modify detect multiplier """
706 self.bfd_session_up()
707 self.vpp_session.modify_parameters(detect_mult=1)
708 p, ttp = self.wait_for_bfd_packet()
709 self.assert_equal(self.vpp_session.detect_mult,
712 # poll bit must not be set
713 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
714 "Poll bit not set in BFD packet")
715 self.vpp_session.modify_parameters(detect_mult=10)
716 p, ttp = self.wait_for_bfd_packet()
717 self.assert_equal(self.vpp_session.detect_mult,
720 # poll bit must not be set
721 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
722 "Poll bit not set in BFD packet")
725 class BFD6TestCase(VppTestCase, BFDCommonCode):
726 """Bidirectional Forwarding Detection (BFD) (IPv6) """
730 super(BFD6TestCase, cls).setUpClass()
732 cls.create_pg_interfaces([0])
734 cls.pg0.configure_ipv6_neighbors()
736 cls.pg0.resolve_ndp()
739 super(BFD6TestCase, cls).tearDownClass()
743 super(BFD6TestCase, self).setUp()
744 self.factory = AuthKeyFactory()
745 self.vapi.want_bfd_events()
746 self.pg_enable_capture([self.pg0])
748 self.vpp_session = VppBFDUDPSession(self, self.pg0,
751 self.vpp_session.add_vpp_config()
752 self.vpp_session.admin_up()
753 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
754 self.logger.debug(self.vapi.cli("show adj nbr"))
756 self.vapi.want_bfd_events(enable_disable=0)
760 BFDCommonCode.tearDown(self)
761 VppTestCase.tearDown(self)
763 def test_session_up(self):
764 """ bring BFD session up """
765 self.bfd_session_up()
767 def test_hold_up(self):
768 """ hold BFD session up """
769 self.bfd_session_up()
771 self.wait_for_bfd_packet()
772 self.test_session.send_packet()
773 self.assert_equal(len(self.vapi.collect_events()), 0,
774 "number of bfd events")
775 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
778 class BFDSHA1TestCase(VppTestCase, BFDCommonCode):
779 """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
783 super(BFDSHA1TestCase, cls).setUpClass()
785 cls.create_pg_interfaces([0])
788 cls.pg0.resolve_arp()
791 super(BFDSHA1TestCase, cls).tearDownClass()
795 super(BFDSHA1TestCase, self).setUp()
796 self.factory = AuthKeyFactory()
797 self.vapi.want_bfd_events()
798 self.pg_enable_capture([self.pg0])
801 BFDCommonCode.tearDown(self)
802 VppTestCase.tearDown(self)
804 def test_session_up(self):
805 """ bring BFD session up """
806 key = self.factory.create_random_key(self)
808 self.vpp_session = VppBFDUDPSession(self, self.pg0,
811 self.vpp_session.add_vpp_config()
812 self.vpp_session.admin_up()
813 self.test_session = BFDTestSession(
814 self, self.pg0, AF_INET, sha1_key=key,
815 bfd_key_id=self.vpp_session.bfd_key_id)
816 self.bfd_session_up()
818 def test_hold_up(self):
819 """ hold BFD session up """
820 key = self.factory.create_random_key(self)
822 self.vpp_session = VppBFDUDPSession(self, self.pg0,
825 self.vpp_session.add_vpp_config()
826 self.vpp_session.admin_up()
827 self.test_session = BFDTestSession(
828 self, self.pg0, AF_INET, sha1_key=key,
829 bfd_key_id=self.vpp_session.bfd_key_id)
830 self.bfd_session_up()
832 self.wait_for_bfd_packet()
833 self.test_session.send_packet()
834 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
836 def test_hold_up_meticulous(self):
837 """ hold BFD session up - meticulous auth """
838 key = self.factory.create_random_key(
839 self, BFDAuthType.meticulous_keyed_sha1)
841 self.vpp_session = VppBFDUDPSession(self, self.pg0,
842 self.pg0.remote_ip4, sha1_key=key)
843 self.vpp_session.add_vpp_config()
844 self.vpp_session.admin_up()
845 self.test_session = BFDTestSession(
846 self, self.pg0, AF_INET, sha1_key=key,
847 bfd_key_id=self.vpp_session.bfd_key_id)
848 self.bfd_session_up()
850 self.wait_for_bfd_packet()
851 self.test_session.inc_seq_num()
852 self.test_session.send_packet()
853 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
855 def test_send_bad_seq_number(self):
856 """ session is not kept alive by msgs with bad seq numbers"""
857 key = self.factory.create_random_key(
858 self, BFDAuthType.meticulous_keyed_sha1)
860 self.vpp_session = VppBFDUDPSession(self, self.pg0,
861 self.pg0.remote_ip4, sha1_key=key)
862 self.vpp_session.add_vpp_config()
863 self.vpp_session.admin_up()
864 self.test_session = BFDTestSession(
865 self, self.pg0, AF_INET, sha1_key=key,
866 bfd_key_id=self.vpp_session.bfd_key_id)
867 self.bfd_session_up()
868 self.wait_for_bfd_packet()
869 self.test_session.send_packet()
870 self.assert_equal(len(self.vapi.collect_events()), 0,
871 "number of bfd events")
872 self.wait_for_bfd_packet()
873 self.test_session.send_packet()
874 self.assert_equal(len(self.vapi.collect_events()), 0,
875 "number of bfd events")
876 self.wait_for_bfd_packet()
877 self.test_session.send_packet()
878 self.wait_for_bfd_packet()
879 self.test_session.send_packet()
880 e = self.vapi.collect_events()
881 # session should be down now, because the sequence numbers weren't
883 self.assert_equal(len(e), 1, "number of bfd events")
884 self.verify_event(e[0], expected_state=BFDState.down)
886 def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
887 legitimate_test_session,
889 rogue_bfd_values=None):
890 """ execute a rogue session interaction scenario
892 1. create vpp session, add config
893 2. bring the legitimate session up
894 3. copy the bfd values from legitimate session to rogue session
895 4. apply rogue_bfd_values to rogue session
896 5. set rogue session state to down
897 6. send message to take the session down from the rogue session
898 7. assert that the legitimate session is unaffected
901 self.vpp_session = vpp_bfd_udp_session
902 self.vpp_session.add_vpp_config()
903 self.vpp_session.admin_up()
904 self.test_session = legitimate_test_session
905 # bring vpp session up
906 self.bfd_session_up()
907 # send packet from rogue session
908 rogue_test_session.bfd_values = self.test_session.bfd_values.copy()
910 rogue_test_session.update(**rogue_bfd_values)
911 rogue_test_session.update(state=BFDState.down)
912 rogue_test_session.send_packet()
913 self.wait_for_bfd_packet()
914 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
916 def test_mismatch_auth(self):
917 """ session is not brought down by unauthenticated msg """
918 key = self.factory.create_random_key(self)
920 vpp_session = VppBFDUDPSession(
921 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
922 legitimate_test_session = BFDTestSession(
923 self, self.pg0, AF_INET, sha1_key=key,
924 bfd_key_id=vpp_session.bfd_key_id)
925 rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
926 self.execute_rogue_session_scenario(vpp_session,
927 legitimate_test_session,
930 def test_mismatch_bfd_key_id(self):
931 """ session is not brought down by msg with non-existent key-id """
932 key = self.factory.create_random_key(self)
934 vpp_session = VppBFDUDPSession(
935 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
936 # pick a different random bfd key id
938 while x == vpp_session.bfd_key_id:
940 legitimate_test_session = BFDTestSession(
941 self, self.pg0, AF_INET, sha1_key=key,
942 bfd_key_id=vpp_session.bfd_key_id)
943 rogue_test_session = BFDTestSession(
944 self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
945 self.execute_rogue_session_scenario(vpp_session,
946 legitimate_test_session,
949 def test_mismatched_auth_type(self):
950 """ session is not brought down by msg with wrong auth type """
951 key = self.factory.create_random_key(self)
953 vpp_session = VppBFDUDPSession(
954 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
955 legitimate_test_session = BFDTestSession(
956 self, self.pg0, AF_INET, sha1_key=key,
957 bfd_key_id=vpp_session.bfd_key_id)
958 rogue_test_session = BFDTestSession(
959 self, self.pg0, AF_INET, sha1_key=key,
960 bfd_key_id=vpp_session.bfd_key_id)
961 self.execute_rogue_session_scenario(
962 vpp_session, legitimate_test_session, rogue_test_session,
963 {'auth_type': BFDAuthType.keyed_md5})
965 def test_restart(self):
966 """ simulate remote peer restart and resynchronization """
967 key = self.factory.create_random_key(
968 self, BFDAuthType.meticulous_keyed_sha1)
970 self.vpp_session = VppBFDUDPSession(self, self.pg0,
971 self.pg0.remote_ip4, sha1_key=key)
972 self.vpp_session.add_vpp_config()
973 self.vpp_session.admin_up()
974 self.test_session = BFDTestSession(
975 self, self.pg0, AF_INET, sha1_key=key,
976 bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
977 self.bfd_session_up()
978 # now we need to not respond for 2*detection_time (4 packets)
979 self.wait_for_bfd_packet()
980 self.assert_equal(len(self.vapi.collect_events()), 0,
981 "number of bfd events")
982 self.wait_for_bfd_packet()
983 self.assert_equal(len(self.vapi.collect_events()), 0,
984 "number of bfd events")
985 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
986 self.verify_event(e, expected_state=BFDState.down)
987 self.test_session.update(state=BFDState.down)
988 self.wait_for_bfd_packet()
989 self.assert_equal(len(self.vapi.collect_events()), 0,
990 "number of bfd events")
991 self.wait_for_bfd_packet()
992 self.assert_equal(len(self.vapi.collect_events()), 0,
993 "number of bfd events")
994 # reset sequence number
995 self.test_session.our_seq_number = 0
996 self.bfd_session_up()
999 class BFDAuthOnOffTestCase(VppTestCase, BFDCommonCode):
1000 """Bidirectional Forwarding Detection (BFD) (changing auth) """
1003 def setUpClass(cls):
1004 super(BFDAuthOnOffTestCase, cls).setUpClass()
1006 cls.create_pg_interfaces([0])
1007 cls.pg0.config_ip4()
1009 cls.pg0.resolve_arp()
1012 super(BFDAuthOnOffTestCase, cls).tearDownClass()
1016 super(BFDAuthOnOffTestCase, self).setUp()
1017 self.factory = AuthKeyFactory()
1018 self.vapi.want_bfd_events()
1021 BFDCommonCode.tearDown(self)
1022 VppTestCase.tearDown(self)
1024 def test_auth_on_immediate(self):
1025 """ turn auth on without disturbing session state (immediate) """
1026 key = self.factory.create_random_key(self)
1027 key.add_vpp_config()
1028 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1029 self.pg0.remote_ip4)
1030 self.vpp_session.add_vpp_config()
1031 self.vpp_session.admin_up()
1032 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
1033 self.bfd_session_up()
1035 self.wait_for_bfd_packet()
1036 self.test_session.send_packet()
1037 self.vpp_session.activate_auth(key)
1038 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
1039 self.test_session.sha1_key = key
1041 self.wait_for_bfd_packet()
1042 self.test_session.send_packet()
1043 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1044 self.assert_equal(len(self.vapi.collect_events()), 0,
1045 "number of bfd events")
1047 def test_auth_off_immediate(self):
1048 """ turn auth off without disturbing session state (immediate) """
1049 key = self.factory.create_random_key(self)
1050 key.add_vpp_config()
1051 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1052 self.pg0.remote_ip4, sha1_key=key)
1053 self.vpp_session.add_vpp_config()
1054 self.vpp_session.admin_up()
1055 self.test_session = BFDTestSession(
1056 self, self.pg0, AF_INET, sha1_key=key,
1057 bfd_key_id=self.vpp_session.bfd_key_id)
1058 self.bfd_session_up()
1060 self.wait_for_bfd_packet()
1061 self.test_session.send_packet()
1062 self.vpp_session.deactivate_auth()
1063 self.test_session.bfd_key_id = None
1064 self.test_session.sha1_key = None
1066 self.wait_for_bfd_packet()
1067 self.test_session.send_packet()
1068 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1069 self.assert_equal(len(self.vapi.collect_events()), 0,
1070 "number of bfd events")
1072 def test_auth_change_key_immediate(self):
1073 """ change auth key without disturbing session state (immediate) """
1074 key1 = self.factory.create_random_key(self)
1075 key1.add_vpp_config()
1076 key2 = self.factory.create_random_key(self)
1077 key2.add_vpp_config()
1078 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1079 self.pg0.remote_ip4, sha1_key=key1)
1080 self.vpp_session.add_vpp_config()
1081 self.vpp_session.admin_up()
1082 self.test_session = BFDTestSession(
1083 self, self.pg0, AF_INET, sha1_key=key1,
1084 bfd_key_id=self.vpp_session.bfd_key_id)
1085 self.bfd_session_up()
1087 self.wait_for_bfd_packet()
1088 self.test_session.send_packet()
1089 self.vpp_session.activate_auth(key2)
1090 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
1091 self.test_session.sha1_key = key2
1093 self.wait_for_bfd_packet()
1094 self.test_session.send_packet()
1095 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1096 self.assert_equal(len(self.vapi.collect_events()), 0,
1097 "number of bfd events")
1099 def test_auth_on_delayed(self):
1100 """ turn auth on without disturbing session state (delayed) """
1101 key = self.factory.create_random_key(self)
1102 key.add_vpp_config()
1103 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1104 self.pg0.remote_ip4)
1105 self.vpp_session.add_vpp_config()
1106 self.vpp_session.admin_up()
1107 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
1108 self.bfd_session_up()
1110 self.wait_for_bfd_packet()
1111 self.test_session.send_packet()
1112 self.vpp_session.activate_auth(key, delayed=True)
1114 self.wait_for_bfd_packet()
1115 self.test_session.send_packet()
1116 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
1117 self.test_session.sha1_key = key
1118 self.test_session.send_packet()
1120 self.wait_for_bfd_packet()
1121 self.test_session.send_packet()
1122 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1123 self.assert_equal(len(self.vapi.collect_events()), 0,
1124 "number of bfd events")
1126 def test_auth_off_delayed(self):
1127 """ turn auth off without disturbing session state (delayed) """
1128 key = self.factory.create_random_key(self)
1129 key.add_vpp_config()
1130 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1131 self.pg0.remote_ip4, sha1_key=key)
1132 self.vpp_session.add_vpp_config()
1133 self.vpp_session.admin_up()
1134 self.test_session = BFDTestSession(
1135 self, self.pg0, AF_INET, sha1_key=key,
1136 bfd_key_id=self.vpp_session.bfd_key_id)
1137 self.bfd_session_up()
1139 self.wait_for_bfd_packet()
1140 self.test_session.send_packet()
1141 self.vpp_session.deactivate_auth(delayed=True)
1143 self.wait_for_bfd_packet()
1144 self.test_session.send_packet()
1145 self.test_session.bfd_key_id = None
1146 self.test_session.sha1_key = None
1147 self.test_session.send_packet()
1149 self.wait_for_bfd_packet()
1150 self.test_session.send_packet()
1151 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1152 self.assert_equal(len(self.vapi.collect_events()), 0,
1153 "number of bfd events")
1155 def test_auth_change_key_delayed(self):
1156 """ change auth key without disturbing session state (delayed) """
1157 key1 = self.factory.create_random_key(self)
1158 key1.add_vpp_config()
1159 key2 = self.factory.create_random_key(self)
1160 key2.add_vpp_config()
1161 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1162 self.pg0.remote_ip4, sha1_key=key1)
1163 self.vpp_session.add_vpp_config()
1164 self.vpp_session.admin_up()
1165 self.test_session = BFDTestSession(
1166 self, self.pg0, AF_INET, sha1_key=key1,
1167 bfd_key_id=self.vpp_session.bfd_key_id)
1168 self.bfd_session_up()
1170 self.wait_for_bfd_packet()
1171 self.test_session.send_packet()
1172 self.vpp_session.activate_auth(key2, delayed=True)
1174 self.wait_for_bfd_packet()
1175 self.test_session.send_packet()
1176 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
1177 self.test_session.sha1_key = key2
1178 self.test_session.send_packet()
1180 self.wait_for_bfd_packet()
1181 self.test_session.send_packet()
1182 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1183 self.assert_equal(len(self.vapi.collect_events()), 0,
1184 "number of bfd events")
1186 if __name__ == '__main__':
1187 unittest.main(testRunner=VppTestRunner)