BFD-FIB interactions
[vpp.git] / test / test_bfd.py
1 #!/usr/bin/env python
2 """ BFD tests """
3
4 from __future__ import division
5 import unittest
6 import hashlib
7 import binascii
8 import time
9 from struct import pack, unpack
10 from random import randint, shuffle, getrandbits
11 from socket import AF_INET, AF_INET6, inet_ntop
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether
14 from scapy.layers.inet import UDP, IP
15 from scapy.layers.inet6 import IPv6
16 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
17     BFDDiagCode, BFDState, BFD_vpp_echo
18 from framework import VppTestCase, VppTestRunner, running_extended_tests
19 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
20 from util import ppp
21 from vpp_papi_provider import UnexpectedApiReturnValueError
22 from vpp_ip_route import VppIpRoute, VppRoutePath
23
24 USEC_IN_SEC = 1000000
25
26
class AuthKeyFactory(object):
    """Factory class for creating auth keys with unique conf key ID"""

    def __init__(self):
        # conf key IDs already handed out by this factory instance
        self._conf_key_ids = set()

    def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
        """ create a random key with unique conf key id

        :param test: test case instance, passed through to VppBFDAuthKey
        :param auth_type: BFD authentication type of the new key
        :returns: VppBFDAuthKey whose conf key ID was not returned before by
                  this factory and whose key is 1 to 20 random bytes long
        """
        conf_key_id = randint(0, 0xFFFFFFFF)
        while conf_key_id in self._conf_key_ids:
            conf_key_id = randint(0, 0xFFFFFFFF)
        self._conf_key_ids.add(conf_key_id)
        # bytes() gives the raw byte string on both Python 2 and Python 3,
        # while str() of a bytearray is its repr on Python 3
        key = bytes(bytearray(
            randint(0, 255) for _ in range(randint(1, 20))))
        return VppBFDAuthKey(test=test, auth_type=auth_type,
                             conf_key_id=conf_key_id, key=key)
42
43
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
class BFDAPITestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) - API"""

    # packet-generator interfaces, populated by create_pg_interfaces()
    pg0 = None
    pg1 = None

    @classmethod
    def setUpClass(cls):
        # create two pg interfaces with IPv4 and IPv6 configured; when any
        # step fails, run the class teardown before propagating the error
        super(BFDAPITestCase, cls).setUpClass()

        try:
            cls.create_pg_interfaces(range(2))
            for i in cls.pg_interfaces:
                i.config_ip4()
                i.config_ip6()
                i.resolve_arp()

        except Exception:
            super(BFDAPITestCase, cls).tearDownClass()
            raise

    def setUp(self):
        # every test gets its own key factory, so conf key IDs are unique
        # only within a single test
        super(BFDAPITestCase, self).setUp()
        self.factory = AuthKeyFactory()

    def test_add_bfd(self):
        """ create a BFD session """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        # add and remove twice to check that the session can be re-created
        session.add_vpp_config()
        self.logger.debug("Session state is %s", session.state)
        session.remove_vpp_config()
        session.add_vpp_config()
        self.logger.debug("Session state is %s", session.state)
        session.remove_vpp_config()

    def test_double_add(self):
        """ create the same BFD session twice (negative case) """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()

        # the second add of the same session is expected to be rejected
        with self.vapi.expect_negative_api_retval():
            session.add_vpp_config()

        session.remove_vpp_config()

    def test_add_bfd6(self):
        """ create IPv6 BFD session """
        session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
        # add and remove twice to check that the session can be re-created
        session.add_vpp_config()
        self.logger.debug("Session state is %s", session.state)
        session.remove_vpp_config()
        session.add_vpp_config()
        self.logger.debug("Session state is %s", session.state)
        session.remove_vpp_config()

    def test_mod_bfd(self):
        """ modify BFD session parameters """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   desired_min_tx=50000,
                                   required_min_rx=10000,
                                   detect_mult=1)
        session.add_vpp_config()
        # values dumped from VPP must match what the session was created with
        s = session.get_bfd_udp_session_dump_entry()
        self.assert_equal(session.desired_min_tx,
                          s.desired_min_tx,
                          "desired min transmit interval")
        self.assert_equal(session.required_min_rx,
                          s.required_min_rx,
                          "required min receive interval")
        self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
        # double every parameter and verify the dump again
        session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
                                  required_min_rx=session.required_min_rx * 2,
                                  detect_mult=session.detect_mult * 2)
        s = session.get_bfd_udp_session_dump_entry()
        self.assert_equal(session.desired_min_tx,
                          s.desired_min_tx,
                          "desired min transmit interval")
        self.assert_equal(session.required_min_rx,
                          s.required_min_rx,
                          "required min receive interval")
        self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")

    def test_add_sha1_keys(self):
        """ add SHA1 keys """
        key_count = 10
        keys = [self.factory.create_random_key(self)
                for _ in range(key_count)]
        for key in keys:
            self.assertFalse(key.query_vpp_config())
        for key in keys:
            key.add_vpp_config()
        for key in keys:
            self.assertTrue(key.query_vpp_config())
        # remove randomly
        # shuffle() works in place on a mutable sequence, so the range has
        # to be turned into a list first
        indexes = list(range(key_count))
        shuffle(indexes)
        removed = set()
        for i in indexes:
            keys[i].remove_vpp_config()
            removed.add(i)
            # after each removal exactly the keys removed so far are gone
            for j, key in enumerate(keys):
                if j in removed:
                    self.assertFalse(key.query_vpp_config())
                else:
                    self.assertTrue(key.query_vpp_config())
        # should be removed now
        for key in keys:
            self.assertFalse(key.query_vpp_config())
        # add back and remove again
        for key in keys:
            key.add_vpp_config()
        for key in keys:
            self.assertTrue(key.query_vpp_config())
        for key in keys:
            key.remove_vpp_config()
        for key in keys:
            self.assertFalse(key.query_vpp_config())

    def test_add_bfd_sha1(self):
        """ create a BFD session (SHA1) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=key)
        # add and remove twice to check that the session can be re-created
        session.add_vpp_config()
        self.logger.debug("Session state is %s", session.state)
        session.remove_vpp_config()
        session.add_vpp_config()
        self.logger.debug("Session state is %s", session.state)
        session.remove_vpp_config()

    def test_double_add_sha1(self):
        """ create the same BFD session twice (negative case) (SHA1) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=key)
        session.add_vpp_config()
        # the second add of the same session must raise
        with self.assertRaises(Exception):
            session.add_vpp_config()

    def test_add_auth_nonexistent_key(self):
        """ create BFD session using non-existent SHA1 (negative case) """
        # the key is created on the test side only, add_vpp_config() is
        # never called on it
        session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4,
            sha1_key=self.factory.create_random_key(self))
        with self.assertRaises(Exception):
            session.add_vpp_config()

    def test_shared_sha1_key(self):
        """ share single SHA1 key between multiple BFD sessions """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        sessions = [
            VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                             sha1_key=key),
            VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
                             sha1_key=key, af=AF_INET6),
            VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
                             sha1_key=key),
            VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
                             sha1_key=key, af=AF_INET6)]
        for s in sessions:
            s.add_vpp_config()
        # the dumped use count has to drop by one with every removed session
        removed = 0
        for s in sessions:
            e = key.get_bfd_auth_keys_dump_entry()
            self.assert_equal(e.use_count, len(sessions) - removed,
                              "Use count for shared key")
            s.remove_vpp_config()
            removed += 1
        e = key.get_bfd_auth_keys_dump_entry()
        self.assert_equal(e.use_count, len(sessions) - removed,
                          "Use count for shared key")

    def test_activate_auth(self):
        """ activate SHA1 authentication """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        # session starts without authentication, the key is applied later
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        session.activate_auth(key)

    def test_deactivate_auth(self):
        """ deactivate SHA1 authentication """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        session.activate_auth(key)
        session.deactivate_auth()

    def test_change_key(self):
        """ change SHA1 key """
        key1 = self.factory.create_random_key(self)
        key2 = self.factory.create_random_key(self)
        # make sure the two keys really differ in their conf key ID
        while key2.conf_key_id == key1.conf_key_id:
            key2 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=key1)
        session.add_vpp_config()
        # switch the session from key1 to key2
        session.activate_auth(key2)
252
253
class BFDTestSession(object):
    """ Test-framework side of a BFD session (the peer VPP talks to) """

    def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
                 bfd_key_id=None, our_seq_number=None):
        # collaborators and addressing
        self.test = test
        self.interface = interface
        self.af = af
        # authentication material
        self.sha1_key = sha1_key
        self.bfd_key_id = bfd_key_id
        self.auth_type = BFDAuthType.no_auth
        # random UDP source port, then (unless given) a random first
        # sequence number
        self.udp_sport = randint(49152, 65535)
        self.our_seq_number = (randint(0, 40000000)
                               if our_seq_number is None
                               else our_seq_number)
        # last sequence number seen from VPP, unknown until first packet
        self.vpp_seq_number = None
        # BFD protocol fields advertised by this side
        self.my_discriminator = 0
        self.your_discriminator = None
        self.desired_min_tx = 300000
        self.required_min_rx = 300000
        self.required_min_echo_rx = None
        self.detect_mult = detect_mult
        self.diag = BFDDiagCode.no_diagnostic
        self.state = BFDState.down

    def inc_seq_num(self):
        """ advance our sequence number by one, wrapping at 32 bits """
        at_maximum = self.our_seq_number == 0xFFFFFFFF
        self.our_seq_number = 0 if at_maximum else self.our_seq_number + 1

    def update(self, my_discriminator=None, your_discriminator=None,
               desired_min_tx=None, required_min_rx=None,
               required_min_echo_rx=None, detect_mult=None,
               diag=None, state=None, auth_type=None):
        """ store new values of BFD parameters; None leaves a value as is """
        requested = (
            ("my_discriminator", my_discriminator),
            ("your_discriminator", your_discriminator),
            ("required_min_rx", required_min_rx),
            ("required_min_echo_rx", required_min_echo_rx),
            ("desired_min_tx", desired_min_tx),
            ("detect_mult", detect_mult),
            ("diag", diag),
            ("state", state),
            ("auth_type", auth_type),
        )
        for attribute, value in requested:
            if value is not None:
                setattr(self, attribute, value)

    def fill_packet_fields(self, packet):
        """ copy every truthy session value into the packet's BFD layer """
        bfd = packet[BFD]
        # (BFD field name, session value, debug message format); falsy
        # values are skipped so the packet keeps whatever it had.
        # auth_type is last - it is used by a negative test-case
        assignments = (
            ("my_discriminator", self.my_discriminator,
             "BFD: setting packet.my_discriminator=%s"),
            ("your_discriminator", self.your_discriminator,
             "BFD: setting packet.your_discriminator=%s"),
            ("required_min_rx_interval", self.required_min_rx,
             "BFD: setting packet.required_min_rx_interval=%s"),
            ("required_min_echo_rx_interval", self.required_min_echo_rx,
             "BFD: setting packet.required_min_echo_rx=%s"),
            ("desired_min_tx_interval", self.desired_min_tx,
             "BFD: setting packet.desired_min_tx_interval=%s"),
            ("detect_mult", self.detect_mult,
             "BFD: setting packet.detect_mult=%s"),
            ("diag", self.diag,
             "BFD: setting packet.diag=%s"),
            ("state", self.state,
             "BFD: setting packet.state=%s"),
            ("auth_type", self.auth_type,
             "BFD: setting packet.auth_type=%s"),
        )
        for field_name, value, log_format in assignments:
            if value:
                self.test.logger.debug(log_format, value)
                setattr(bfd, field_name, value)

    def create_packet(self):
        """ build Ether/IP(v6)/UDP/BFD packet from the current session state

        With a SHA1 key configured the BFD layer carries the authentication
        section and its hash is filled in.
        """
        if not self.sha1_key:
            bfd = BFD()
        else:
            bfd = BFD(flags="A")
            bfd.auth_type = self.sha1_key.auth_type
            bfd.auth_len = BFD.sha1_auth_len
            bfd.auth_key_id = self.bfd_key_id
            bfd.auth_seq_num = self.our_seq_number
            bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
        ether = Ether(src=self.interface.remote_mac,
                      dst=self.interface.local_mac)
        if self.af == AF_INET6:
            ip = IPv6(src=self.interface.remote_ip6,
                      dst=self.interface.local_ip6,
                      hlim=255)
        else:
            ip = IP(src=self.interface.remote_ip4,
                    dst=self.interface.local_ip4,
                    ttl=255)
        udp = UDP(sport=self.udp_sport, dport=BFD.udp_dport)
        packet = ether / ip / udp / bfd
        self.test.logger.debug("BFD: Creating packet")
        self.fill_packet_fields(packet)
        if self.sha1_key:
            # hash covers the first 32 bytes of the BFD layer followed by
            # the key, zero-padded to 20 bytes
            secret = self.sha1_key.key
            hash_material = str(packet[BFD])[:32] + secret + \
                "\0" * (20 - len(secret))
            sha1 = hashlib.sha1(hash_material)
            self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
                                   sha1.hexdigest())
            packet[BFD].auth_key_hash = sha1.digest()
        return packet

    def send_packet(self, packet=None, interface=None):
        """ send packet (default: freshly created) on interface
        (default: pg0 of the test) """
        to_send = self.create_packet() if packet is None else packet
        out_interface = self.test.pg0 if interface is None else interface
        self.test.logger.debug(ppp("Sending packet:", to_send))
        out_interface.add_stream(to_send)
        self.test.pg_start()

    def verify_sha1_auth(self, packet):
        """ check the SHA1 authentication section of a received BFD packet """
        bfd = packet[BFD]
        check = self.test
        check.assert_equal(bfd.auth_len, 28, "Auth section length")
        check.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
                           BFDAuthType)
        check.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
        check.assert_equal(bfd.auth_reserved, 0, "Reserved")
        if self.vpp_seq_number is None:
            # first authenticated packet - just remember its number
            self.vpp_seq_number = bfd.auth_seq_num
            check.logger.debug("Received initial sequence number: %s" %
                               self.vpp_seq_number)
        else:
            recvd_seq_num = bfd.auth_seq_num
            check.logger.debug("Received followup sequence number: %s" %
                               recvd_seq_num)
            previous = self.vpp_seq_number
            below_max = previous < 0xffffffff
            meticulous = (self.sha1_key.auth_type ==
                          BFDAuthType.meticulous_keyed_sha1)
            if below_max and meticulous:
                # meticulous mode: number grows with every packet
                check.assert_equal(recvd_seq_num, previous + 1,
                                   "BFD sequence number")
            elif below_max:
                # otherwise the number may stay or grow by one
                check.assert_in_range(recvd_seq_num, previous, previous + 1,
                                      "BFD sequence number")
            elif meticulous:
                # previous number was the maximum, expect wrap to zero
                check.assert_equal(recvd_seq_num, 0, "BFD sequence number")
            else:
                check.assertIn(recvd_seq_num, (previous, 0),
                               "BFD sequence number not one of (%s, 0)" %
                               previous)
            self.vpp_seq_number = recvd_seq_num
        # the trailing 20 bytes of the layer hold the hash: swap them for
        # the zero-padded key and hash that to get the expected value
        secret = self.sha1_key.key
        hash_material = bfd.original[:-20] + secret + \
            "\0" * (20 - len(secret))
        expected_hash = hashlib.sha1(hash_material).hexdigest()
        check.assert_equal(binascii.hexlify(bfd.auth_key_hash),
                           expected_hash, "Auth key hash")

    def verify_bfd(self, packet):
        """ check version, discriminator and (if keyed) authentication of
        the BFD layer of a received packet """
        layer = packet[BFD]
        self.test.assert_equal(layer.version, 1, "BFD version")
        self.test.assert_equal(layer.your_discriminator,
                               self.my_discriminator,
                               "BFD - your discriminator")
        if self.sha1_key:
            self.verify_sha1_auth(packet)
