7 from random import randint
9 from framework import *
15 class AuthKeyFactory(object):
16 """Factory class for creating auth keys with unique conf key ID"""
19 self._conf_key_ids = {}
21 def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
22 conf_key_id = randint(0, 0xFFFFFFFF)
23 while conf_key_id in self._conf_key_ids:
24 conf_key_id = randint(0, 0xFFFFFFFF)
25 self._conf_key_ids[conf_key_id] = 1
26 key = str(bytearray([randint(0, 255) for j in range(randint(1, 20))]))
27 return VppBFDAuthKey(test=test, auth_type=auth_type,
28 conf_key_id=conf_key_id, key=key)
31 class BFDAPITestCase(VppTestCase):
32 """Bidirectional Forwarding Detection (BFD) - API"""
36 super(BFDAPITestCase, cls).setUpClass()
39 cls.create_pg_interfaces(range(2))
40 for i in cls.pg_interfaces:
46 super(BFDAPITestCase, cls).tearDownClass()
50 super(BFDAPITestCase, self).setUp()
51 self.factory = AuthKeyFactory()
53 def test_add_bfd(self):
54 """ create a BFD session """
55 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
56 session.add_vpp_config()
57 self.logger.debug("Session state is %s" % str(session.state))
58 session.remove_vpp_config()
59 session.add_vpp_config()
60 self.logger.debug("Session state is %s" % str(session.state))
61 session.remove_vpp_config()
63 def test_double_add(self):
64 """ create the same BFD session twice (negative case) """
65 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
66 session.add_vpp_config()
68 with self.vapi.expect_negative_api_retval():
69 session.add_vpp_config()
71 session.remove_vpp_config()
73 def test_add_bfd6(self):
74 """ create IPv6 BFD session """
75 session = VppBFDUDPSession(
76 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
77 session.add_vpp_config()
78 self.logger.debug("Session state is %s" % str(session.state))
79 session.remove_vpp_config()
80 session.add_vpp_config()
81 self.logger.debug("Session state is %s" % str(session.state))
82 session.remove_vpp_config()
84 def test_add_sha1_keys(self):
87 keys = [self.factory.create_random_key(
88 self) for i in range(0, key_count)]
90 self.assertFalse(key.query_vpp_config())
94 self.assertTrue(key.query_vpp_config())
96 indexes = range(key_count)
97 random.shuffle(indexes)
101 key.remove_vpp_config()
103 for j in range(key_count):
106 self.assertFalse(key.query_vpp_config())
108 self.assertTrue(key.query_vpp_config())
109 # should be removed now
111 self.assertFalse(key.query_vpp_config())
112 # add back and remove again
116 self.assertTrue(key.query_vpp_config())
118 key.remove_vpp_config()
120 self.assertFalse(key.query_vpp_config())
122 def test_add_bfd_sha1(self):
123 """ create a BFD session (SHA1) """
124 key = self.factory.create_random_key(self)
126 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
128 session.add_vpp_config()
129 self.logger.debug("Session state is %s" % str(session.state))
130 session.remove_vpp_config()
131 session.add_vpp_config()
132 self.logger.debug("Session state is %s" % str(session.state))
133 session.remove_vpp_config()
135 def test_double_add_sha1(self):
136 """ create the same BFD session twice (negative case) (SHA1) """
137 key = self.factory.create_random_key(self)
139 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
141 session.add_vpp_config()
142 with self.assertRaises(Exception):
143 session.add_vpp_config()
145 def test_add_authenticated_with_nonexistent_key(self):
146 """ create BFD session using non-existent SHA1 (negative case) """
147 session = VppBFDUDPSession(
148 self, self.pg0, self.pg0.remote_ip4,
149 sha1_key=self.factory.create_random_key(self))
150 with self.assertRaises(Exception):
151 session.add_vpp_config()
153 def test_shared_sha1_key(self):
154 """ share single SHA1 key between multiple BFD sessions """
155 key = self.factory.create_random_key(self)
158 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
160 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
161 sha1_key=key, af=AF_INET6),
162 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
164 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
165 sha1_key=key, af=AF_INET6)]
170 e = key.get_bfd_auth_keys_dump_entry()
171 self.assert_equal(e.use_count, len(sessions) - removed,
172 "Use count for shared key")
173 s.remove_vpp_config()
175 e = key.get_bfd_auth_keys_dump_entry()
176 self.assert_equal(e.use_count, len(sessions) - removed,
177 "Use count for shared key")
179 def test_activate_auth(self):
180 """ activate SHA1 authentication """
181 key = self.factory.create_random_key(self)
183 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
184 session.add_vpp_config()
185 session.activate_auth(key)
187 def test_deactivate_auth(self):
188 """ deactivate SHA1 authentication """
189 key = self.factory.create_random_key(self)
191 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
192 session.add_vpp_config()
193 session.activate_auth(key)
194 session.deactivate_auth()
196 def test_change_key(self):
197 key1 = self.factory.create_random_key(self)
198 key2 = self.factory.create_random_key(self)
199 while key2.conf_key_id == key1.conf_key_id:
200 key2 = self.factory.create_random_key(self)
201 key1.add_vpp_config()
202 key2.add_vpp_config()
203 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
205 session.add_vpp_config()
206 session.activate_auth(key2)
209 class BFDTestSession(object):
210 """ BFD session as seen from test framework side """
212 def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
213 bfd_key_id=None, our_seq_number=0xFFFFFFFF - 4):
216 self.sha1_key = sha1_key
217 self.bfd_key_id = bfd_key_id
218 self.interface = interface
219 self.udp_sport = 50000
220 self.our_seq_number = our_seq_number
221 self.vpp_seq_number = None
223 'my_discriminator': 0,
224 'desired_min_tx_interval': 100000,
225 'detect_mult': detect_mult,
226 'diag': BFDDiagCode.no_diagnostic,
229 def inc_seq_num(self):
230 if self.our_seq_number == 0xFFFFFFFF:
231 self.our_seq_number = 0
233 self.our_seq_number += 1
235 def update(self, **kwargs):
236 self.bfd_values.update(kwargs)
238 def create_packet(self):
241 bfd.auth_type = self.sha1_key.auth_type
242 bfd.auth_len = BFD.sha1_auth_len
243 bfd.auth_key_id = self.bfd_key_id
244 bfd.auth_seq_num = self.our_seq_number
245 bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
248 if self.af == AF_INET6:
249 packet = (Ether(src=self.interface.remote_mac,
250 dst=self.interface.local_mac) /
251 IPv6(src=self.interface.remote_ip6,
252 dst=self.interface.local_ip6,
254 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
257 packet = (Ether(src=self.interface.remote_mac,
258 dst=self.interface.local_mac) /
259 IP(src=self.interface.remote_ip4,
260 dst=self.interface.local_ip4,
262 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
264 self.test.logger.debug("BFD: Creating packet")
265 for name, value in self.bfd_values.iteritems():
266 self.test.logger.debug("BFD: setting packet.%s=%s", name, value)
267 packet[BFD].setfieldval(name, value)
269 hash_material = str(packet[BFD])[:32] + self.sha1_key.key + \
270 "\0" * (20 - len(self.sha1_key.key))
271 self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
272 hashlib.sha1(hash_material).hexdigest())
273 packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
276 def send_packet(self):
277 p = self.create_packet()
278 self.test.logger.debug(ppp("Sending packet:", p))
279 self.test.pg0.add_stream([p])
282 def verify_sha1_auth(self, packet):
283 """ Verify correctness of authentication in BFD layer. """
285 self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
286 self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
288 self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
289 self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
290 if self.vpp_seq_number is None:
291 self.vpp_seq_number = bfd.auth_seq_num
292 self.test.logger.debug("Received initial sequence number: %s" %
295 recvd_seq_num = bfd.auth_seq_num
296 self.test.logger.debug("Received followup sequence number: %s" %
298 if self.vpp_seq_number < 0xffffffff:
299 if self.sha1_key.auth_type == \
300 BFDAuthType.meticulous_keyed_sha1:
301 self.test.assert_equal(recvd_seq_num,
302 self.vpp_seq_number + 1,
303 "BFD sequence number")
305 self.test.assert_in_range(recvd_seq_num,
307 self.vpp_seq_number + 1,
308 "BFD sequence number")
310 if self.sha1_key.auth_type == \
311 BFDAuthType.meticulous_keyed_sha1:
312 self.test.assert_equal(recvd_seq_num, 0,
313 "BFD sequence number")
315 self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
316 "BFD sequence number not one of "
317 "(%s, 0)" % self.vpp_seq_number)
318 self.vpp_seq_number = recvd_seq_num
319 # last 20 bytes represent the hash - so replace them with the key,
320 # pad the result with zeros and hash the result
321 hash_material = bfd.original[:-20] + self.sha1_key.key + \
322 "\0" * (20 - len(self.sha1_key.key))
323 expected_hash = hashlib.sha1(hash_material).hexdigest()
324 self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
325 expected_hash, "Auth key hash")
327 def verify_bfd(self, packet):
328 """ Verify correctness of BFD layer. """
330 self.test.assert_equal(bfd.version, 1, "BFD version")
331 self.test.assert_equal(bfd.your_discriminator,
332 self.bfd_values['my_discriminator'],
333 "BFD - your discriminator")
335 self.verify_sha1_auth(packet)
339 """Common code used by both IPv4 and IPv6 Test Cases"""
342 self.vapi.collect_events() # clear the event queue
343 if not self.vpp_dead:
344 self.vapi.want_bfd_events(enable_disable=0)
346 def bfd_session_up(self):
347 """ Bring BFD session up """
348 self.pg_enable_capture([self.pg0])
349 self.logger.info("BFD: Waiting for slow hello")
350 p, timeout = self.wait_for_bfd_packet(2)
351 self.logger.info("BFD: Sending Init")
352 self.test_session.update(my_discriminator=randint(0, 40000000),
353 your_discriminator=p[BFD].my_discriminator,
355 required_min_rx_interval=100000)
356 self.test_session.send_packet()
357 self.logger.info("BFD: Waiting for event")
358 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
359 self.verify_event(e, expected_state=BFDState.up)
360 self.logger.info("BFD: Session is Up")
361 self.test_session.update(state=BFDState.up)
362 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
364 def bfd_session_down(self):
365 """ Bring BFD session down """
366 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
367 self.test_session.update(state=BFDState.down)
368 self.test_session.send_packet()
369 self.logger.info("BFD: Waiting for event")
370 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
371 self.verify_event(e, expected_state=BFDState.down)
372 self.logger.info("BFD: Session is Down")
373 self.assert_equal(self.vpp_session.state, BFDState.down, BFDState)
375 def verify_ip(self, packet):
376 """ Verify correctness of IP layer. """
377 if self.vpp_session.af == AF_INET6:
379 local_ip = self.pg0.local_ip6
380 remote_ip = self.pg0.remote_ip6
381 self.assert_equal(ip.hlim, 255, "IPv6 hop limit")
384 local_ip = self.pg0.local_ip4
385 remote_ip = self.pg0.remote_ip4
386 self.assert_equal(ip.ttl, 255, "IPv4 TTL")
387 self.assert_equal(ip.src, local_ip, "IP source address")
388 self.assert_equal(ip.dst, remote_ip, "IP destination address")
390 def verify_udp(self, packet):
391 """ Verify correctness of UDP layer. """
393 self.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
394 self.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
397 def verify_event(self, event, expected_state):
398 """ Verify correctness of event values. """
400 self.logger.debug("BFD: Event: %s" % repr(e))
401 self.assert_equal(e.sw_if_index,
402 self.vpp_session.interface.sw_if_index,
403 "BFD interface index")
405 if self.vpp_session.af == AF_INET6:
407 self.assert_equal(e.is_ipv6, is_ipv6, "is_ipv6")
408 if self.vpp_session.af == AF_INET:
409 self.assert_equal(e.local_addr[:4], self.vpp_session.local_addr_n,
410 "Local IPv4 address")
411 self.assert_equal(e.peer_addr[:4], self.vpp_session.peer_addr_n,
414 self.assert_equal(e.local_addr, self.vpp_session.local_addr_n,
415 "Local IPv6 address")
416 self.assert_equal(e.peer_addr, self.vpp_session.peer_addr_n,
418 self.assert_equal(e.state, expected_state, BFDState)
420 def wait_for_bfd_packet(self, timeout=1):
421 """ wait for BFD packet
423 :param timeout: how long to wait max
425 :returns: tuple (packet, time spent waiting for packet)
427 self.logger.info("BFD: Waiting for BFD packet")
429 p = self.pg0.wait_for_packet(timeout=timeout)
431 self.logger.debug(ppp("BFD: Got packet:", p))
434 raise Exception(ppp("Unexpected or invalid BFD packet:", p))
436 raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
439 self.test_session.verify_bfd(p)
440 return p, after - before
443 class BFD4TestCase(VppTestCase, BFDCommonCode):
444 """Bidirectional Forwarding Detection (BFD)"""
448 super(BFD4TestCase, cls).setUpClass()
450 cls.create_pg_interfaces([0])
452 cls.pg0.configure_ipv4_neighbors()
454 cls.pg0.resolve_arp()
457 super(BFD4TestCase, cls).tearDownClass()
461 super(BFD4TestCase, self).setUp()
462 self.factory = AuthKeyFactory()
463 self.vapi.want_bfd_events()
465 self.vpp_session = VppBFDUDPSession(self, self.pg0,
467 self.vpp_session.add_vpp_config()
468 self.vpp_session.admin_up()
469 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
471 self.vapi.want_bfd_events(enable_disable=0)
475 BFDCommonCode.tearDown(self)
476 VppTestCase.tearDown(self)
478 def test_session_up(self):
479 """ bring BFD session up """
480 self.bfd_session_up()
482 def test_session_down(self):
483 """ bring BFD session down """
484 self.bfd_session_up()
485 self.bfd_session_down()
487 def test_hold_up(self):
488 """ hold BFD session up """
489 self.bfd_session_up()
491 self.wait_for_bfd_packet()
492 self.test_session.send_packet()
493 self.assert_equal(len(self.vapi.collect_events()), 0,
494 "number of bfd events")
496 def test_slow_timer(self):
497 """ verify slow periodic control frames while session down """
498 self.pg_enable_capture([self.pg0])
500 self.logger.info("BFD: Waiting for %d BFD packets" % expected_packets)
501 self.wait_for_bfd_packet(2)
502 for i in range(expected_packets):
504 self.wait_for_bfd_packet(2)
506 # spec says the range should be <0.75, 1>, allow extra 0.05 margin
507 # to work around timing issues
508 self.assert_in_range(
509 after - before, 0.70, 1.05, "time between slow packets")
512 def test_zero_remote_min_rx(self):
513 """ no packets when zero BFD RemoteMinRxInterval """
514 self.pg_enable_capture([self.pg0])
515 p, timeout = self.wait_for_bfd_packet(2)
516 self.test_session.update(my_discriminator=randint(0, 40000000),
517 your_discriminator=p[BFD].my_discriminator,
519 required_min_rx_interval=0)
520 self.test_session.send_packet()
521 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
522 self.verify_event(e, expected_state=BFDState.up)
525 p = self.pg0.wait_for_packet(timeout=1)
528 raise Exception(ppp("Received unexpected BFD packet:", p))
530 def test_conn_down(self):
531 """ verify session goes down after inactivity """
532 self.bfd_session_up()
533 self.wait_for_bfd_packet()
534 self.assert_equal(len(self.vapi.collect_events()), 0,
535 "number of bfd events")
536 self.wait_for_bfd_packet()
537 self.assert_equal(len(self.vapi.collect_events()), 0,
538 "number of bfd events")
539 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
540 self.verify_event(e, expected_state=BFDState.down)
542 def test_large_required_min_rx(self):
543 """ large remote RequiredMinRxInterval """
544 self.bfd_session_up()
546 self.test_session.update(required_min_rx_interval=interval)
547 self.test_session.send_packet()
550 while time.time() < now + interval / us_in_sec:
552 p = self.wait_for_bfd_packet()
554 self.logger.error(ppp("Received unexpected packet:", p))
558 self.assert_in_range(count, 0, 1, "number of packets received")
560 def test_immediate_remote_min_rx_reduce(self):
561 """ immediately honor remote min rx reduction """
562 self.vpp_session.remove_vpp_config()
563 self.vpp_session = VppBFDUDPSession(
564 self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
565 self.vpp_session.add_vpp_config()
566 self.test_session.update(desired_min_tx_interval=1000000,
567 required_min_rx_interval=1000000)
568 self.bfd_session_up()
569 self.wait_for_bfd_packet()
571 self.test_session.update(required_min_rx_interval=interval)
572 self.test_session.send_packet()
573 p, ttp = self.wait_for_bfd_packet()
574 # allow extra 10% to work around timing issues, first packet is special
575 self.assert_in_range(ttp, 0, 1.10 * interval / us_in_sec,
576 "time between BFD packets")
577 p, ttp = self.wait_for_bfd_packet()
578 self.assert_in_range(ttp, .9 * .75 * interval / us_in_sec,
579 1.10 * interval / us_in_sec,
580 "time between BFD packets")
581 p, ttp = self.wait_for_bfd_packet()
582 self.assert_in_range(ttp, .9 * .75 * interval / us_in_sec,
583 1.10 * interval / us_in_sec,
584 "time between BFD packets")
587 class BFD6TestCase(VppTestCase, BFDCommonCode):
588 """Bidirectional Forwarding Detection (BFD) (IPv6) """
592 super(BFD6TestCase, cls).setUpClass()
594 cls.create_pg_interfaces([0])
596 cls.pg0.configure_ipv6_neighbors()
598 cls.pg0.resolve_ndp()
601 super(BFD6TestCase, cls).tearDownClass()
605 super(BFD6TestCase, self).setUp()
606 self.factory = AuthKeyFactory()
607 self.vapi.want_bfd_events()
609 self.vpp_session = VppBFDUDPSession(self, self.pg0,
612 self.vpp_session.add_vpp_config()
613 self.vpp_session.admin_up()
614 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
615 self.logger.debug(self.vapi.cli("show adj nbr"))
617 self.vapi.want_bfd_events(enable_disable=0)
621 BFDCommonCode.tearDown(self)
622 VppTestCase.tearDown(self)
624 def test_session_up(self):
625 """ bring BFD session up """
626 self.bfd_session_up()
628 def test_hold_up(self):
629 """ hold BFD session up """
630 self.bfd_session_up()
632 self.wait_for_bfd_packet()
633 self.test_session.send_packet()
634 self.assert_equal(len(self.vapi.collect_events()), 0,
635 "number of bfd events")
636 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
639 class BFDSHA1TestCase(VppTestCase, BFDCommonCode):
640 """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
644 super(BFDSHA1TestCase, cls).setUpClass()
646 cls.create_pg_interfaces([0])
649 cls.pg0.resolve_arp()
652 super(BFDSHA1TestCase, cls).tearDownClass()
656 super(BFDSHA1TestCase, self).setUp()
657 self.factory = AuthKeyFactory()
658 self.vapi.want_bfd_events()
661 BFDCommonCode.tearDown(self)
662 VppTestCase.tearDown(self)
664 def test_session_up(self):
665 """ bring BFD session up """
666 key = self.factory.create_random_key(self)
668 self.vpp_session = VppBFDUDPSession(self, self.pg0,
671 self.vpp_session.add_vpp_config()
672 self.vpp_session.admin_up()
673 self.test_session = BFDTestSession(
674 self, self.pg0, AF_INET, sha1_key=key,
675 bfd_key_id=self.vpp_session.bfd_key_id)
676 self.bfd_session_up()
678 def test_hold_up(self):
679 """ hold BFD session up """
680 key = self.factory.create_random_key(self)
682 self.vpp_session = VppBFDUDPSession(self, self.pg0,
685 self.vpp_session.add_vpp_config()
686 self.vpp_session.admin_up()
687 self.test_session = BFDTestSession(
688 self, self.pg0, AF_INET, sha1_key=key,
689 bfd_key_id=self.vpp_session.bfd_key_id)
690 self.bfd_session_up()
692 self.wait_for_bfd_packet()
693 self.test_session.send_packet()
694 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
696 def test_hold_up_meticulous(self):
697 """ hold BFD session up - meticulous auth """
698 key = self.factory.create_random_key(
699 self, BFDAuthType.meticulous_keyed_sha1)
701 self.vpp_session = VppBFDUDPSession(self, self.pg0,
702 self.pg0.remote_ip4, sha1_key=key)
703 self.vpp_session.add_vpp_config()
704 self.vpp_session.admin_up()
705 self.test_session = BFDTestSession(
706 self, self.pg0, AF_INET, sha1_key=key,
707 bfd_key_id=self.vpp_session.bfd_key_id)
708 self.bfd_session_up()
710 self.wait_for_bfd_packet()
711 self.test_session.inc_seq_num()
712 self.test_session.send_packet()
713 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
715 def test_send_bad_seq_number(self):
716 """ session is not kept alive by msgs with bad seq numbers"""
717 key = self.factory.create_random_key(
718 self, BFDAuthType.meticulous_keyed_sha1)
720 self.vpp_session = VppBFDUDPSession(self, self.pg0,
721 self.pg0.remote_ip4, sha1_key=key)
722 self.vpp_session.add_vpp_config()
723 self.vpp_session.admin_up()
724 self.test_session = BFDTestSession(
725 self, self.pg0, AF_INET, sha1_key=key,
726 bfd_key_id=self.vpp_session.bfd_key_id)
727 self.bfd_session_up()
728 self.wait_for_bfd_packet()
729 self.test_session.send_packet()
730 self.assert_equal(len(self.vapi.collect_events()), 0,
731 "number of bfd events")
732 self.wait_for_bfd_packet()
733 self.test_session.send_packet()
734 self.assert_equal(len(self.vapi.collect_events()), 0,
735 "number of bfd events")
736 self.wait_for_bfd_packet()
737 self.test_session.send_packet()
738 self.wait_for_bfd_packet()
739 self.test_session.send_packet()
740 e = self.vapi.collect_events()
741 # session should be down now, because the sequence numbers weren't
743 self.assert_equal(len(e), 1, "number of bfd events")
744 self.verify_event(e[0], expected_state=BFDState.down)
746 def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
747 legitimate_test_session,
749 rogue_bfd_values=None):
750 """ execute a rogue session interaction scenario
752 1. create vpp session, add config
753 2. bring the legitimate session up
754 3. copy the bfd values from legitimate session to rogue session
755 4. apply rogue_bfd_values to rogue session
756 5. set rogue session state to down
757 6. send message to take the session down from the rogue session
758 7. assert that the legitimate session is unaffected
761 self.vpp_session = vpp_bfd_udp_session
762 self.vpp_session.add_vpp_config()
763 self.vpp_session.admin_up()
764 self.test_session = legitimate_test_session
765 # bring vpp session up
766 self.bfd_session_up()
767 # send packet from rogue session
768 rogue_test_session.bfd_values = self.test_session.bfd_values.copy()
770 rogue_test_session.update(**rogue_bfd_values)
771 rogue_test_session.update(state=BFDState.down)
772 rogue_test_session.send_packet()
773 self.wait_for_bfd_packet()
774 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
776 def test_mismatch_auth(self):
777 """ session is not brought down by unauthenticated msg """
778 key = self.factory.create_random_key(self)
780 vpp_session = VppBFDUDPSession(
781 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
782 legitimate_test_session = BFDTestSession(
783 self, self.pg0, AF_INET, sha1_key=key,
784 bfd_key_id=vpp_session.bfd_key_id)
785 rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
786 self.execute_rogue_session_scenario(vpp_session,
787 legitimate_test_session,
790 def test_mismatch_bfd_key_id(self):
791 """ session is not brought down by msg with non-existent key-id """
792 key = self.factory.create_random_key(self)
794 vpp_session = VppBFDUDPSession(
795 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
796 # pick a different random bfd key id
798 while x == vpp_session.bfd_key_id:
800 legitimate_test_session = BFDTestSession(
801 self, self.pg0, AF_INET, sha1_key=key,
802 bfd_key_id=vpp_session.bfd_key_id)
803 rogue_test_session = BFDTestSession(
804 self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
805 self.execute_rogue_session_scenario(vpp_session,
806 legitimate_test_session,
809 def test_mismatched_auth_type(self):
810 """ session is not brought down by msg with wrong auth type """
811 key = self.factory.create_random_key(self)
813 vpp_session = VppBFDUDPSession(
814 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
815 legitimate_test_session = BFDTestSession(
816 self, self.pg0, AF_INET, sha1_key=key,
817 bfd_key_id=vpp_session.bfd_key_id)
818 rogue_test_session = BFDTestSession(
819 self, self.pg0, AF_INET, sha1_key=key,
820 bfd_key_id=vpp_session.bfd_key_id)
821 self.execute_rogue_session_scenario(
822 vpp_session, legitimate_test_session, rogue_test_session,
823 {'auth_type': BFDAuthType.keyed_md5})
825 def test_restart(self):
826 """ simulate remote peer restart and resynchronization """
827 key = self.factory.create_random_key(
828 self, BFDAuthType.meticulous_keyed_sha1)
830 self.vpp_session = VppBFDUDPSession(self, self.pg0,
831 self.pg0.remote_ip4, sha1_key=key)
832 self.vpp_session.add_vpp_config()
833 self.vpp_session.admin_up()
834 self.test_session = BFDTestSession(
835 self, self.pg0, AF_INET, sha1_key=key,
836 bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
837 self.bfd_session_up()
838 # now we need to not respond for 2*detection_time (4 packets)
839 self.wait_for_bfd_packet()
840 self.assert_equal(len(self.vapi.collect_events()), 0,
841 "number of bfd events")
842 self.wait_for_bfd_packet()
843 self.assert_equal(len(self.vapi.collect_events()), 0,
844 "number of bfd events")
845 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
846 self.verify_event(e, expected_state=BFDState.down)
847 self.test_session.update(state=BFDState.down)
848 self.wait_for_bfd_packet()
849 self.assert_equal(len(self.vapi.collect_events()), 0,
850 "number of bfd events")
851 self.wait_for_bfd_packet()
852 self.assert_equal(len(self.vapi.collect_events()), 0,
853 "number of bfd events")
854 # reset sequence number
855 self.test_session.our_seq_number = 0
856 self.bfd_session_up()
859 class BFDAuthOnOffTestCase(VppTestCase, BFDCommonCode):
860 """Bidirectional Forwarding Detection (BFD) (changing auth) """
864 super(BFDAuthOnOffTestCase, cls).setUpClass()
866 cls.create_pg_interfaces([0])
869 cls.pg0.resolve_arp()
872 super(BFDAuthOnOffTestCase, cls).tearDownClass()
876 super(BFDAuthOnOffTestCase, self).setUp()
877 self.factory = AuthKeyFactory()
878 self.vapi.want_bfd_events()
881 BFDCommonCode.tearDown(self)
882 VppTestCase.tearDown(self)
884 def test_auth_on_immediate(self):
885 """ turn auth on without disturbing session state (immediate) """
886 key = self.factory.create_random_key(self)
888 self.vpp_session = VppBFDUDPSession(self, self.pg0,
890 self.vpp_session.add_vpp_config()
891 self.vpp_session.admin_up()
892 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
893 self.bfd_session_up()
895 self.wait_for_bfd_packet()
896 self.test_session.send_packet()
897 self.vpp_session.activate_auth(key)
898 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
899 self.test_session.sha1_key = key
901 self.wait_for_bfd_packet()
902 self.test_session.send_packet()
903 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
904 self.assert_equal(len(self.vapi.collect_events()), 0,
905 "number of bfd events")
907 def test_auth_off_immediate(self):
908 """ turn auth off without disturbing session state (immediate) """
909 key = self.factory.create_random_key(self)
911 self.vpp_session = VppBFDUDPSession(self, self.pg0,
912 self.pg0.remote_ip4, sha1_key=key)
913 self.vpp_session.add_vpp_config()
914 self.vpp_session.admin_up()
915 self.test_session = BFDTestSession(
916 self, self.pg0, AF_INET, sha1_key=key,
917 bfd_key_id=self.vpp_session.bfd_key_id)
918 self.bfd_session_up()
920 self.wait_for_bfd_packet()
921 self.test_session.send_packet()
922 self.vpp_session.deactivate_auth()
923 self.test_session.bfd_key_id = None
924 self.test_session.sha1_key = None
926 self.wait_for_bfd_packet()
927 self.test_session.send_packet()
928 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
929 self.assert_equal(len(self.vapi.collect_events()), 0,
930 "number of bfd events")
932 def test_auth_change_key_immediate(self):
933 """ change auth key without disturbing session state (immediate) """
934 key1 = self.factory.create_random_key(self)
935 key1.add_vpp_config()
936 key2 = self.factory.create_random_key(self)
937 key2.add_vpp_config()
938 self.vpp_session = VppBFDUDPSession(self, self.pg0,
939 self.pg0.remote_ip4, sha1_key=key1)
940 self.vpp_session.add_vpp_config()
941 self.vpp_session.admin_up()
942 self.test_session = BFDTestSession(
943 self, self.pg0, AF_INET, sha1_key=key1,
944 bfd_key_id=self.vpp_session.bfd_key_id)
945 self.bfd_session_up()
947 self.wait_for_bfd_packet()
948 self.test_session.send_packet()
949 self.vpp_session.activate_auth(key2)
950 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
951 self.test_session.sha1_key = key2
953 self.wait_for_bfd_packet()
954 self.test_session.send_packet()
955 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
956 self.assert_equal(len(self.vapi.collect_events()), 0,
957 "number of bfd events")
959 def test_auth_on_delayed(self):
960 """ turn auth on without disturbing session state (delayed) """
961 key = self.factory.create_random_key(self)
963 self.vpp_session = VppBFDUDPSession(self, self.pg0,
965 self.vpp_session.add_vpp_config()
966 self.vpp_session.admin_up()
967 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
968 self.bfd_session_up()
970 self.wait_for_bfd_packet()
971 self.test_session.send_packet()
972 self.vpp_session.activate_auth(key, delayed=True)
974 self.wait_for_bfd_packet()
975 self.test_session.send_packet()
976 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
977 self.test_session.sha1_key = key
978 self.test_session.send_packet()
980 self.wait_for_bfd_packet()
981 self.test_session.send_packet()
982 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
983 self.assert_equal(len(self.vapi.collect_events()), 0,
984 "number of bfd events")
986 def test_auth_off_delayed(self):
987 """ turn auth off without disturbing session state (delayed) """
988 key = self.factory.create_random_key(self)
990 self.vpp_session = VppBFDUDPSession(self, self.pg0,
991 self.pg0.remote_ip4, sha1_key=key)
992 self.vpp_session.add_vpp_config()
993 self.vpp_session.admin_up()
994 self.test_session = BFDTestSession(
995 self, self.pg0, AF_INET, sha1_key=key,
996 bfd_key_id=self.vpp_session.bfd_key_id)
997 self.bfd_session_up()
999 self.wait_for_bfd_packet()
1000 self.test_session.send_packet()
1001 self.vpp_session.deactivate_auth(delayed=True)
1003 self.wait_for_bfd_packet()
1004 self.test_session.send_packet()
1005 self.test_session.bfd_key_id = None
1006 self.test_session.sha1_key = None
1007 self.test_session.send_packet()
1009 self.wait_for_bfd_packet()
1010 self.test_session.send_packet()
1011 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1012 self.assert_equal(len(self.vapi.collect_events()), 0,
1013 "number of bfd events")
1015 def test_auth_change_key_delayed(self):
1016 """ change auth key without disturbing session state (delayed) """
1017 key1 = self.factory.create_random_key(self)
1018 key1.add_vpp_config()
1019 key2 = self.factory.create_random_key(self)
1020 key2.add_vpp_config()
1021 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1022 self.pg0.remote_ip4, sha1_key=key1)
1023 self.vpp_session.add_vpp_config()
1024 self.vpp_session.admin_up()
1025 self.test_session = BFDTestSession(
1026 self, self.pg0, AF_INET, sha1_key=key1,
1027 bfd_key_id=self.vpp_session.bfd_key_id)
1028 self.bfd_session_up()
1030 self.wait_for_bfd_packet()
1031 self.test_session.send_packet()
1032 self.vpp_session.activate_auth(key2, delayed=True)
1034 self.wait_for_bfd_packet()
1035 self.test_session.send_packet()
1036 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
1037 self.test_session.sha1_key = key2
1038 self.test_session.send_packet()
1040 self.wait_for_bfd_packet()
1041 self.test_session.send_packet()
1042 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1043 self.assert_equal(len(self.vapi.collect_events()), 0,
1044 "number of bfd events")
1046 if __name__ == '__main__':
1047 unittest.main(testRunner=VppTestRunner)