454
455
def bfd_session_up(test):
    """ Bring BFD session up

    Waits for a BFD packet from VPP, derives the VPP clock offset from its
    capture timestamp, then sends Init followed by Up from the test session
    and checks that VPP reports the session as up.

    :param test: test case providing logger, vapi, vpp_session and
                 test_session; its vpp_clock_offset attribute is updated
    """
    test.logger.info("BFD: Waiting for slow hello")
    p = wait_for_bfd_packet(test, 2)
    # offset stored by an earlier call (None when absent or never set)
    old_offset = getattr(test, 'vpp_clock_offset', None)
    test.vpp_clock_offset = time.time() - p.time
    test.logger.debug("BFD: Calculated vpp clock offset: %s",
                      test.vpp_clock_offset)
    # compare against None explicitly, so that a previous offset of exactly
    # zero is still subject to the stability check
    if old_offset is not None:
        test.assertAlmostEqual(
            old_offset, test.vpp_clock_offset, delta=0.5,
            msg="vpp clock offset not stable (new: %s, old: %s)" %
            (test.vpp_clock_offset, old_offset))
    test.logger.info("BFD: Sending Init")
    test.test_session.update(my_discriminator=randint(0, 40000000),
                             your_discriminator=p[BFD].my_discriminator,
                             state=BFDState.init)
    # meticulous keyed SHA1 needs a new sequence number for every packet
    if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
            BFDAuthType.meticulous_keyed_sha1:
        test.test_session.inc_seq_num()
    test.test_session.send_packet()
    test.logger.info("BFD: Waiting for event")
    e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, e, expected_state=BFDState.up)
    test.logger.info("BFD: Session is Up")
    test.test_session.update(state=BFDState.up)
    if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
            BFDAuthType.meticulous_keyed_sha1:
        test.test_session.inc_seq_num()
    test.test_session.send_packet()
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
489
490
def bfd_session_down(test):
    """ Take an up BFD session down by sending state Down from the peer """
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
    peer = test.test_session
    peer.update(state=BFDState.down)
    # meticulous keyed SHA1 needs a new sequence number for every packet
    auth_key = peer.sha1_key
    if auth_key and \
            auth_key.auth_type == BFDAuthType.meticulous_keyed_sha1:
        peer.inc_seq_num()
    peer.send_packet()
    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, event, expected_state=BFDState.down)
    test.logger.info("BFD: Session is Down")
    test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
504
505
def verify_bfd_session_config(test, session, state=None):
    """ Verify that VPP's dump of a session matches the session object

    :param test: test case providing assertIsNotNone and assert_equal
    :param session: session object providing get_bfd_udp_session_dump_entry
                    and the expected parameter values
    :param state: expected session state; None (default) skips the state
                  check
    """
    dump = session.get_bfd_udp_session_dump_entry()
    test.assertIsNotNone(dump)
    # since dump is not none, we have verified that sw_if_index and addresses
    # are valid (in get_bfd_udp_session_dump_entry)
    # only None means "do not check" - a state whose value is falsy must
    # still be compared
    if state is not None:
        test.assert_equal(dump.state, state, "session state")
    test.assert_equal(dump.required_min_rx, session.required_min_rx,
                      "required min rx interval")
    test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
                      "desired min tx interval")
    test.assert_equal(dump.detect_mult, session.detect_mult,
                      "detect multiplier")
    if session.sha1_key is None:
        test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
    else:
        test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
        test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
                          "bfd key id")
        test.assert_equal(dump.conf_key_id,
                          session.sha1_key.conf_key_id,
                          "config key id")
528
529
def verify_ip(test, packet):
    """ Check hop limit/TTL and addresses of the packet's IP layer against
    pg0 of the test (address family taken from the VPP session). """
    pg0 = test.pg0
    if test.vpp_session.af == AF_INET6:
        layer = packet[IPv6]
        expected_src, expected_dst = pg0.local_ip6, pg0.remote_ip6
        test.assert_equal(layer.hlim, 255, "IPv6 hop limit")
    else:
        layer = packet[IP]
        expected_src, expected_dst = pg0.local_ip4, pg0.remote_ip4
        test.assert_equal(layer.ttl, 255, "IPv4 TTL")
    # packet comes from VPP, so source is the local address of pg0
    test.assert_equal(layer.src, expected_src, "IP source address")
    test.assert_equal(layer.dst, expected_dst, "IP destination address")
544
545
def verify_udp(test, packet):
    """ Check destination port and source port range of the UDP layer. """
    layer = packet[UDP]
    test.assert_equal(
        layer.dport, BFD.udp_dport, "UDP destination port")
    test.assert_in_range(
        layer.sport, BFD.udp_sport_min, BFD.udp_sport_max, "UDP source port")
552
553
def verify_event(test, event, expected_state):
    """ Check interface, address family, addresses and state of a BFD
    event against the VPP session of the test. """
    vpp_session = test.vpp_session
    test.logger.debug("BFD: Event: %s" % repr(event))
    test.assert_equal(event.sw_if_index,
                      vpp_session.interface.sw_if_index,
                      "BFD interface index")
    expected_is_ipv6 = 1 if vpp_session.af == AF_INET6 else 0
    test.assert_equal(event.is_ipv6, expected_is_ipv6, "is_ipv6")
    if vpp_session.af != AF_INET:
        test.assert_equal(event.local_addr, vpp_session.local_addr_n,
                          "Local IPv6 address")
        test.assert_equal(event.peer_addr, vpp_session.peer_addr_n,
                          "Peer IPv6 address")
    else:
        # for IPv4 only the first four bytes of the address are compared
        test.assert_equal(event.local_addr[:4], vpp_session.local_addr_n,
                          "Local IPv4 address")
        test.assert_equal(event.peer_addr[:4], vpp_session.peer_addr_n,
                          "Peer IPv4 address")
    test.assert_equal(event.state, expected_state, BFDState)
576
577
def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None):
    """ wait for BFD packet and verify its correctness

    :param test: test case providing logger, pg0, vpp_session and
                 test_session
    :param timeout: how long to wait
    :param pcap_time_min: ignore packets with pcap timestamp lower than this

    :returns: the received packet
    :raises CaptureTimeoutError: if the deadline passes before an acceptable
                                 packet is captured
    :raises Exception: if the packet has no BFD layer or the BFD layer
                       carries a payload
    """
    test.logger.info("BFD: Waiting for BFD packet")
    deadline = time.time() + timeout
    counter = 0
    while True:
        counter += 1
        # sanity check
        test.assert_in_range(counter, 0, 100, "number of packets ignored")
        time_left = deadline - time.time()
        if time_left < 0:
            raise CaptureTimeoutError("Packet did not arrive within timeout")
        p = test.pg0.wait_for_packet(timeout=time_left)
        test.logger.debug(ppp("BFD: Got packet:", p))
        if pcap_time_min is not None and p.time < pcap_time_min:
            test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
                                  "pcap time min %s):" %
                                  (p.time, pcap_time_min), p))
        else:
            break
    # getlayer() returns None for a missing layer, whereas indexing the
    # packet raises IndexError and would bypass the check below
    bfd = p.getlayer(BFD)
    if bfd is None:
        raise Exception(ppp("Unexpected or invalid BFD packet:", p))
    if bfd.payload:
        raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
    verify_ip(test, p)
    verify_udp(test, p)
    test.test_session.verify_bfd(p)
    return p
613
614
615 class BFD4TestCase(VppTestCase):
616     """Bidirectional Forwarding Detection (BFD)"""
617
618     pg0 = None
619     vpp_clock_offset = None
620     vpp_session = None
621     test_session = None
622
    @classmethod
    def setUpClass(cls):
        """ Create and configure pg0 and loopback0 for the whole class

        If any step of the interface setup raises, the class-level teardown
        is run before the exception is propagated.
        """
        super(BFD4TestCase, cls).setUpClass()
        try:
            cls.create_pg_interfaces([0])
            cls.create_loopback_interfaces([0])
            # keep a direct handle on the single loopback interface
            cls.loopback0 = cls.lo_interfaces[0]
            cls.loopback0.config_ip4()
            cls.loopback0.admin_up()
            cls.pg0.config_ip4()
            cls.pg0.configure_ipv4_neighbors()
            cls.pg0.admin_up()
            cls.pg0.resolve_arp()

        except Exception:
            super(BFD4TestCase, cls).tearDownClass()
            raise
640
    def setUp(self):
        """ Prepare a VPP BFD session on pg0 and the matching test session

        Enables BFD events and capture on pg0, then creates the IPv4 VPP
        session towards pg0's remote address and the test-side session.
        """
        super(BFD4TestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()
        try:
            self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                                self.pg0.remote_ip4)
            self.vpp_session.add_vpp_config()
            self.vpp_session.admin_up()
            self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        except:
            # undo the event registration made above, then let the original
            # error propagate
            self.vapi.want_bfd_events(enable_disable=0)
            raise
655
    def tearDown(self):
        """ Stop BFD event delivery and drop any events still queued """
        # the unregistration is skipped when VPP is flagged as dead
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFD4TestCase, self).tearDown()
661
    def test_session_up(self):
        """ bring BFD session up """
        # all steps and checks live in the shared module-level helper
        bfd_session_up(self)
665
666     def test_session_up_by_ip(self):
667         """ bring BFD session up - first frame looked up by address pair """
668         self.logger.info("BFD: Sending Slow control frame")
669         self.test_session.update(my_discriminator=randint(0, 40000000))
670         self.test_session.send_packet()
671         self.pg0.enable_capture()
672         p = self.pg0.wait_for_packet(1)
673         self.assert_equal(p[BFD].your_discriminator,
674                           self.test_session.my_discriminator,
675                           "BFD - your discriminator")
676         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
677         self.test_session.update(your_discriminator=p[BFD].my_discriminator,
678                                  state=BFDState.up)
679         self.logger.info("BFD: Waiting for event")
680         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
681         verify_event(self, e, expected_state=BFDState.init)
682         self.logger.info("BFD: Sending Up")
683         self.test_session.send_packet()
684         self.logger.info("BFD: Waiting for event")
685         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
686         verify_event(self, e, expected_state=BFDState.up)
687         self.logger.info("BFD: Session is Up")
688         self.test_session.update(state=BFDState.up)
689         self.test_session.send_packet()
690         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
691
    def test_session_down(self):
        """ bring BFD session down """
        # the session has to be up first - bfd_session_down asserts that
        bfd_session_up(self)
        bfd_session_down(self)
696
697     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
698     def test_hold_up(self):
699         """ hold BFD session up """
700         bfd_session_up(self)
701         for dummy in range(self.test_session.detect_mult * 2):
702             wait_for_bfd_packet(self)
703             self.test_session.send_packet()
704         self.assert_equal(len(self.vapi.collect_events()), 0,
705                           "number of bfd events")
706
707     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
708     def test_slow_timer(self):
709         """ verify slow periodic control frames while session down """
710         packet_count = 3
711         self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
712         prev_packet = wait_for_bfd_packet(self, 2)
713         for dummy in range(packet_count):
714             next_packet = wait_for_bfd_packet(self, 2)
715             time_diff = next_packet.time - prev_packet.time
716             # spec says the range should be <0.75, 1>, allow extra 0.05 margin
717             # to work around timing issues
718             self.assert_in_range(
719                 time_diff, 0.70, 1.05, "time between slow packets")
720             prev_packet = next_packet
721
722     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
723     def test_zero_remote_min_rx(self):
724         """ no packets when zero remote required min rx interval """
725         bfd_session_up(self)
726         self.test_session.update(required_min_rx=0)
727         self.test_session.send_packet()
728         for dummy in range(self.test_session.detect_mult):
729             self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
730                        "sleep before transmitting bfd packet")
731             self.test_session.send_packet()
732             try:
733                 p = wait_for_bfd_packet(self, timeout=0)
734                 self.logger.error(ppp("Received unexpected packet:", p))
735             except CaptureTimeoutError:
736                 pass
737         self.assert_equal(
738             len(self.vapi.collect_events()), 0, "number of bfd events")
739         self.test_session.update(required_min_rx=300000)
740         for dummy in range(3):
741             self.test_session.send_packet()
742             wait_for_bfd_packet(
743                 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
744         self.assert_equal(
745             len(self.vapi.collect_events()), 0, "number of bfd events")
746
747     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
748     def test_conn_down(self):
749         """ verify session goes down after inactivity """
750         bfd_session_up(self)
751         detection_time = self.test_session.detect_mult *\
752             self.vpp_session.required_min_rx / USEC_IN_SEC
753         self.sleep(detection_time, "waiting for BFD session time-out")
754         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
755         verify_event(self, e, expected_state=BFDState.down)
756
757     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
758     def test_large_required_min_rx(self):
759         """ large remote required min rx interval """
760         bfd_session_up(self)
761         p = wait_for_bfd_packet(self)
762         interval = 3000000
763         self.test_session.update(required_min_rx=interval)
764         self.test_session.send_packet()
765         time_mark = time.time()
766         count = 0
767         # busy wait here, trying to collect a packet or event, vpp is not
768         # allowed to send packets and the session will timeout first - so the
769         # Up->Down event must arrive before any packets do
770         while time.time() < time_mark + interval / USEC_IN_SEC:
771             try:
772                 p = wait_for_bfd_packet(self, timeout=0)
773                 # if vpp managed to send a packet before we did the session
774                 # session update, then that's fine, ignore it
775                 if p.time < time_mark - self.vpp_clock_offset:
776                     continue
777                 self.logger.error(ppp("Received unexpected packet:", p))
778                 count += 1
779             except CaptureTimeoutError:
780                 pass
781             events = self.vapi.collect_events()
782             if len(events) > 0:
783                 verify_event(self, events[0], BFDState.down)
784                 break
785         self.assert_equal(count, 0, "number of packets received")
786
787     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
788     def test_immediate_remote_min_rx_reduction(self):
789         """ immediately honor remote required min rx reduction """
790         self.vpp_session.remove_vpp_config()
791         self.vpp_session = VppBFDUDPSession(
792             self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
793         self.pg0.enable_capture()
794         self.vpp_session.add_vpp_config()
795         self.test_session.update(desired_min_tx=1000000,
796                                  required_min_rx=1000000)
797         bfd_session_up(self)
798         reference_packet = wait_for_bfd_packet(self)
799         time_mark = time.time()
800         interval = 300000
801         self.test_session.update(required_min_rx=interval)
802         self.test_session.send_packet()
803         extra_time = time.time() - time_mark
804         p = wait_for_bfd_packet(self)
805         # first packet is allowed to be late by time we spent doing the update
806         # calculated in extra_time
807         self.assert_in_range(p.time - reference_packet.time,
808                              .95 * 0.75 * interval / USEC_IN_SEC,
809                              1.05 * interval / USEC_IN_SEC + extra_time,
810                              "time between BFD packets")
811         reference_packet = p
812         for dummy in range(3):
813             p = wait_for_bfd_packet(self)
814             diff = p.time - reference_packet.time
815             self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
816                                  1.05 * interval / USEC_IN_SEC,
817                                  "time between BFD packets")
818             reference_packet = p
819
820     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
821     def test_modify_req_min_rx_double(self):
822         """ modify session - double required min rx """
823         bfd_session_up(self)
824         p = wait_for_bfd_packet(self)
825         self.test_session.update(desired_min_tx=10000,
826                                  required_min_rx=10000)
827         self.test_session.send_packet()
828         # double required min rx
829         self.vpp_session.modify_parameters(
830             required_min_rx=2 * self.vpp_session.required_min_rx)
831         p = wait_for_bfd_packet(
832             self, pcap_time_min=time.time() - self.vpp_clock_offset)
833         # poll bit needs to be set
834         self.assertIn("P", p.sprintf("%BFD.flags%"),
835                       "Poll bit not set in BFD packet")
836         # finish poll sequence with final packet
837         final = self.test_session.create_packet()
838         final[BFD].flags = "F"
839         timeout = self.test_session.detect_mult * \
840             max(self.test_session.desired_min_tx,
841                 self.vpp_session.required_min_rx) / USEC_IN_SEC
842         self.test_session.send_packet(final)
843         time_mark = time.time()
844         e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
845         verify_event(self, e, expected_state=BFDState.down)
846         time_to_event = time.time() - time_mark
847         self.assert_in_range(time_to_event, .9 * timeout,
848                              1.1 * timeout, "session timeout")
849
850     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
851     def test_modify_req_min_rx_halve(self):
852         """ modify session - halve required min rx """
853         self.vpp_session.modify_parameters(
854             required_min_rx=2 * self.vpp_session.required_min_rx)
855         bfd_session_up(self)
856         p = wait_for_bfd_packet(self)
857         self.test_session.update(desired_min_tx=10000,
858                                  required_min_rx=10000)
859         self.test_session.send_packet()
860         p = wait_for_bfd_packet(
861             self, pcap_time_min=time.time() - self.vpp_clock_offset)
862         # halve required min rx
863         old_required_min_rx = self.vpp_session.required_min_rx
864         self.vpp_session.modify_parameters(
865             required_min_rx=0.5 * self.vpp_session.required_min_rx)
866         # now we wait 0.8*3*old-req-min-rx and the session should still be up
867         self.sleep(0.8 * self.vpp_session.detect_mult *
868                    old_required_min_rx / USEC_IN_SEC,
869                    "wait before finishing poll sequence")
870         self.assert_equal(len(self.vapi.collect_events()), 0,
871                           "number of bfd events")
872         p = wait_for_bfd_packet(self)
873         # poll bit needs to be set
874         self.assertIn("P", p.sprintf("%BFD.flags%"),
875                       "Poll bit not set in BFD packet")
876         # finish poll sequence with final packet
877         final = self.test_session.create_packet()
878         final[BFD].flags = "F"
879         self.test_session.send_packet(final)
880         # now the session should time out under new conditions
881         detection_time = self.test_session.detect_mult *\
882             self.vpp_session.required_min_rx / USEC_IN_SEC
883         before = time.time()
884         e = self.vapi.wait_for_event(
885             2 * detection_time, "bfd_udp_session_details")
886         after = time.time()
887         self.assert_in_range(after - before,
888                              0.9 * detection_time,
889                              1.1 * detection_time,
890                              "time before bfd session goes down")
891         verify_event(self, e, expected_state=BFDState.down)
892
893     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
894     def test_modify_detect_mult(self):
895         """ modify detect multiplier """
896         bfd_session_up(self)
897         p = wait_for_bfd_packet(self)
898         self.vpp_session.modify_parameters(detect_mult=1)
899         p = wait_for_bfd_packet(
900             self, pcap_time_min=time.time() - self.vpp_clock_offset)
901         self.assert_equal(self.vpp_session.detect_mult,
902                           p[BFD].detect_mult,
903                           "detect mult")
904         # poll bit must not be set
905         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
906                          "Poll bit not set in BFD packet")
907         self.vpp_session.modify_parameters(detect_mult=10)
908         p = wait_for_bfd_packet(
909             self, pcap_time_min=time.time() - self.vpp_clock_offset)
910         self.assert_equal(self.vpp_session.detect_mult,
911                           p[BFD].detect_mult,
912                           "detect mult")
913         # poll bit must not be set
914         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
915                          "Poll bit not set in BFD packet")
916
917     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_queued_poll(self):
        """ test poll sequence queueing

        Doubles the required min rx of the VPP session twice in a row.  The
        first change must start a poll sequence; the Final packet for it is
        withheld for at least 0.5 seconds while the second change is made.
        A second poll sequence is then expected to show up only after the
        first one was finished, and the packet following its Final must not
        carry the Poll bit.
        """
        bfd_session_up(self)
        p = wait_for_bfd_packet(self)
        # first parameter change
        self.vpp_session.modify_parameters(
            required_min_rx=2 * self.vpp_session.required_min_rx)
        p = wait_for_bfd_packet(self)
        poll_sequence_start = time.time()
        # minimum time (seconds) the first poll sequence is kept unanswered
        poll_sequence_length_min = 0.5
        send_final_after = time.time() + poll_sequence_length_min
        # poll bit needs to be set
        self.assertIn("P", p.sprintf("%BFD.flags%"),
                      "Poll bit not set in BFD packet")
        self.assert_equal(p[BFD].required_min_rx_interval,
                          self.vpp_session.required_min_rx,
                          "BFD required min rx interval")
        # second parameter change, made while the first poll is pending
        self.vpp_session.modify_parameters(
            required_min_rx=2 * self.vpp_session.required_min_rx)
        # 2nd poll sequence should be queued now
        # don't send the reply back yet, wait for some time to emulate
        # longer round-trip time
        packet_count = 0
        while time.time() < send_final_after:
            # plain packet (no Final) - keeps the exchange going
            self.test_session.send_packet()
            p = wait_for_bfd_packet(self)
            self.assert_equal(len(self.vapi.collect_events()), 0,
                              "number of bfd events")
            self.assert_equal(p[BFD].required_min_rx_interval,
                              self.vpp_session.required_min_rx,
                              "BFD required min rx interval")
            packet_count += 1
            # poll bit must be set
            self.assertIn("P", p.sprintf("%BFD.flags%"),
                          "Poll bit not set in BFD packet")
        final = self.test_session.create_packet()
        final[BFD].flags = "F"
        self.test_session.send_packet(final)
        # finish 1st with final
        poll_sequence_length = time.time() - poll_sequence_start
        # vpp must wait for some time before starting new poll sequence
        poll_no_2_started = False
        # look at up to twice as many packets as the first sequence took
        for dummy in range(2 * packet_count):
            p = wait_for_bfd_packet(self)
            self.assert_equal(len(self.vapi.collect_events()), 0,
                              "number of bfd events")
            if "P" in p.sprintf("%BFD.flags%"):
                poll_no_2_started = True
                # NOTE(review): poll_sequence_start + poll_sequence_length
                # equals the time.time() value sampled above, so this
                # condition does not look like it can ever be true - confirm
                # which deadline was intended
                if time.time() < poll_sequence_start + poll_sequence_length:
                    raise Exception("VPP started 2nd poll sequence too soon")
                final = self.test_session.create_packet()
                final[BFD].flags = "F"
                self.test_session.send_packet(final)
                break
            else:
                self.test_session.send_packet()
        self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
        # finish 2nd with final
        # NOTE(review): a Final was already sent right before the break
        # above, so this is a second Final packet - confirm it is intended
        final = self.test_session.create_packet()
        final[BFD].flags = "F"
        self.test_session.send_packet(final)
        p = wait_for_bfd_packet(self)
        # poll bit must not be set
        self.assertNotIn("P", p.sprintf("%BFD.flags%"),
                         "Poll bit set in BFD packet")
982
983     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
984     def test_poll_response(self):
985         """ test correct response to control frame with poll bit set """
986         bfd_session_up(self)
987         poll = self.test_session.create_packet()
988         poll[BFD].flags = "P"
989         self.test_session.send_packet(poll)
990         final = wait_for_bfd_packet(
991             self, pcap_time_min=time.time() - self.vpp_clock_offset)
992         self.assertIn("F", final.sprintf("%BFD.flags%"))
993
994     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
995     def test_no_periodic_if_remote_demand(self):
996         """ no periodic frames outside poll sequence if remote demand set """
997         bfd_session_up(self)
998         demand = self.test_session.create_packet()
999         demand[BFD].flags = "D"
1000         self.test_session.send_packet(demand)
1001         transmit_time = 0.9 \
1002             * max(self.vpp_session.required_min_rx,
1003                   self.test_session.desired_min_tx) \
1004             / USEC_IN_SEC
1005         count = 0
1006         for dummy in range(self.test_session.detect_mult * 2):
1007             time.sleep(transmit_time)
1008             self.test_session.send_packet(demand)
1009             try:
1010                 p = wait_for_bfd_packet(self, timeout=0)
1011                 self.logger.error(ppp("Received unexpected packet:", p))
1012                 count += 1
1013             except CaptureTimeoutError:
1014                 pass
1015         events = self.vapi.collect_events()
1016         for e in events:
1017             self.logger.error("Received unexpected event: %s", e)
1018         self.assert_equal(count, 0, "number of packets received")
1019         self.assert_equal(len(events), 0, "number of events received")
1020
1021     def test_echo_looped_back(self):
1022         """ echo packets looped back """
1023         # don't need a session in this case..
1024         self.vpp_session.remove_vpp_config()
1025         self.pg0.enable_capture()
1026         echo_packet_count = 10
1027         # random source port low enough to increment a few times..
1028         udp_sport_tx = randint(1, 50000)
1029         udp_sport_rx = udp_sport_tx
1030         echo_packet = (Ether(src=self.pg0.remote_mac,
1031                              dst=self.pg0.local_mac) /
1032                        IP(src=self.pg0.remote_ip4,
1033                           dst=self.pg0.remote_ip4) /
1034                        UDP(dport=BFD.udp_dport_echo) /
1035                        Raw("this should be looped back"))
1036         for dummy in range(echo_packet_count):
1037             self.sleep(.01, "delay between echo packets")
1038             echo_packet[UDP].sport = udp_sport_tx
1039             udp_sport_tx += 1
1040             self.logger.debug(ppp("Sending packet:", echo_packet))
1041             self.pg0.add_stream(echo_packet)
1042             self.pg_start()
1043         for dummy in range(echo_packet_count):
1044             p = self.pg0.wait_for_packet(1)
1045             self.logger.debug(ppp("Got packet:", p))
1046             ether = p[Ether]
1047             self.assert_equal(self.pg0.remote_mac,
1048                               ether.dst, "Destination MAC")
1049             self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1050             ip = p[IP]
1051             self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1052             self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1053             udp = p[UDP]
1054             self.assert_equal(udp.dport, BFD.udp_dport_echo,
1055                               "UDP destination port")
1056             self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1057             udp_sport_rx += 1
1058             # need to compare the hex payload here, otherwise BFD_vpp_echo
1059             # gets in way
1060             self.assertEqual(str(p[UDP].payload),
1061                              str(echo_packet[UDP].payload),
1062                              "Received packet is not the echo packet sent")
1063         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1064                           "ECHO packet identifier for test purposes)")
1065
    def test_echo(self):
        """ echo function

        The test peer advertises a required min echo rx of 50000 usec.
        Before an echo source is configured, VPP must keep advertising its
        configured required min rx.  After the echo source is set to
        loopback0, captured echo packets are verified and injected back
        into pg0, BFD control packets must advertise a required min rx of
        at least 1000000 usec, and no BFD event may be collected.
        """
        bfd_session_up(self)
        self.test_session.update(required_min_echo_rx=50000)
        self.test_session.send_packet()
        detection_time = self.test_session.detect_mult *\
            self.vpp_session.required_min_rx / USEC_IN_SEC
        # echo shouldn't work without echo source set
        for dummy in range(3):
            # keep the session alive by sending before detection time is up
            sleep = 0.75 * detection_time
            self.sleep(sleep, "delay before sending bfd packet")
            self.test_session.send_packet()
        # only look at a packet captured after the keep-alive phase above
        p = wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.assert_equal(p[BFD].required_min_rx_interval,
                          self.vpp_session.required_min_rx,
                          "BFD required min rx interval")
        self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
        # should be turned on - loopback echo packets
        for dummy in range(3):
            loop_until = time.time() + 0.75 * detection_time
            while time.time() < loop_until:
                p = self.pg0.wait_for_packet(1)
                self.logger.debug(ppp("Got packet:", p))
                if p[UDP].dport == BFD.udp_dport_echo:
                    # echo packet: verify addressing, then send it back
                    self.assert_equal(
                        p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
                    self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
                                        "BFD ECHO src IP equal to loopback IP")
                    self.logger.debug(ppp("Looping back packet:", p))
                    self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
                                      "ECHO packet destination MAC address")
                    # re-address the frame to pg0 before injecting it
                    p[Ether].dst = self.pg0.local_mac
                    self.pg0.add_stream(p)
                    self.pg_start()
                elif p.haslayer(BFD):
                    # control packet: check the advertised interval
                    self.assertGreaterEqual(p[BFD].required_min_rx_interval,
                                            1000000)
                    if "P" in p.sprintf("%BFD.flags%"):
                        # answer a poll with a final packet
                        final = self.test_session.create_packet()
                        final[BFD].flags = "F"
                        self.test_session.send_packet(final)
                else:
                    raise Exception(ppp("Received unknown packet:", p))

                self.assert_equal(len(self.vapi.collect_events()), 0,
                                  "number of bfd events")
            # control packet from the test peer after each capture window
            self.test_session.send_packet()
1114
1115     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1116     def test_echo_fail(self):
1117         """ session goes down if echo function fails """
1118         bfd_session_up(self)
1119         self.test_session.update(required_min_echo_rx=50000)
1120         self.test_session.send_packet()
1121         detection_time = self.test_session.detect_mult *\
1122             self.vpp_session.required_min_rx / USEC_IN_SEC
1123         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1124         # echo function should be used now, but we will drop the echo packets
1125         verified_diag = False
1126         for dummy in range(3):
1127             loop_until = time.time() + 0.75 * detection_time
1128             while time.time() < loop_until:
1129                 p = self.pg0.wait_for_packet(1)
1130                 self.logger.debug(ppp("Got packet:", p))
1131                 if p[UDP].dport == BFD.udp_dport_echo:
1132                     # dropped
1133                     pass
1134                 elif p.haslayer(BFD):
1135                     if "P" in p.sprintf("%BFD.flags%"):
1136                         self.assertGreaterEqual(
1137                             p[BFD].required_min_rx_interval,
1138                             1000000)
1139                         final = self.test_session.create_packet()
1140                         final[BFD].flags = "F"
1141                         self.test_session.send_packet(final)
1142                     if p[BFD].state == BFDState.down:
1143                         self.assert_equal(p[BFD].diag,
1144                                           BFDDiagCode.echo_function_failed,
1145                                           BFDDiagCode)
1146                         verified_diag = True
1147                 else:
1148                     raise Exception(ppp("Received unknown packet:", p))
1149             self.test_session.send_packet()
1150         events = self.vapi.collect_events()
1151         self.assert_equal(len(events), 1, "number of bfd events")
1152         self.assert_equal(events[0].state, BFDState.down, BFDState)
1153         self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1154
1155     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1156     def test_echo_stop(self):
1157         """ echo function stops if peer sets required min echo rx zero """
1158         bfd_session_up(self)
1159         self.test_session.update(required_min_echo_rx=50000)
1160         self.test_session.send_packet()
1161         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1162         # wait for first echo packet
1163         while True:
1164             p = self.pg0.wait_for_packet(1)
1165             self.logger.debug(ppp("Got packet:", p))
1166             if p[UDP].dport == BFD.udp_dport_echo:
1167                 self.logger.debug(ppp("Looping back packet:", p))
1168                 p[Ether].dst = self.pg0.local_mac
1169                 self.pg0.add_stream(p)
1170                 self.pg_start()
1171                 break
1172             elif p.haslayer(BFD):
1173                 # ignore BFD
1174                 pass
1175             else:
1176                 raise Exception(ppp("Received unknown packet:", p))
1177         self.test_session.update(required_min_echo_rx=0)
1178         self.test_session.send_packet()
1179         # echo packets shouldn't arrive anymore
1180         for dummy in range(5):
1181             wait_for_bfd_packet(
1182                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1183             self.test_session.send_packet()
1184             events = self.vapi.collect_events()
1185             self.assert_equal(len(events), 0, "number of bfd events")
1186
1187     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1188     def test_echo_source_removed(self):
1189         """ echo function stops if echo source is removed """
1190         bfd_session_up(self)
1191         self.test_session.update(required_min_echo_rx=50000)
1192         self.test_session.send_packet()
1193         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1194         # wait for first echo packet
1195         while True:
1196             p = self.pg0.wait_for_packet(1)
1197             self.logger.debug(ppp("Got packet:", p))
1198             if p[UDP].dport == BFD.udp_dport_echo:
1199                 self.logger.debug(ppp("Looping back packet:", p))
1200                 p[Ether].dst = self.pg0.local_mac
1201                 self.pg0.add_stream(p)
1202                 self.pg_start()
1203                 break
1204             elif p.haslayer(BFD):
1205                 # ignore BFD
1206                 pass
1207             else:
1208                 raise Exception(ppp("Received unknown packet:", p))
1209         self.vapi.bfd_udp_del_echo_source()
1210         self.test_session.send_packet()
1211         # echo packets shouldn't arrive anymore
1212         for dummy in range(5):
1213             wait_for_bfd_packet(
1214                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1215             self.test_session.send_packet()
1216             events = self.vapi.collect_events()
1217             self.assert_equal(len(events), 0, "number of bfd events")
1218
1219     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1220     def test_stale_echo(self):
1221         """ stale echo packets don't keep a session up """
1222         bfd_session_up(self)
1223         self.test_session.update(required_min_echo_rx=50000)
1224         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1225         self.test_session.send_packet()
1226         # should be turned on - loopback echo packets
1227         echo_packet = None
1228         timeout_at = None
1229         timeout_ok = False
1230         for dummy in range(10 * self.vpp_session.detect_mult):
1231             p = self.pg0.wait_for_packet(1)
1232             if p[UDP].dport == BFD.udp_dport_echo:
1233                 if echo_packet is None:
1234                     self.logger.debug(ppp("Got first echo packet:", p))
1235                     echo_packet = p
1236                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1237                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1238                 else:
1239                     self.logger.debug(ppp("Got followup echo packet:", p))
1240                 self.logger.debug(ppp("Looping back first echo packet:", p))
1241                 echo_packet[Ether].dst = self.pg0.local_mac
1242                 self.pg0.add_stream(echo_packet)
1243                 self.pg_start()
1244             elif p.haslayer(BFD):
1245                 self.logger.debug(ppp("Got packet:", p))
1246                 if "P" in p.sprintf("%BFD.flags%"):
1247                     final = self.test_session.create_packet()
1248                     final[BFD].flags = "F"
1249                     self.test_session.send_packet(final)
1250                 if p[BFD].state == BFDState.down:
1251                     self.assertIsNotNone(
1252                         timeout_at,
1253                         "Session went down before first echo packet received")
1254                     now = time.time()
1255                     self.assertGreaterEqual(
1256                         now, timeout_at,
1257                         "Session timeout at %s, but is expected at %s" %
1258                         (now, timeout_at))
1259                     self.assert_equal(p[BFD].diag,
1260                                       BFDDiagCode.echo_function_failed,
1261                                       BFDDiagCode)
1262                     events = self.vapi.collect_events()
1263                     self.assert_equal(len(events), 1, "number of bfd events")
1264                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1265                     timeout_ok = True
1266                     break
1267             else:
1268                 raise Exception(ppp("Received unknown packet:", p))
1269             self.test_session.send_packet()
1270         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1271
1272     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1273     def test_invalid_echo_checksum(self):
1274         """ echo packets with invalid checksum don't keep a session up """
1275         bfd_session_up(self)
1276         self.test_session.update(required_min_echo_rx=50000)
1277         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1278         self.test_session.send_packet()
1279         # should be turned on - loopback echo packets
1280         timeout_at = None
1281         timeout_ok = False
1282         for dummy in range(10 * self.vpp_session.detect_mult):
1283             p = self.pg0.wait_for_packet(1)
1284             if p[UDP].dport == BFD.udp_dport_echo:
1285                 self.logger.debug(ppp("Got echo packet:", p))
1286                 if timeout_at is None:
1287                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1288                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1289                 p[BFD_vpp_echo].checksum = getrandbits(64)
1290                 p[Ether].dst = self.pg0.local_mac
1291                 self.logger.debug(ppp("Looping back modified echo packet:", p))
1292                 self.pg0.add_stream(p)
1293                 self.pg_start()
1294             elif p.haslayer(BFD):
1295                 self.logger.debug(ppp("Got packet:", p))
1296                 if "P" in p.sprintf("%BFD.flags%"):
1297                     final = self.test_session.create_packet()
1298                     final[BFD].flags = "F"
1299                     self.test_session.send_packet(final)
1300                 if p[BFD].state == BFDState.down:
1301                     self.assertIsNotNone(
1302                         timeout_at,
1303                         "Session went down before first echo packet received")
1304                     now = time.time()
1305                     self.assertGreaterEqual(
1306                         now, timeout_at,
1307                         "Session timeout at %s, but is expected at %s" %
1308                         (now, timeout_at))
1309                     self.assert_equal(p[BFD].diag,
1310                                       BFDDiagCode.echo_function_failed,
1311                                       BFDDiagCode)
1312                     events = self.vapi.collect_events()
1313                     self.assert_equal(len(events), 1, "number of bfd events")
1314                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1315                     timeout_ok = True
1316                     break
1317             else:
1318                 raise Exception(ppp("Received unknown packet:", p))
1319             self.test_session.send_packet()
1320         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1321
1322     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1323     def test_admin_up_down(self):
1324         """ put session admin-up and admin-down """
1325         bfd_session_up(self)
1326         self.vpp_session.admin_down()
1327         self.pg0.enable_capture()
1328         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1329         verify_event(self, e, expected_state=BFDState.admin_down)
1330         for dummy in range(2):
1331             p = wait_for_bfd_packet(self)
1332             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1333         # try to bring session up - shouldn't be possible
1334         self.test_session.update(state=BFDState.init)
1335         self.test_session.send_packet()
1336         for dummy in range(2):
1337             p = wait_for_bfd_packet(self)
1338             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1339         self.vpp_session.admin_up()
1340         self.test_session.update(state=BFDState.down)
1341         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1342         verify_event(self, e, expected_state=BFDState.down)
1343         p = wait_for_bfd_packet(
1344             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1345         self.assert_equal(p[BFD].state, BFDState.down, BFDState)
1346         self.test_session.send_packet()
1347         p = wait_for_bfd_packet(
1348             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1349         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1350         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1351         verify_event(self, e, expected_state=BFDState.init)
1352         self.test_session.update(state=BFDState.up)
1353         self.test_session.send_packet()
1354         p = wait_for_bfd_packet(
1355             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1356         self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1357         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1358         verify_event(self, e, expected_state=BFDState.up)
1359
1360     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1361     def test_config_change_remote_demand(self):
1362         """ configuration change while peer in demand mode """
1363         bfd_session_up(self)
1364         demand = self.test_session.create_packet()
1365         demand[BFD].flags = "D"
1366         self.test_session.send_packet(demand)
1367         self.vpp_session.modify_parameters(
1368             required_min_rx=2 * self.vpp_session.required_min_rx)
1369         p = wait_for_bfd_packet(
1370             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1371         # poll bit must be set
1372         self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1373         # terminate poll sequence
1374         final = self.test_session.create_packet()
1375         final[BFD].flags = "D+F"
1376         self.test_session.send_packet(final)
1377         # vpp should be quiet now again
1378         transmit_time = 0.9 \
1379             * max(self.vpp_session.required_min_rx,
1380                   self.test_session.desired_min_tx) \
1381             / USEC_IN_SEC
1382         count = 0
1383         for dummy in range(self.test_session.detect_mult * 2):
1384             time.sleep(transmit_time)
1385             self.test_session.send_packet(demand)
1386             try:
1387                 p = wait_for_bfd_packet(self, timeout=0)
1388                 self.logger.error(ppp("Received unexpected packet:", p))
1389                 count += 1
1390             except CaptureTimeoutError:
1391                 pass
1392         events = self.vapi.collect_events()
1393         for e in events:
1394             self.logger.error("Received unexpected event: %s", e)
1395         self.assert_equal(count, 0, "number of packets received")
1396         self.assert_equal(len(events), 0, "number of events received")
1397
1398
class BFD6TestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (IPv6) """

    # class-level placeholders: pg0 is created in setUpClass, the two
    # session objects are created anew for every test in setUp
    pg0 = None
    vpp_clock_offset = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """ create pg0 and loopback0 and configure IPv6 on both """
        super(BFD6TestCase, cls).setUpClass()
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip6()
            cls.pg0.configure_ipv6_neighbors()
            cls.pg0.admin_up()
            cls.pg0.resolve_ndp()
            cls.create_loopback_interfaces([0])
            cls.loopback0 = cls.lo_interfaces[0]
            cls.loopback0.config_ip6()
            cls.loopback0.admin_up()

        except Exception:
            # undo the base class set-up before propagating the failure
            super(BFD6TestCase, cls).tearDownClass()
            raise

    def setUp(self):
        """ subscribe to BFD events and create the vpp and test sessions """
        super(BFD6TestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()
        try:
            self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                                self.pg0.remote_ip6,
                                                af=AF_INET6)
            self.vpp_session.add_vpp_config()
            self.vpp_session.admin_up()
            self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
            self.logger.debug(self.vapi.cli("show adj nbr"))
        except:
            # deliberately catches everything: the event subscription is
            # withdrawn and the original exception is re-raised unchanged
            self.vapi.want_bfd_events(enable_disable=0)
            raise

    def tearDown(self):
        """ unsubscribe from BFD events and drain the event queue """
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFD6TestCase, self).tearDown()

    def test_session_up(self):
        """ bring BFD session up """
        bfd_session_up(self)

    def test_session_up_by_ip(self):
        """ bring BFD session up - first frame looked up by address pair """
        self.logger.info("BFD: Sending Slow control frame")
        # the first frame is sent with a freshly picked random discriminator
        self.test_session.update(my_discriminator=randint(0, 40000000))
        self.test_session.send_packet()
        self.pg0.enable_capture()
        p = self.pg0.wait_for_packet(1)
        # vpp must echo our discriminator back and report init state
        self.assert_equal(p[BFD].your_discriminator,
                          self.test_session.my_discriminator,
                          "BFD - your discriminator")
        self.assert_equal(p[BFD].state, BFDState.init, BFDState)
        self.test_session.update(your_discriminator=p[BFD].my_discriminator,
                                 state=BFDState.up)
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        verify_event(self, e, expected_state=BFDState.init)
        self.logger.info("BFD: Sending Up")
        self.test_session.send_packet()
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        verify_event(self, e, expected_state=BFDState.up)
        self.logger.info("BFD: Session is Up")
        self.test_session.update(state=BFDState.up)
        self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_hold_up(self):
        """ hold BFD session up """
        bfd_session_up(self)
        # answer every packet from vpp for a while; no event may be raised
        for _ in range(self.test_session.detect_mult * 2):
            wait_for_bfd_packet(self)
            self.test_session.send_packet()
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_echo_looped_back(self):
        """ echo packets looped back """
        # don't need a session in this case..
        self.vpp_session.remove_vpp_config()
        self.pg0.enable_capture()
        echo_packet_count = 10
        # random source port low enough to increment a few times..
        udp_sport_tx = randint(1, 50000)
        udp_sport_rx = udp_sport_tx
        # source and destination IP are both the remote address
        echo_packet = (Ether(src=self.pg0.remote_mac,
                             dst=self.pg0.local_mac) /
                       IPv6(src=self.pg0.remote_ip6,
                            dst=self.pg0.remote_ip6) /
                       UDP(dport=BFD.udp_dport_echo) /
                       Raw("this should be looped back"))
        # the UDP source port is bumped per packet and so identifies it
        for _ in range(echo_packet_count):
            self.sleep(.01, "delay between echo packets")
            echo_packet[UDP].sport = udp_sport_tx
            udp_sport_tx += 1
            self.logger.debug(ppp("Sending packet:", echo_packet))
            self.pg0.add_stream(echo_packet)
            self.pg_start()
        for _ in range(echo_packet_count):
            p = self.pg0.wait_for_packet(1)
            self.logger.debug(ppp("Got packet:", p))
            ether = p[Ether]
            self.assert_equal(self.pg0.remote_mac,
                              ether.dst, "Destination MAC")
            self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
            ip = p[IPv6]
            self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
            self.assert_equal(self.pg0.remote_ip6, ip.src, "Source IP")
            udp = p[UDP]
            self.assert_equal(udp.dport, BFD.udp_dport_echo,
                              "UDP destination port")
            self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
            udp_sport_rx += 1
            # need to compare the hex payload here, otherwise BFD_vpp_echo
            # gets in way
            self.assertEqual(str(p[UDP].payload),
                             str(echo_packet[UDP].payload),
                             "Received packet is not the echo packet sent")
        # every packet sent must have been received
        self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
                          "ECHO packet identifier for test purposes)")

    def test_echo(self):
        """ echo function used """
        bfd_session_up(self)
        self.test_session.update(required_min_echo_rx=50000)
        self.test_session.send_packet()
        detection_time = self.test_session.detect_mult *\
            self.vpp_session.required_min_rx / USEC_IN_SEC
        # stay safely inside the detection time between keep-alives
        keepalive_delay = 0.75 * detection_time
        # echo shouldn't work without echo source set
        for _ in range(3):
            self.sleep(keepalive_delay, "delay before sending bfd packet")
            self.test_session.send_packet()
        p = wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.assert_equal(p[BFD].required_min_rx_interval,
                          self.vpp_session.required_min_rx,
                          "BFD required min rx interval")
        self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
        # should be turned on - loopback echo packets
        for _ in range(3):
            loop_until = time.time() + 0.75 * detection_time
            while time.time() < loop_until:
                p = self.pg0.wait_for_packet(1)
                self.logger.debug(ppp("Got packet:", p))
                if p[UDP].dport == BFD.udp_dport_echo:
                    self.assert_equal(
                        p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
                    self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
                                        "BFD ECHO src IP equal to loopback IP")
                    self.logger.debug(ppp("Looping back packet:", p))
                    self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
                                      "ECHO packet destination MAC address")
                    # re-inject the echo packet towards vpp
                    p[Ether].dst = self.pg0.local_mac
                    self.pg0.add_stream(p)
                    self.pg_start()
                elif p.haslayer(BFD):
                    # with echo active the advertised control-packet rx
                    # interval must be at least USEC_IN_SEC (1000000)
                    self.assertGreaterEqual(p[BFD].required_min_rx_interval,
                                            USEC_IN_SEC)
                    if "P" in p.sprintf("%BFD.flags%"):
                        # answer a poll with a final
                        final = self.test_session.create_packet()
                        final[BFD].flags = "F"
                        self.test_session.send_packet(final)
                else:
                    raise Exception(ppp("Received unknown packet:", p))

                self.assert_equal(len(self.vapi.collect_events()), 0,
                                  "number of bfd events")
            self.test_session.send_packet()
1584
1585
class BFDFIBTestCase(VppTestCase):
    """ BFD-FIB interactions (IPv6) """

    # both sessions are created inside the test itself
    vpp_session = None
    test_session = None

    def setUp(self):
        """ create pg0, subscribe to BFD events and configure IPv6 """
        super(BFDFIBTestCase, self).setUp()
        self.create_pg_interfaces(range(1))

        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            i.configure_ipv6_neighbors()

    def tearDown(self):
        """ unsubscribe from BFD events """
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)

        super(BFDFIBTestCase, self).tearDown()

    @staticmethod
    def pkt_is_not_data_traffic(p):
        """ not data traffic implies BFD or the usual IPv6 ND/RA"""
        return bool(p.haslayer(BFD) or is_ipv6_misc(p))

    def _send_and_verify_forwarded(self, packets):
        """ inject packets on pg0 and expect each one to be captured on pg0

        Captured packets are matched against the sent ones, in order, by
        their IPv6 destination address; BFD and IPv6 ND/RA packets are
        filtered out of the capture.
        """
        self.pg0.add_stream(packets)
        self.pg_start()
        for packet in packets:
            captured = self.pg0.wait_for_packet(
                1,
                filter_out_fn=self.pkt_is_not_data_traffic)
            self.assertEqual(captured[IPv6].dst,
                             packet[IPv6].dst)

    def test_session_with_fib(self):
        """ BFD-FIB interactions """

        # packets to match against both of the routes
        p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
              IPv6(src="3001::1", dst="2001::1") /
              UDP(sport=1234, dport=1234) /
              Raw('\xa5' * 100)),
             (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
              IPv6(src="3001::1", dst="2002::1") /
              UDP(sport=1234, dport=1234) /
              Raw('\xa5' * 100))]

        # A recursive and a non-recursive route via a next-hop that
        # will have a BFD session
        ip_2001_s_64 = VppIpRoute(self, "2001::", 64,
                                  [VppRoutePath(self.pg0.remote_ip6,
                                                self.pg0.sw_if_index,
                                                is_ip6=1)],
                                  is_ip6=1)
        ip_2002_s_64 = VppIpRoute(self, "2002::", 64,
                                  [VppRoutePath(self.pg0.remote_ip6,
                                                0xffffffff,
                                                is_ip6=1)],
                                  is_ip6=1)
        ip_2001_s_64.add_vpp_config()
        ip_2002_s_64.add_vpp_config()

        # bring the session up now the routes are present
        self.vpp_session = VppBFDUDPSession(self,
                                            self.pg0,
                                            self.pg0.remote_ip6,
                                            af=AF_INET6)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET6)

        # session is up - traffic passes
        bfd_session_up(self)
        self._send_and_verify_forwarded(p)

        # session is down - traffic is dropped
        bfd_session_down(self)

        self.pg0.add_stream(p)
        self.pg_start()
        with self.assertRaises(CaptureTimeoutError):
            self.pg0.wait_for_packet(
                1,
                filter_out_fn=self.pkt_is_not_data_traffic)

        # session is up again - traffic passes
        bfd_session_up(self)
        self._send_and_verify_forwarded(p)
1685
1686
class BFDSHA1TestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """

    # pg0 is created once per class; the sessions are built by each test
    pg0 = None
    vpp_clock_offset = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """ create pg0 and configure IPv4 on it """
        super(BFDSHA1TestCase, cls).setUpClass()
        try:
            cls.create_pg_interfaces([0])
            iface = cls.pg0
            iface.config_ip4()
            iface.admin_up()
            iface.resolve_arp()
        except Exception:
            # roll back the base class set-up, then propagate the failure
            super(BFDSHA1TestCase, cls).tearDownClass()
            raise

    def setUp(self):
        """ prepare a key factory, BFD event subscription and capture """
        super(BFDSHA1TestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

    def tearDown(self):
        """ unsubscribe from BFD events and drain the event queue """
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFDSHA1TestCase, self).tearDown()

    def test_session_up(self):
        """ bring BFD session up """
        sha1_key = self.factory.create_random_key(self)
        sha1_key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=sha1_key)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET,
            sha1_key=sha1_key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_hold_up(self):
        """ hold BFD session up """
        sha1_key = self.factory.create_random_key(self)
        sha1_key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=sha1_key)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET,
            sha1_key=sha1_key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        # answer each packet from vpp with one of our own
        for _ in range(self.test_session.detect_mult * 2):
            wait_for_bfd_packet(self)
            self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_hold_up_meticulous(self):
        """ hold BFD session up - meticulous auth """
        sha1_key = self.factory.create_random_key(
            self, BFDAuthType.meticulous_keyed_sha1)
        sha1_key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=sha1_key)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        # specify sequence number so that it wraps
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET,
            sha1_key=sha1_key,
            bfd_key_id=self.vpp_session.bfd_key_id,
            our_seq_number=0xFFFFFFFF - 4)
        bfd_session_up(self)
        # the sequence number is bumped before every packet we send
        for _ in range(30):
            wait_for_bfd_packet(self)
            self.test_session.inc_seq_num()
            self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_send_bad_seq_number(self):
        """ session is not kept alive by msgs with bad sequence numbers"""
        sha1_key = self.factory.create_random_key(
            self, BFDAuthType.meticulous_keyed_sha1)
        sha1_key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=sha1_key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET,
            sha1_key=sha1_key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        detection_time = (self.test_session.detect_mult *
                          self.vpp_session.required_min_rx / USEC_IN_SEC)
        deadline = time.time() + 2 * detection_time
        # keep sending, but never increment the sequence number
        while time.time() < deadline:
            self.test_session.send_packet()
            pause = 0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC
            self.sleep(pause, "time between bfd packets")
        events = self.vapi.collect_events()
        # session should be down now, because the sequence numbers weren't
        # updated
        self.assert_equal(len(events), 1, "number of bfd events")
        verify_event(self, events[0], expected_state=BFDState.down)

    def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
                                       legitimate_test_session,
                                       rogue_test_session,
                                       rogue_bfd_values=None):
        """ execute a rogue session interaction scenario

        1. add the vpp session config
        2. bring the session up using the legitimate test session
        3. clone the legitimate session's bfd values into the rogue session
        4. override them with rogue_bfd_values, if any were given
        5. force the rogue session state to down
        6. send a packet from the rogue session
        7. assert that the vpp session is still up
        """

        self.vpp_session = vpp_bfd_udp_session
        self.vpp_session.add_vpp_config()
        self.test_session = legitimate_test_session
        # bring vpp session up
        bfd_session_up(self)
        # make the rogue session look like the legitimate one
        cloned_fields = ("my_discriminator", "your_discriminator",
                         "desired_min_tx", "required_min_rx",
                         "detect_mult", "diag", "state", "auth_type")
        cloned_values = {}
        for field in cloned_fields:
            cloned_values[field] = getattr(self.test_session, field)
        rogue_test_session.update(**cloned_values)
        if rogue_bfd_values:
            rogue_test_session.update(**rogue_bfd_values)
        # send packet from rogue session
        rogue_test_session.update(state=BFDState.down)
        rogue_test_session.send_packet()
        wait_for_bfd_packet(self)
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_mismatch_auth(self):
        """ session is not brought down by unauthenticated msg """
        sha1_key = self.factory.create_random_key(self)
        sha1_key.add_vpp_config()
        vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=sha1_key)
        legit = BFDTestSession(
            self, self.pg0, AF_INET,
            sha1_key=sha1_key,
            bfd_key_id=vpp_session.bfd_key_id)
        # the rogue session carries no key at all
        rogue = BFDTestSession(self, self.pg0, AF_INET)
        self.execute_rogue_session_scenario(vpp_session, legit, rogue)

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_mismatch_bfd_key_id(self):
        """ session is not brought down by msg with non-existent key-id """
        sha1_key = self.factory.create_random_key(self)
        sha1_key.add_vpp_config()
        vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=sha1_key)
        # pick a different random bfd key id
        while True:
            bogus_key_id = randint(0, 255)
            if bogus_key_id != vpp_session.bfd_key_id:
                break
        legit = BFDTestSession(
            self, self.pg0, AF_INET,
            sha1_key=sha1_key,
            bfd_key_id=vpp_session.bfd_key_id)
        rogue = BFDTestSession(
            self, self.pg0, AF_INET,
            sha1_key=sha1_key,
            bfd_key_id=bogus_key_id)
        self.execute_rogue_session_scenario(vpp_session, legit, rogue)

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_mismatched_auth_type(self):
        """ session is not brought down by msg with wrong auth type """
        sha1_key = self.factory.create_random_key(self)
        sha1_key.add_vpp_config()
        vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=sha1_key)
        legit = BFDTestSession(
            self, self.pg0, AF_INET,
            sha1_key=sha1_key,
            bfd_key_id=vpp_session.bfd_key_id)
        rogue = BFDTestSession(
            self, self.pg0, AF_INET,
            sha1_key=sha1_key,
            bfd_key_id=vpp_session.bfd_key_id)
        # same key and key id, but the rogue packet claims a different
        # auth type
        overrides = {'auth_type': BFDAuthType.keyed_md5}
        self.execute_rogue_session_scenario(
            vpp_session, legit, rogue, overrides)

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_restart(self):
        """ simulate remote peer restart and resynchronization """
        sha1_key = self.factory.create_random_key(
            self, BFDAuthType.meticulous_keyed_sha1)
        sha1_key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=sha1_key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET,
            sha1_key=sha1_key,
            bfd_key_id=self.vpp_session.bfd_key_id,
            our_seq_number=0)
        bfd_session_up(self)
        # don't send any packets for 2*detection_time
        detection_time = (self.test_session.detect_mult *
                          self.vpp_session.required_min_rx / USEC_IN_SEC)
        self.sleep(2 * detection_time, "simulating peer restart")
        down_events = self.vapi.collect_events()
        self.assert_equal(len(down_events), 1, "number of bfd events")
        verify_event(self, down_events[0], expected_state=BFDState.down)
        self.test_session.update(state=BFDState.down)
        # reset sequence number
        self.test_session.our_seq_number = 0
        self.test_session.vpp_seq_number = None
        # now throw away any pending packets
        self.pg0.enable_capture()
        bfd_session_up(self)
1917
1918
1919 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1920 class BFDAuthOnOffTestCase(VppTestCase):
1921     """Bidirectional Forwarding Detection (BFD) (changing auth) """
1922
1923     pg0 = None
1924     vpp_session = None
1925     test_session = None
1926
1927     @classmethod
1928     def setUpClass(cls):
1929         super(BFDAuthOnOffTestCase, cls).setUpClass()
1930         try:
1931             cls.create_pg_interfaces([0])
1932             cls.pg0.config_ip4()
1933             cls.pg0.admin_up()
1934             cls.pg0.resolve_arp()
1935
1936         except Exception:
1937             super(BFDAuthOnOffTestCase, cls).tearDownClass()
1938             raise
1939
    def setUp(self):
        """ create a fresh key factory, subscribe to BFD events and enable
        capture on pg0 """
        super(BFDAuthOnOffTestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()
1945
    def tearDown(self):
        """ unsubscribe from BFD events and drain the event queue """
        # the unsubscribe call is skipped when vpp is flagged as dead
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFDAuthOnOffTestCase, self).tearDown()
1951
1952     def test_auth_on_immediate(self):
1953         """ turn auth on without disturbing session state (immediate) """
1954         key = self.factory.create_random_key(self)
1955         key.add_vpp_config()
1956         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1957                                             self.pg0.remote_ip4)
1958         self.vpp_session.add_vpp_config()
1959         self.test_session = BFDTestSession(self, self.pg0, AF_INET)
1960         bfd_session_up(self)
1961         for dummy in range(self.test_session.detect_mult * 2):
1962             p = wait_for_bfd_packet(self)
1963             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1964             self.test_session.send_packet()
1965         self.vpp_session.activate_auth(key)
1966         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
1967         self.test_session.sha1_key = key
1968         for dummy in range(self.test_session.detect_mult * 2):
1969             p = wait_for_bfd_packet(self)
1970             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1971             self.test_session.send_packet()
1972         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1973         self.assert_equal(len(self.vapi.collect_events()), 0,
1974                           "number of bfd events")
1975
1976     def test_auth_off_immediate(self):
1977         """ turn auth off without disturbing session state (immediate) """
1978         key = self.factory.create_random_key(self)
1979         key.add_vpp_config()
1980         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1981                                             self.pg0.remote_ip4, sha1_key=key)
1982         self.vpp_session.add_vpp_config()
1983         self.test_session = BFDTestSession(
1984             self, self.pg0, AF_INET, sha1_key=key,
1985             bfd_key_id=self.vpp_session.bfd_key_id)
1986         bfd_session_up(self)
1987         # self.vapi.want_bfd_events(enable_disable=0)
1988         for dummy in range(self.test_session.detect_mult * 2):
1989             p = wait_for_bfd_packet(self)
1990             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1991             self.test_session.inc_seq_num()
1992             self.test_session.send_packet()
1993         self.vpp_session.deactivate_auth()
1994         self.test_session.bfd_key_id = None
1995         self.test_session.sha1_key = None
1996         for dummy in range(self.test_session.detect_mult * 2):
1997             p = wait_for_bfd_packet(self)
1998             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1999             self.test_session.inc_seq_num()
2000             self.test_session.send_packet()
2001         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2002         self.assert_equal(len(self.vapi.collect_events()), 0,
2003                           "number of bfd events")
2004
2005     def test_auth_change_key_immediate(self):
2006         """ change auth key without disturbing session state (immediate) """
2007         key1 = self.factory.create_random_key(self)
2008         key1.add_vpp_config()
2009         key2 = self.factory.create_random_key(self)
2010         key2.add_vpp_config()
2011         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2012                                             self.pg0.remote_ip4, sha1_key=key1)
2013         self.vpp_session.add_vpp_config()
2014         self.test_session = BFDTestSession(
2015             self, self.pg0, AF_INET, sha1_key=key1,
2016             bfd_key_id=self.vpp_session.bfd_key_id)
2017         bfd_session_up(self)
2018         for dummy in range(self.test_session.detect_mult * 2):
2019             p = wait_for_bfd_packet(self)
2020             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2021             self.test_session.send_packet()
2022         self.vpp_session.activate_auth(key2)
2023         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2024         self.test_session.sha1_key = key2
2025         for dummy in range(self.test_session.detect_mult * 2):
2026             p = wait_for_bfd_packet(self)
2027             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2028             self.test_session.send_packet()
2029         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2030         self.assert_equal(len(self.vapi.collect_events()), 0,
2031                           "number of bfd events")
2032
2033     def test_auth_on_delayed(self):
2034         """ turn auth on without disturbing session state (delayed) """
2035         key = self.factory.create_random_key(self)
2036         key.add_vpp_config()
2037         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2038                                             self.pg0.remote_ip4)
2039         self.vpp_session.add_vpp_config()
2040         self.test_session = BFDTestSession(self, self.pg0, AF_INET)
2041         bfd_session_up(self)
2042         for dummy in range(self.test_session.detect_mult * 2):
2043             wait_for_bfd_packet(self)
2044             self.test_session.send_packet()
2045         self.vpp_session.activate_auth(key, delayed=True)
2046         for dummy in range(self.test_session.detect_mult * 2):
2047             p = wait_for_bfd_packet(self)
2048             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2049             self.test_session.send_packet()
2050         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2051         self.test_session.sha1_key = key
2052         self.test_session.send_packet()
2053         for dummy in range(self.test_session.detect_mult * 2):
2054             p = wait_for_bfd_packet(self)
2055             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2056             self.test_session.send_packet()
2057         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2058         self.assert_equal(len(self.vapi.collect_events()), 0,
2059                           "number of bfd events")
2060
2061     def test_auth_off_delayed(self):
2062         """ turn auth off without disturbing session state (delayed) """
2063         key = self.factory.create_random_key(self)
2064         key.add_vpp_config()
2065         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2066                                             self.pg0.remote_ip4, sha1_key=key)
2067         self.vpp_session.add_vpp_config()
2068         self.test_session = BFDTestSession(
2069             self, self.pg0, AF_INET, sha1_key=key,
2070             bfd_key_id=self.vpp_session.bfd_key_id)
2071         bfd_session_up(self)
2072         for dummy in range(self.test_session.detect_mult * 2):
2073             p = wait_for_bfd_packet(self)
2074             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2075             self.test_session.send_packet()
2076         self.vpp_session.deactivate_auth(delayed=True)
2077         for dummy in range(self.test_session.detect_mult * 2):
2078             p = wait_for_bfd_packet(self)
2079             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2080             self.test_session.send_packet()
2081         self.test_session.bfd_key_id = None
2082         self.test_session.sha1_key = None
2083         self.test_session.send_packet()
2084         for dummy in range(self.test_session.detect_mult * 2):
2085             p = wait_for_bfd_packet(self)
2086             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2087             self.test_session.send_packet()
2088         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2089         self.assert_equal(len(self.vapi.collect_events()), 0,
2090                           "number of bfd events")
2091
2092     def test_auth_change_key_delayed(self):
2093         """ change auth key without disturbing session state (delayed) """
2094         key1 = self.factory.create_random_key(self)
2095         key1.add_vpp_config()
2096         key2 = self.factory.create_random_key(self)
2097         key2.add_vpp_config()
2098         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2099                                             self.pg0.remote_ip4, sha1_key=key1)
2100         self.vpp_session.add_vpp_config()
2101         self.vpp_session.admin_up()
2102         self.test_session = BFDTestSession(
2103             self, self.pg0, AF_INET, sha1_key=key1,
2104             bfd_key_id=self.vpp_session.bfd_key_id)
2105         bfd_session_up(self)
2106         for dummy in range(self.test_session.detect_mult * 2):
2107             p = wait_for_bfd_packet(self)
2108             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2109             self.test_session.send_packet()
2110         self.vpp_session.activate_auth(key2, delayed=True)
2111         for dummy in range(self.test_session.detect_mult * 2):
2112             p = wait_for_bfd_packet(self)
2113             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2114             self.test_session.send_packet()
2115         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2116         self.test_session.sha1_key = key2
2117         self.test_session.send_packet()
2118         for dummy in range(self.test_session.detect_mult * 2):
2119             p = wait_for_bfd_packet(self)
2120             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2121             self.test_session.send_packet()
2122         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2123         self.assert_equal(len(self.vapi.collect_events()), 0,
2124                           "number of bfd events")
2125
2126
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
class BFDCLITestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (CLI) """
    # packet-generator interface used by every test, created in setUpClass
    pg0 = None

    @classmethod
    def setUpClass(cls):
        """ create pg0, configure and resolve its IPv4/IPv6 addresses """
        super(BFDCLITestCase, cls).setUpClass()

        try:
            cls.create_pg_interfaces((0,))
            cls.pg0.config_ip4()
            cls.pg0.config_ip6()
            cls.pg0.resolve_arp()
            cls.pg0.resolve_ndp()

        except Exception:
            # class setup failed half-way - tear down before propagating
            super(BFDCLITestCase, cls).tearDownClass()
            raise

    def setUp(self):
        """ per-test setup: fresh key factory and packet capture on pg0 """
        super(BFDCLITestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.pg0.enable_capture()

    def tearDown(self):
        """ drop the BFD event subscription (if any) and drain events """
        # as in BFDAuthOnOffTestCase, don't call the API of a dead VPP
        if not self.vpp_dead:
            try:
                self.vapi.want_bfd_events(enable_disable=0)
            except UnexpectedApiReturnValueError:
                # some tests aren't subscribed, so this is not an issue
                pass
        self.vapi.collect_events()  # clear the event queue
        super(BFDCLITestCase, self).tearDown()

    def cli_verify_no_response(self, cli):
        """ execute a CLI, asserting that the response is empty """
        response = self.vapi.cli(cli)
        self.assert_equal(response, "", "CLI command response")

    def cli_verify_response(self, cli, expected):
        """ execute a CLI, asserting that the response matches expectation """
        response = self.vapi.cli(cli).strip()
        self.assert_equal(response, expected, "CLI command response")

    @staticmethod
    def _secret_as_hex(key):
        """ render the secret of an auth key as two hex digits per char """
        return "".join("{:02x}".format(ord(c)) for c in key.key)

    def _pg0_addresses(self, af):
        """ return the (local, peer) addresses of pg0 for address family """
        if af == AF_INET6:
            return self.pg0.local_ip6, self.pg0.remote_ip6
        return self.pg0.local_ip4, self.pg0.remote_ip4

    def _check_key_in_use_then_delete(self, key):
        """ verify the secret of an in-use key can't be replaced, then
        remove self.vpp_session and delete the key via CLI

        :param key: auth key currently used by self.vpp_session
        """
        # try to replace the secret for the key - should fail because the key
        # is in-use
        other_key = self.factory.create_random_key(self)
        self.cli_verify_response(
            "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
            (key.conf_key_id, self._secret_as_hex(other_key)),
            "bfd key set: `bfd_auth_set_key' API call failed, "
            "rv=-103:BFD object in use")
        # manipulating the session using old secret should still work
        bfd_session_up(self)
        bfd_session_down(self)
        self.vpp_session.remove_vpp_config()
        self.cli_verify_no_response(
            "bfd key del conf-key-id %s" % key.conf_key_id)
        self.assertFalse(key.query_vpp_config())

    def _check_add_mod_del_bfd_udp(self, af, key=None):
        """ add, modify and delete a BFD UDP session on pg0 via CLI

        :param af: address family of the session (AF_INET or AF_INET6)
        :param key: auth key already added to VPP, None for a session
                    without authentication
        """
        local_addr, peer_addr = self._pg0_addresses(af)
        # pass only the keyword arguments the individual tests used to pass,
        # so the constructor defaults keep applying everywhere else
        session_args = {}
        if af == AF_INET6:
            session_args["af"] = AF_INET6
        if key is not None:
            session_args["sha1_key"] = key
        vpp_session = VppBFDUDPSession(
            self, self.pg0, peer_addr, **session_args)
        self.registry.register(vpp_session, self.logger)
        cli_add_cmd = (
            "bfd udp session add interface %s local-addr %s "
            "peer-addr %s desired-min-tx %s required-min-rx %s "
            "detect-mult %s" % (self.pg0.name, local_addr, peer_addr,
                                vpp_session.desired_min_tx,
                                vpp_session.required_min_rx,
                                vpp_session.detect_mult))
        if key is not None:
            cli_add_cmd += " conf-key-id %s bfd-key-id %s" % (
                key.conf_key_id, vpp_session.bfd_key_id)
            # the modified session is expected to keep the same BFD key ID
            session_args["bfd_key_id"] = vpp_session.bfd_key_id
        self.cli_verify_no_response(cli_add_cmd)
        # 2nd add should fail
        self.cli_verify_response(
            cli_add_cmd,
            "bfd udp session add: `bfd_add_add_session' API call"
            " failed, rv=-101:Duplicate BFD object")
        verify_bfd_session_config(self, vpp_session)
        # mod_session is never added to VPP, it only describes the expected
        # configuration after the CLI modification
        mod_session = VppBFDUDPSession(
            self, self.pg0, peer_addr,
            required_min_rx=2 * vpp_session.required_min_rx,
            desired_min_tx=3 * vpp_session.desired_min_tx,
            detect_mult=4 * vpp_session.detect_mult,
            **session_args)
        self.cli_verify_no_response(
            "bfd udp session mod interface %s local-addr %s peer-addr %s "
            "desired-min-tx %s required-min-rx %s detect-mult %s" %
            (self.pg0.name, local_addr, peer_addr,
             mod_session.desired_min_tx, mod_session.required_min_rx,
             mod_session.detect_mult))
        verify_bfd_session_config(self, mod_session)
        cli_del_cmd = (
            "bfd udp session del interface %s local-addr %s "
            "peer-addr %s" % (self.pg0.name, local_addr, peer_addr))
        self.cli_verify_no_response(cli_del_cmd)
        # 2nd del is expected to fail
        self.cli_verify_response(
            cli_del_cmd,
            "bfd udp session del: `bfd_udp_del_session' API call"
            " failed, rv=-102:No such BFD object")
        self.assertFalse(vpp_session.query_vpp_config())

    def _check_auth_on_off(self, delayed):
        """ activate and deactivate authentication of a session via CLI

        :param delayed: if True, append `delayed yes' to both commands
        """
        key = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        # never added to VPP - describes the expected authenticated config
        auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                        sha1_key=key)
        session.add_vpp_config()
        cli_activate = (
            "bfd udp session auth activate interface %s local-addr %s "
            "peer-addr %s conf-key-id %s bfd-key-id %s"
            % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
               key.conf_key_id, auth_session.bfd_key_id))
        if delayed:
            cli_activate += " delayed yes"
        # each command is issued twice - repeating it must be accepted and
        # must not change the resulting configuration
        for _ in range(2):
            self.cli_verify_no_response(cli_activate)
            verify_bfd_session_config(self, auth_session)
        cli_deactivate = (
            "bfd udp session auth deactivate interface %s local-addr %s "
            "peer-addr %s "
            % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4))
        if delayed:
            cli_deactivate += "delayed yes"
        for _ in range(2):
            self.cli_verify_no_response(cli_deactivate)
            verify_bfd_session_config(self, session)

    def test_show(self):
        """ show commands """
        k1 = self.factory.create_random_key(self)
        k1.add_vpp_config()
        k2 = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        k2.add_vpp_config()
        s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        s1.add_vpp_config()
        s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
                              sha1_key=k2)
        s2.add_vpp_config()
        # the output is only logged, its content is not verified
        self.logger.info(self.vapi.ppcli("show bfd keys"))
        self.logger.info(self.vapi.ppcli("show bfd sessions"))
        self.logger.info(self.vapi.ppcli("show bfd"))

    def test_set_del_sha1_key(self):
        """ set/delete SHA1 auth key """
        k = self.factory.create_random_key(self)
        self.registry.register(k, self.logger)
        self.cli_verify_no_response(
            "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
            (k.conf_key_id, self._secret_as_hex(k)))
        self.assertTrue(k.query_vpp_config())
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
        self.vpp_session.add_vpp_config()
        self.test_session = \
            BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
                           bfd_key_id=self.vpp_session.bfd_key_id)
        self.vapi.want_bfd_events()
        bfd_session_up(self)
        bfd_session_down(self)
        self._check_key_in_use_then_delete(k)

    def test_set_del_meticulous_sha1_key(self):
        """ set/delete meticulous SHA1 auth key """
        k = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        self.registry.register(k, self.logger)
        self.cli_verify_no_response(
            "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
            (k.conf_key_id, self._secret_as_hex(k)))
        self.assertTrue(k.query_vpp_config())
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip6, af=AF_INET6,
                                            sha1_key=k)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = \
            BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
                           bfd_key_id=self.vpp_session.bfd_key_id)
        self.vapi.want_bfd_events()
        bfd_session_up(self)
        bfd_session_down(self)
        self._check_key_in_use_then_delete(k)

    def test_add_mod_del_bfd_udp(self):
        """ create/modify/delete IPv4 BFD UDP session """
        self._check_add_mod_del_bfd_udp(AF_INET)

    def test_add_mod_del_bfd_udp6(self):
        """ create/modify/delete IPv6 BFD UDP session """
        self._check_add_mod_del_bfd_udp(AF_INET6)

    def test_add_mod_del_bfd_udp_auth(self):
        """ create/modify/delete IPv4 BFD UDP session (authenticated) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self._check_add_mod_del_bfd_udp(AF_INET, key)

    def test_add_mod_del_bfd_udp6_auth(self):
        """ create/modify/delete IPv6 BFD UDP session (authenticated) """
        key = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        self._check_add_mod_del_bfd_udp(AF_INET6, key)

    def test_auth_on_off(self):
        """ turn authentication on and off """
        self._check_auth_on_off(delayed=False)

    def test_auth_on_off_delayed(self):
        """ turn authentication on and off (delayed) """
        self._check_auth_on_off(delayed=True)

    def test_admin_up_down(self):
        """ put session admin-up and admin-down """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        cli_set_flags = \
            "bfd udp session set-flags admin %s interface %s local-addr %s "\
            "peer-addr %s "
        pg0_args = (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
        cli_down = cli_set_flags % (("down",) + pg0_args)
        cli_up = cli_set_flags % (("up",) + pg0_args)
        self.cli_verify_no_response(cli_down)
        verify_bfd_session_config(self, session, state=BFDState.admin_down)
        self.cli_verify_no_response(cli_up)
        verify_bfd_session_config(self, session, state=BFDState.down)

    def test_set_del_udp_echo_source(self):
        """ set/del udp echo source """
        show_cmd = "show bfd echo-source"
        not_set = "UDP echo source is not set."

        def echo_source_report(ip4, ip6):
            # expected `show' output while loopback0 is the echo source
            return ("UDP echo source is: %s\n"
                    "IPv4 address usable as echo source: %s\n"
                    "IPv6 address usable as echo source: %s" %
                    (self.loopback0.name, ip4, ip6))

        self.create_loopback_interfaces([0])
        self.loopback0 = self.lo_interfaces[0]
        self.loopback0.admin_up()
        self.cli_verify_response(show_cmd, not_set)
        cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
        self.cli_verify_no_response(cli_set)
        # no address is configured on the loopback yet
        self.cli_verify_response(show_cmd,
                                 echo_source_report("none", "none"))
        self.loopback0.config_ip4()
        # expected echo source is the interface address with lowest bit
        # flipped
        unpacked = unpack("!L", self.loopback0.local_ip4n)
        echo_ip4 = inet_ntop(AF_INET, pack("!L", unpacked[0] ^ 1))
        self.cli_verify_response(show_cmd,
                                 echo_source_report(echo_ip4, "none"))
        unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
        echo_ip6 = inet_ntop(AF_INET6, pack("!LLLL", unpacked[0], unpacked[1],
                                            unpacked[2], unpacked[3] ^ 1))
        self.loopback0.config_ip6()
        self.cli_verify_response(show_cmd,
                                 echo_source_report(echo_ip4, echo_ip6))
        cli_del = "bfd udp echo-source del"
        self.cli_verify_no_response(cli_del)
        self.cli_verify_response(show_cmd, not_set)
2542
if __name__ == "__main__":
    # executed as a script: run the tests of this module with the VPP runner
    unittest.main(testRunner=VppTestRunner)