7b474228eb6b15b6fa7778dd755179c82fac9bc8
[vpp.git] / test / test_bfd.py
1 #!/usr/bin/env python
2 """ BFD tests """
3
4 from __future__ import division
5
6 import binascii
7 import hashlib
8 import time
9 import unittest
10 from random import randint, shuffle, getrandbits
11 from socket import AF_INET, AF_INET6, inet_ntop
12 from struct import pack, unpack
13
14 from six import moves
15 from scapy.layers.inet import UDP, IP
16 from scapy.layers.inet6 import IPv6
17 from scapy.layers.l2 import Ether
18 from scapy.packet import Raw
19
20 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
21     BFDDiagCode, BFDState, BFD_vpp_echo
22 from framework import VppTestCase, VppTestRunner, running_extended_tests
23 from util import ppp
24 from vpp_ip import DpoProto
25 from vpp_ip_route import VppIpRoute, VppRoutePath
26 from vpp_lo_interface import VppLoInterface
27 from vpp_papi_provider import UnexpectedApiReturnValueError
28 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
29
30 USEC_IN_SEC = 1000000
31
32
33 class AuthKeyFactory(object):
34     """Factory class for creating auth keys with unique conf key ID"""
35
36     def __init__(self):
37         self._conf_key_ids = {}
38
39     def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
40         """ create a random key with unique conf key id """
41         conf_key_id = randint(0, 0xFFFFFFFF)
42         while conf_key_id in self._conf_key_ids:
43             conf_key_id = randint(0, 0xFFFFFFFF)
44         self._conf_key_ids[conf_key_id] = 1
45         key = str(bytearray([randint(0, 255) for _ in range(randint(1, 20))]))
46         return VppBFDAuthKey(test=test, auth_type=auth_type,
47                              conf_key_id=conf_key_id, key=key)
48
49
50 @unittest.skipUnless(running_extended_tests, "part of extended tests")
51 class BFDAPITestCase(VppTestCase):
52     """Bidirectional Forwarding Detection (BFD) - API"""
53
54     pg0 = None
55     pg1 = None
56
57     @classmethod
58     def setUpClass(cls):
59         super(BFDAPITestCase, cls).setUpClass()
60         cls.vapi.cli("set log class bfd level debug")
61         try:
62             cls.create_pg_interfaces(range(2))
63             for i in cls.pg_interfaces:
64                 i.config_ip4()
65                 i.config_ip6()
66                 i.resolve_arp()
67
68         except Exception:
69             super(BFDAPITestCase, cls).tearDownClass()
70             raise
71
72     @classmethod
73     def tearDownClass(cls):
74         super(BFDAPITestCase, cls).tearDownClass()
75
76     def setUp(self):
77         super(BFDAPITestCase, self).setUp()
78         self.factory = AuthKeyFactory()
79
80     def test_add_bfd(self):
81         """ create a BFD session """
82         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
83         session.add_vpp_config()
84         self.logger.debug("Session state is %s", session.state)
85         session.remove_vpp_config()
86         session.add_vpp_config()
87         self.logger.debug("Session state is %s", session.state)
88         session.remove_vpp_config()
89
90     def test_double_add(self):
91         """ create the same BFD session twice (negative case) """
92         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
93         session.add_vpp_config()
94
95         with self.vapi.assert_negative_api_retval():
96             session.add_vpp_config()
97
98         session.remove_vpp_config()
99
100     def test_add_bfd6(self):
101         """ create IPv6 BFD session """
102         session = VppBFDUDPSession(
103             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
104         session.add_vpp_config()
105         self.logger.debug("Session state is %s", session.state)
106         session.remove_vpp_config()
107         session.add_vpp_config()
108         self.logger.debug("Session state is %s", session.state)
109         session.remove_vpp_config()
110
111     def test_mod_bfd(self):
112         """ modify BFD session parameters """
113         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
114                                    desired_min_tx=50000,
115                                    required_min_rx=10000,
116                                    detect_mult=1)
117         session.add_vpp_config()
118         s = session.get_bfd_udp_session_dump_entry()
119         self.assert_equal(session.desired_min_tx,
120                           s.desired_min_tx,
121                           "desired min transmit interval")
122         self.assert_equal(session.required_min_rx,
123                           s.required_min_rx,
124                           "required min receive interval")
125         self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
126         session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
127                                   required_min_rx=session.required_min_rx * 2,
128                                   detect_mult=session.detect_mult * 2)
129         s = session.get_bfd_udp_session_dump_entry()
130         self.assert_equal(session.desired_min_tx,
131                           s.desired_min_tx,
132                           "desired min transmit interval")
133         self.assert_equal(session.required_min_rx,
134                           s.required_min_rx,
135                           "required min receive interval")
136         self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
137
138     def test_add_sha1_keys(self):
139         """ add SHA1 keys """
140         key_count = 10
141         keys = [self.factory.create_random_key(
142             self) for i in range(0, key_count)]
143         for key in keys:
144             self.assertFalse(key.query_vpp_config())
145         for key in keys:
146             key.add_vpp_config()
147         for key in keys:
148             self.assertTrue(key.query_vpp_config())
149         # remove randomly
150         indexes = range(key_count)
151         shuffle(indexes)
152         removed = []
153         for i in indexes:
154             key = keys[i]
155             key.remove_vpp_config()
156             removed.append(i)
157             for j in range(key_count):
158                 key = keys[j]
159                 if j in removed:
160                     self.assertFalse(key.query_vpp_config())
161                 else:
162                     self.assertTrue(key.query_vpp_config())
163         # should be removed now
164         for key in keys:
165             self.assertFalse(key.query_vpp_config())
166         # add back and remove again
167         for key in keys:
168             key.add_vpp_config()
169         for key in keys:
170             self.assertTrue(key.query_vpp_config())
171         for key in keys:
172             key.remove_vpp_config()
173         for key in keys:
174             self.assertFalse(key.query_vpp_config())
175
176     def test_add_bfd_sha1(self):
177         """ create a BFD session (SHA1) """
178         key = self.factory.create_random_key(self)
179         key.add_vpp_config()
180         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
181                                    sha1_key=key)
182         session.add_vpp_config()
183         self.logger.debug("Session state is %s", session.state)
184         session.remove_vpp_config()
185         session.add_vpp_config()
186         self.logger.debug("Session state is %s", session.state)
187         session.remove_vpp_config()
188
189     def test_double_add_sha1(self):
190         """ create the same BFD session twice (negative case) (SHA1) """
191         key = self.factory.create_random_key(self)
192         key.add_vpp_config()
193         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
194                                    sha1_key=key)
195         session.add_vpp_config()
196         with self.assertRaises(Exception):
197             session.add_vpp_config()
198
199     def test_add_auth_nonexistent_key(self):
200         """ create BFD session using non-existent SHA1 (negative case) """
201         session = VppBFDUDPSession(
202             self, self.pg0, self.pg0.remote_ip4,
203             sha1_key=self.factory.create_random_key(self))
204         with self.assertRaises(Exception):
205             session.add_vpp_config()
206
207     def test_shared_sha1_key(self):
208         """ share single SHA1 key between multiple BFD sessions """
209         key = self.factory.create_random_key(self)
210         key.add_vpp_config()
211         sessions = [
212             VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
213                              sha1_key=key),
214             VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
215                              sha1_key=key, af=AF_INET6),
216             VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
217                              sha1_key=key),
218             VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
219                              sha1_key=key, af=AF_INET6)]
220         for s in sessions:
221             s.add_vpp_config()
222         removed = 0
223         for s in sessions:
224             e = key.get_bfd_auth_keys_dump_entry()
225             self.assert_equal(e.use_count, len(sessions) - removed,
226                               "Use count for shared key")
227             s.remove_vpp_config()
228             removed += 1
229         e = key.get_bfd_auth_keys_dump_entry()
230         self.assert_equal(e.use_count, len(sessions) - removed,
231                           "Use count for shared key")
232
233     def test_activate_auth(self):
234         """ activate SHA1 authentication """
235         key = self.factory.create_random_key(self)
236         key.add_vpp_config()
237         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
238         session.add_vpp_config()
239         session.activate_auth(key)
240
241     def test_deactivate_auth(self):
242         """ deactivate SHA1 authentication """
243         key = self.factory.create_random_key(self)
244         key.add_vpp_config()
245         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
246         session.add_vpp_config()
247         session.activate_auth(key)
248         session.deactivate_auth()
249
250     def test_change_key(self):
251         """ change SHA1 key """
252         key1 = self.factory.create_random_key(self)
253         key2 = self.factory.create_random_key(self)
254         while key2.conf_key_id == key1.conf_key_id:
255             key2 = self.factory.create_random_key(self)
256         key1.add_vpp_config()
257         key2.add_vpp_config()
258         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
259                                    sha1_key=key1)
260         session.add_vpp_config()
261         session.activate_auth(key2)
262
263     def test_set_del_udp_echo_source(self):
264         """ set/del udp echo source """
265         self.create_loopback_interfaces(1)
266         self.loopback0 = self.lo_interfaces[0]
267         self.loopback0.admin_up()
268         echo_source = self.vapi.bfd_udp_get_echo_source()
269         self.assertFalse(echo_source.is_set)
270         self.assertFalse(echo_source.have_usable_ip4)
271         self.assertFalse(echo_source.have_usable_ip6)
272
273         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
274         echo_source = self.vapi.bfd_udp_get_echo_source()
275         self.assertTrue(echo_source.is_set)
276         self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
277         self.assertFalse(echo_source.have_usable_ip4)
278         self.assertFalse(echo_source.have_usable_ip6)
279
280         self.loopback0.config_ip4()
281         unpacked = unpack("!L", self.loopback0.local_ip4n)
282         echo_ip4 = pack("!L", unpacked[0] ^ 1)
283         echo_source = self.vapi.bfd_udp_get_echo_source()
284         self.assertTrue(echo_source.is_set)
285         self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
286         self.assertTrue(echo_source.have_usable_ip4)
287         self.assertEqual(echo_source.ip4_addr, echo_ip4)
288         self.assertFalse(echo_source.have_usable_ip6)
289
290         self.loopback0.config_ip6()
291         unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
292         echo_ip6 = pack("!LLLL", unpacked[0], unpacked[1], unpacked[2],
293                         unpacked[3] ^ 1)
294         echo_source = self.vapi.bfd_udp_get_echo_source()
295         self.assertTrue(echo_source.is_set)
296         self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
297         self.assertTrue(echo_source.have_usable_ip4)
298         self.assertEqual(echo_source.ip4_addr, echo_ip4)
299         self.assertTrue(echo_source.have_usable_ip6)
300         self.assertEqual(echo_source.ip6_addr, echo_ip6)
301
302         self.vapi.bfd_udp_del_echo_source()
303         echo_source = self.vapi.bfd_udp_get_echo_source()
304         self.assertFalse(echo_source.is_set)
305         self.assertFalse(echo_source.have_usable_ip4)
306         self.assertFalse(echo_source.have_usable_ip6)
307
308
309 class BFDTestSession(object):
310     """ BFD session as seen from test framework side """
311
312     def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
313                  bfd_key_id=None, our_seq_number=None):
314         self.test = test
315         self.af = af
316         self.sha1_key = sha1_key
317         self.bfd_key_id = bfd_key_id
318         self.interface = interface
319         self.udp_sport = randint(49152, 65535)
320         if our_seq_number is None:
321             self.our_seq_number = randint(0, 40000000)
322         else:
323             self.our_seq_number = our_seq_number
324         self.vpp_seq_number = None
325         self.my_discriminator = 0
326         self.desired_min_tx = 300000
327         self.required_min_rx = 300000
328         self.required_min_echo_rx = None
329         self.detect_mult = detect_mult
330         self.diag = BFDDiagCode.no_diagnostic
331         self.your_discriminator = None
332         self.state = BFDState.down
333         self.auth_type = BFDAuthType.no_auth
334
335     def inc_seq_num(self):
336         """ increment sequence number, wrapping if needed """
337         if self.our_seq_number == 0xFFFFFFFF:
338             self.our_seq_number = 0
339         else:
340             self.our_seq_number += 1
341
342     def update(self, my_discriminator=None, your_discriminator=None,
343                desired_min_tx=None, required_min_rx=None,
344                required_min_echo_rx=None, detect_mult=None,
345                diag=None, state=None, auth_type=None):
346         """ update BFD parameters associated with session """
347         if my_discriminator is not None:
348             self.my_discriminator = my_discriminator
349         if your_discriminator is not None:
350             self.your_discriminator = your_discriminator
351         if required_min_rx is not None:
352             self.required_min_rx = required_min_rx
353         if required_min_echo_rx is not None:
354             self.required_min_echo_rx = required_min_echo_rx
355         if desired_min_tx is not None:
356             self.desired_min_tx = desired_min_tx
357         if detect_mult is not None:
358             self.detect_mult = detect_mult
359         if diag is not None:
360             self.diag = diag
361         if state is not None:
362             self.state = state
363         if auth_type is not None:
364             self.auth_type = auth_type
365
366     def fill_packet_fields(self, packet):
367         """ set packet fields with known values in packet """
368         bfd = packet[BFD]
369         if self.my_discriminator:
370             self.test.logger.debug("BFD: setting packet.my_discriminator=%s",
371                                    self.my_discriminator)
372             bfd.my_discriminator = self.my_discriminator
373         if self.your_discriminator:
374             self.test.logger.debug("BFD: setting packet.your_discriminator=%s",
375                                    self.your_discriminator)
376             bfd.your_discriminator = self.your_discriminator
377         if self.required_min_rx:
378             self.test.logger.debug(
379                 "BFD: setting packet.required_min_rx_interval=%s",
380                 self.required_min_rx)
381             bfd.required_min_rx_interval = self.required_min_rx
382         if self.required_min_echo_rx:
383             self.test.logger.debug(
384                 "BFD: setting packet.required_min_echo_rx=%s",
385                 self.required_min_echo_rx)
386             bfd.required_min_echo_rx_interval = self.required_min_echo_rx
387         if self.desired_min_tx:
388             self.test.logger.debug(
389                 "BFD: setting packet.desired_min_tx_interval=%s",
390                 self.desired_min_tx)
391             bfd.desired_min_tx_interval = self.desired_min_tx
392         if self.detect_mult:
393             self.test.logger.debug(
394                 "BFD: setting packet.detect_mult=%s", self.detect_mult)
395             bfd.detect_mult = self.detect_mult
396         if self.diag:
397             self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
398             bfd.diag = self.diag
399         if self.state:
400             self.test.logger.debug("BFD: setting packet.state=%s", self.state)
401             bfd.state = self.state
402         if self.auth_type:
403             # this is used by a negative test-case
404             self.test.logger.debug("BFD: setting packet.auth_type=%s",
405                                    self.auth_type)
406             bfd.auth_type = self.auth_type
407
408     def create_packet(self):
409         """ create a BFD packet, reflecting the current state of session """
410         if self.sha1_key:
411             bfd = BFD(flags="A")
412             bfd.auth_type = self.sha1_key.auth_type
413             bfd.auth_len = BFD.sha1_auth_len
414             bfd.auth_key_id = self.bfd_key_id
415             bfd.auth_seq_num = self.our_seq_number
416             bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
417         else:
418             bfd = BFD()
419         if self.af == AF_INET6:
420             packet = (Ether(src=self.interface.remote_mac,
421                             dst=self.interface.local_mac) /
422                       IPv6(src=self.interface.remote_ip6,
423                            dst=self.interface.local_ip6,
424                            hlim=255) /
425                       UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
426                       bfd)
427         else:
428             packet = (Ether(src=self.interface.remote_mac,
429                             dst=self.interface.local_mac) /
430                       IP(src=self.interface.remote_ip4,
431                          dst=self.interface.local_ip4,
432                          ttl=255) /
433                       UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
434                       bfd)
435         self.test.logger.debug("BFD: Creating packet")
436         self.fill_packet_fields(packet)
437         if self.sha1_key:
438             hash_material = str(packet[BFD])[:32] + self.sha1_key.key + \
439                 "\0" * (20 - len(self.sha1_key.key))
440             self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
441                                    hashlib.sha1(hash_material).hexdigest())
442             packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
443         return packet
444
445     def send_packet(self, packet=None, interface=None):
446         """ send packet on interface, creating the packet if needed """
447         if packet is None:
448             packet = self.create_packet()
449         if interface is None:
450             interface = self.test.pg0
451         self.test.logger.debug(ppp("Sending packet:", packet))
452         interface.add_stream(packet)
453         self.test.pg_start()
454
455     def verify_sha1_auth(self, packet):
456         """ Verify correctness of authentication in BFD layer. """
457         bfd = packet[BFD]
458         self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
459         self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
460                                BFDAuthType)
461         self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
462         self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
463         if self.vpp_seq_number is None:
464             self.vpp_seq_number = bfd.auth_seq_num
465             self.test.logger.debug("Received initial sequence number: %s" %
466                                    self.vpp_seq_number)
467         else:
468             recvd_seq_num = bfd.auth_seq_num
469             self.test.logger.debug("Received followup sequence number: %s" %
470                                    recvd_seq_num)
471             if self.vpp_seq_number < 0xffffffff:
472                 if self.sha1_key.auth_type == \
473                         BFDAuthType.meticulous_keyed_sha1:
474                     self.test.assert_equal(recvd_seq_num,
475                                            self.vpp_seq_number + 1,
476                                            "BFD sequence number")
477                 else:
478                     self.test.assert_in_range(recvd_seq_num,
479                                               self.vpp_seq_number,
480                                               self.vpp_seq_number + 1,
481                                               "BFD sequence number")
482             else:
483                 if self.sha1_key.auth_type == \
484                         BFDAuthType.meticulous_keyed_sha1:
485                     self.test.assert_equal(recvd_seq_num, 0,
486                                            "BFD sequence number")
487                 else:
488                     self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
489                                        "BFD sequence number not one of "
490                                        "(%s, 0)" % self.vpp_seq_number)
491             self.vpp_seq_number = recvd_seq_num
492         # last 20 bytes represent the hash - so replace them with the key,
493         # pad the result with zeros and hash the result
494         hash_material = bfd.original[:-20] + self.sha1_key.key + \
495             "\0" * (20 - len(self.sha1_key.key))
496         expected_hash = hashlib.sha1(hash_material).hexdigest()
497         self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
498                                expected_hash, "Auth key hash")
499
500     def verify_bfd(self, packet):
501         """ Verify correctness of BFD layer. """
502         bfd = packet[BFD]
503         self.test.assert_equal(bfd.version, 1, "BFD version")
504         self.test.assert_equal(bfd.your_discriminator,
505                                self.my_discriminator,
506                                "BFD - your discriminator")
507         if self.sha1_key:
508             self.verify_sha1_auth(packet)
509
510
511 def bfd_session_up(test):
512     """ Bring BFD session up """
513     test.logger.info("BFD: Waiting for slow hello")
514     p = wait_for_bfd_packet(test, 2)
515     old_offset = None
516     if hasattr(test, 'vpp_clock_offset'):
517         old_offset = test.vpp_clock_offset
518     test.vpp_clock_offset = time.time() - p.time
519     test.logger.debug("BFD: Calculated vpp clock offset: %s",
520                       test.vpp_clock_offset)
521     if old_offset:
522         test.assertAlmostEqual(
523             old_offset, test.vpp_clock_offset, delta=0.5,
524             msg="vpp clock offset not stable (new: %s, old: %s)" %
525             (test.vpp_clock_offset, old_offset))
526     test.logger.info("BFD: Sending Init")
527     test.test_session.update(my_discriminator=randint(0, 40000000),
528                              your_discriminator=p[BFD].my_discriminator,
529                              state=BFDState.init)
530     if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
531             BFDAuthType.meticulous_keyed_sha1:
532         test.test_session.inc_seq_num()
533     test.test_session.send_packet()
534     test.logger.info("BFD: Waiting for event")
535     e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
536     verify_event(test, e, expected_state=BFDState.up)
537     test.logger.info("BFD: Session is Up")
538     test.test_session.update(state=BFDState.up)
539     if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
540             BFDAuthType.meticulous_keyed_sha1:
541         test.test_session.inc_seq_num()
542     test.test_session.send_packet()
543     test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
544
545
546 def bfd_session_down(test):
547     """ Bring BFD session down """
548     test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
549     test.test_session.update(state=BFDState.down)
550     if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
551             BFDAuthType.meticulous_keyed_sha1:
552         test.test_session.inc_seq_num()
553     test.test_session.send_packet()
554     test.logger.info("BFD: Waiting for event")
555     e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
556     verify_event(test, e, expected_state=BFDState.down)
557     test.logger.info("BFD: Session is Down")
558     test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
559
560
561 def verify_bfd_session_config(test, session, state=None):
562     dump = session.get_bfd_udp_session_dump_entry()
563     test.assertIsNotNone(dump)
564     # since dump is not none, we have verified that sw_if_index and addresses
565     # are valid (in get_bfd_udp_session_dump_entry)
566     if state:
567         test.assert_equal(dump.state, state, "session state")
568     test.assert_equal(dump.required_min_rx, session.required_min_rx,
569                       "required min rx interval")
570     test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
571                       "desired min tx interval")
572     test.assert_equal(dump.detect_mult, session.detect_mult,
573                       "detect multiplier")
574     if session.sha1_key is None:
575         test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
576     else:
577         test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
578         test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
579                           "bfd key id")
580         test.assert_equal(dump.conf_key_id,
581                           session.sha1_key.conf_key_id,
582                           "config key id")
583
584
585 def verify_ip(test, packet):
586     """ Verify correctness of IP layer. """
587     if test.vpp_session.af == AF_INET6:
588         ip = packet[IPv6]
589         local_ip = test.pg0.local_ip6
590         remote_ip = test.pg0.remote_ip6
591         test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
592     else:
593         ip = packet[IP]
594         local_ip = test.pg0.local_ip4
595         remote_ip = test.pg0.remote_ip4
596         test.assert_equal(ip.ttl, 255, "IPv4 TTL")
597     test.assert_equal(ip.src, local_ip, "IP source address")
598     test.assert_equal(ip.dst, remote_ip, "IP destination address")
599
600
601 def verify_udp(test, packet):
602     """ Verify correctness of UDP layer. """
603     udp = packet[UDP]
604     test.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
605     test.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
606                          "UDP source port")
607
608
609 def verify_event(test, event, expected_state):
610     """ Verify correctness of event values. """
611     e = event
612     test.logger.debug("BFD: Event: %s" % moves.reprlib.repr(e))
613     test.assert_equal(e.sw_if_index,
614                       test.vpp_session.interface.sw_if_index,
615                       "BFD interface index")
616     is_ipv6 = 0
617     if test.vpp_session.af == AF_INET6:
618         is_ipv6 = 1
619     test.assert_equal(e.is_ipv6, is_ipv6, "is_ipv6")
620     if test.vpp_session.af == AF_INET:
621         test.assert_equal(e.local_addr[:4], test.vpp_session.local_addr_n,
622                           "Local IPv4 address")
623         test.assert_equal(e.peer_addr[:4], test.vpp_session.peer_addr_n,
624                           "Peer IPv4 address")
625     else:
626         test.assert_equal(e.local_addr, test.vpp_session.local_addr_n,
627                           "Local IPv6 address")
628         test.assert_equal(e.peer_addr, test.vpp_session.peer_addr_n,
629                           "Peer IPv6 address")
630     test.assert_equal(e.state, expected_state, BFDState)
631
632
633 def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None):
634     """ wait for BFD packet and verify its correctness
635
636     :param timeout: how long to wait
637     :param pcap_time_min: ignore packets with pcap timestamp lower than this
638
639     :returns: tuple (packet, time spent waiting for packet)
640     """
641     test.logger.info("BFD: Waiting for BFD packet")
642     deadline = time.time() + timeout
643     counter = 0
644     while True:
645         counter += 1
646         # sanity check
647         test.assert_in_range(counter, 0, 100, "number of packets ignored")
648         time_left = deadline - time.time()
649         if time_left < 0:
650             raise CaptureTimeoutError("Packet did not arrive within timeout")
651         p = test.pg0.wait_for_packet(timeout=time_left)
652         test.logger.debug(ppp("BFD: Got packet:", p))
653         if pcap_time_min is not None and p.time < pcap_time_min:
654             test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
655                                   "pcap time min %s):" %
656                                   (p.time, pcap_time_min), p))
657         else:
658             break
659     bfd = p[BFD]
660     if bfd is None:
661         raise Exception(ppp("Unexpected or invalid BFD packet:", p))
662     if bfd.payload:
663         raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
664     verify_ip(test, p)
665     verify_udp(test, p)
666     test.test_session.verify_bfd(p)
667     return p
668
669
670 @unittest.skipUnless(running_extended_tests, "part of extended tests")
671 class BFD4TestCase(VppTestCase):
672     """Bidirectional Forwarding Detection (BFD)"""
673
674     pg0 = None
675     vpp_clock_offset = None
676     vpp_session = None
677     test_session = None
678
679     @classmethod
680     def setUpClass(cls):
681         super(BFD4TestCase, cls).setUpClass()
682         cls.vapi.cli("set log class bfd level debug")
683         try:
684             cls.create_pg_interfaces([0])
685             cls.create_loopback_interfaces(1)
686             cls.loopback0 = cls.lo_interfaces[0]
687             cls.loopback0.config_ip4()
688             cls.loopback0.admin_up()
689             cls.pg0.config_ip4()
690             cls.pg0.configure_ipv4_neighbors()
691             cls.pg0.admin_up()
692             cls.pg0.resolve_arp()
693
694         except Exception:
695             super(BFD4TestCase, cls).tearDownClass()
696             raise
697
698     @classmethod
699     def tearDownClass(cls):
700         super(BFD4TestCase, cls).tearDownClass()
701
702     def setUp(self):
703         super(BFD4TestCase, self).setUp()
704         self.factory = AuthKeyFactory()
705         self.vapi.want_bfd_events()
706         self.pg0.enable_capture()
707         try:
708             self.vpp_session = VppBFDUDPSession(self, self.pg0,
709                                                 self.pg0.remote_ip4)
710             self.vpp_session.add_vpp_config()
711             self.vpp_session.admin_up()
712             self.test_session = BFDTestSession(self, self.pg0, AF_INET)
713         except:
714             self.vapi.want_bfd_events(enable_disable=0)
715             raise
716
717     def tearDown(self):
718         if not self.vpp_dead:
719             self.vapi.want_bfd_events(enable_disable=0)
720         self.vapi.collect_events()  # clear the event queue
721         super(BFD4TestCase, self).tearDown()
722
723     def test_session_up(self):
724         """ bring BFD session up """
725         bfd_session_up(self)
726
727     def test_session_up_by_ip(self):
728         """ bring BFD session up - first frame looked up by address pair """
729         self.logger.info("BFD: Sending Slow control frame")
730         self.test_session.update(my_discriminator=randint(0, 40000000))
731         self.test_session.send_packet()
732         self.pg0.enable_capture()
733         p = self.pg0.wait_for_packet(1)
734         self.assert_equal(p[BFD].your_discriminator,
735                           self.test_session.my_discriminator,
736                           "BFD - your discriminator")
737         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
738         self.test_session.update(your_discriminator=p[BFD].my_discriminator,
739                                  state=BFDState.up)
740         self.logger.info("BFD: Waiting for event")
741         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
742         verify_event(self, e, expected_state=BFDState.init)
743         self.logger.info("BFD: Sending Up")
744         self.test_session.send_packet()
745         self.logger.info("BFD: Waiting for event")
746         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
747         verify_event(self, e, expected_state=BFDState.up)
748         self.logger.info("BFD: Session is Up")
749         self.test_session.update(state=BFDState.up)
750         self.test_session.send_packet()
751         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
752
753     def test_session_down(self):
754         """ bring BFD session down """
755         bfd_session_up(self)
756         bfd_session_down(self)
757
758     @unittest.skipUnless(running_extended_tests, "part of extended tests")
759     def test_hold_up(self):
760         """ hold BFD session up """
761         bfd_session_up(self)
762         for dummy in range(self.test_session.detect_mult * 2):
763             wait_for_bfd_packet(self)
764             self.test_session.send_packet()
765         self.assert_equal(len(self.vapi.collect_events()), 0,
766                           "number of bfd events")
767
768     @unittest.skipUnless(running_extended_tests, "part of extended tests")
769     def test_slow_timer(self):
770         """ verify slow periodic control frames while session down """
771         packet_count = 3
772         self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
773         prev_packet = wait_for_bfd_packet(self, 2)
774         for dummy in range(packet_count):
775             next_packet = wait_for_bfd_packet(self, 2)
776             time_diff = next_packet.time - prev_packet.time
777             # spec says the range should be <0.75, 1>, allow extra 0.05 margin
778             # to work around timing issues
779             self.assert_in_range(
780                 time_diff, 0.70, 1.05, "time between slow packets")
781             prev_packet = next_packet
782
783     @unittest.skipUnless(running_extended_tests, "part of extended tests")
784     def test_zero_remote_min_rx(self):
785         """ no packets when zero remote required min rx interval """
786         bfd_session_up(self)
787         self.test_session.update(required_min_rx=0)
788         self.test_session.send_packet()
789         for dummy in range(self.test_session.detect_mult):
790             self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
791                        "sleep before transmitting bfd packet")
792             self.test_session.send_packet()
793             try:
794                 p = wait_for_bfd_packet(self, timeout=0)
795                 self.logger.error(ppp("Received unexpected packet:", p))
796             except CaptureTimeoutError:
797                 pass
798         self.assert_equal(
799             len(self.vapi.collect_events()), 0, "number of bfd events")
800         self.test_session.update(required_min_rx=300000)
801         for dummy in range(3):
802             self.test_session.send_packet()
803             wait_for_bfd_packet(
804                 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
805         self.assert_equal(
806             len(self.vapi.collect_events()), 0, "number of bfd events")
807
808     @unittest.skipUnless(running_extended_tests, "part of extended tests")
809     def test_conn_down(self):
810         """ verify session goes down after inactivity """
811         bfd_session_up(self)
812         detection_time = self.test_session.detect_mult *\
813             self.vpp_session.required_min_rx / USEC_IN_SEC
814         self.sleep(detection_time, "waiting for BFD session time-out")
815         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
816         verify_event(self, e, expected_state=BFDState.down)
817
818     @unittest.skipUnless(running_extended_tests, "part of extended tests")
819     def test_large_required_min_rx(self):
820         """ large remote required min rx interval """
821         bfd_session_up(self)
822         p = wait_for_bfd_packet(self)
823         interval = 3000000
824         self.test_session.update(required_min_rx=interval)
825         self.test_session.send_packet()
826         time_mark = time.time()
827         count = 0
828         # busy wait here, trying to collect a packet or event, vpp is not
829         # allowed to send packets and the session will timeout first - so the
830         # Up->Down event must arrive before any packets do
831         while time.time() < time_mark + interval / USEC_IN_SEC:
832             try:
833                 p = wait_for_bfd_packet(self, timeout=0)
834                 # if vpp managed to send a packet before we did the session
835                 # session update, then that's fine, ignore it
836                 if p.time < time_mark - self.vpp_clock_offset:
837                     continue
838                 self.logger.error(ppp("Received unexpected packet:", p))
839                 count += 1
840             except CaptureTimeoutError:
841                 pass
842             events = self.vapi.collect_events()
843             if len(events) > 0:
844                 verify_event(self, events[0], BFDState.down)
845                 break
846         self.assert_equal(count, 0, "number of packets received")
847
848     @unittest.skipUnless(running_extended_tests, "part of extended tests")
849     def test_immediate_remote_min_rx_reduction(self):
850         """ immediately honor remote required min rx reduction """
851         self.vpp_session.remove_vpp_config()
852         self.vpp_session = VppBFDUDPSession(
853             self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
854         self.pg0.enable_capture()
855         self.vpp_session.add_vpp_config()
856         self.test_session.update(desired_min_tx=1000000,
857                                  required_min_rx=1000000)
858         bfd_session_up(self)
859         reference_packet = wait_for_bfd_packet(self)
860         time_mark = time.time()
861         interval = 300000
862         self.test_session.update(required_min_rx=interval)
863         self.test_session.send_packet()
864         extra_time = time.time() - time_mark
865         p = wait_for_bfd_packet(self)
866         # first packet is allowed to be late by time we spent doing the update
867         # calculated in extra_time
868         self.assert_in_range(p.time - reference_packet.time,
869                              .95 * 0.75 * interval / USEC_IN_SEC,
870                              1.05 * interval / USEC_IN_SEC + extra_time,
871                              "time between BFD packets")
872         reference_packet = p
873         for dummy in range(3):
874             p = wait_for_bfd_packet(self)
875             diff = p.time - reference_packet.time
876             self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
877                                  1.05 * interval / USEC_IN_SEC,
878                                  "time between BFD packets")
879             reference_packet = p
880
881     @unittest.skipUnless(running_extended_tests, "part of extended tests")
882     def test_modify_req_min_rx_double(self):
883         """ modify session - double required min rx """
884         bfd_session_up(self)
885         p = wait_for_bfd_packet(self)
886         self.test_session.update(desired_min_tx=10000,
887                                  required_min_rx=10000)
888         self.test_session.send_packet()
889         # double required min rx
890         self.vpp_session.modify_parameters(
891             required_min_rx=2 * self.vpp_session.required_min_rx)
892         p = wait_for_bfd_packet(
893             self, pcap_time_min=time.time() - self.vpp_clock_offset)
894         # poll bit needs to be set
895         self.assertIn("P", p.sprintf("%BFD.flags%"),
896                       "Poll bit not set in BFD packet")
897         # finish poll sequence with final packet
898         final = self.test_session.create_packet()
899         final[BFD].flags = "F"
900         timeout = self.test_session.detect_mult * \
901             max(self.test_session.desired_min_tx,
902                 self.vpp_session.required_min_rx) / USEC_IN_SEC
903         self.test_session.send_packet(final)
904         time_mark = time.time()
905         e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
906         verify_event(self, e, expected_state=BFDState.down)
907         time_to_event = time.time() - time_mark
908         self.assert_in_range(time_to_event, .9 * timeout,
909                              1.1 * timeout, "session timeout")
910
911     @unittest.skipUnless(running_extended_tests, "part of extended tests")
912     def test_modify_req_min_rx_halve(self):
913         """ modify session - halve required min rx """
914         self.vpp_session.modify_parameters(
915             required_min_rx=2 * self.vpp_session.required_min_rx)
916         bfd_session_up(self)
917         p = wait_for_bfd_packet(self)
918         self.test_session.update(desired_min_tx=10000,
919                                  required_min_rx=10000)
920         self.test_session.send_packet()
921         p = wait_for_bfd_packet(
922             self, pcap_time_min=time.time() - self.vpp_clock_offset)
923         # halve required min rx
924         old_required_min_rx = self.vpp_session.required_min_rx
925         self.vpp_session.modify_parameters(
926             required_min_rx=0.5 * self.vpp_session.required_min_rx)
927         # now we wait 0.8*3*old-req-min-rx and the session should still be up
928         self.sleep(0.8 * self.vpp_session.detect_mult *
929                    old_required_min_rx / USEC_IN_SEC,
930                    "wait before finishing poll sequence")
931         self.assert_equal(len(self.vapi.collect_events()), 0,
932                           "number of bfd events")
933         p = wait_for_bfd_packet(self)
934         # poll bit needs to be set
935         self.assertIn("P", p.sprintf("%BFD.flags%"),
936                       "Poll bit not set in BFD packet")
937         # finish poll sequence with final packet
938         final = self.test_session.create_packet()
939         final[BFD].flags = "F"
940         self.test_session.send_packet(final)
941         # now the session should time out under new conditions
942         detection_time = self.test_session.detect_mult *\
943             self.vpp_session.required_min_rx / USEC_IN_SEC
944         before = time.time()
945         e = self.vapi.wait_for_event(
946             2 * detection_time, "bfd_udp_session_details")
947         after = time.time()
948         self.assert_in_range(after - before,
949                              0.9 * detection_time,
950                              1.1 * detection_time,
951                              "time before bfd session goes down")
952         verify_event(self, e, expected_state=BFDState.down)
953
954     @unittest.skipUnless(running_extended_tests, "part of extended tests")
955     def test_modify_detect_mult(self):
956         """ modify detect multiplier """
957         bfd_session_up(self)
958         p = wait_for_bfd_packet(self)
959         self.vpp_session.modify_parameters(detect_mult=1)
960         p = wait_for_bfd_packet(
961             self, pcap_time_min=time.time() - self.vpp_clock_offset)
962         self.assert_equal(self.vpp_session.detect_mult,
963                           p[BFD].detect_mult,
964                           "detect mult")
965         # poll bit must not be set
966         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
967                          "Poll bit not set in BFD packet")
968         self.vpp_session.modify_parameters(detect_mult=10)
969         p = wait_for_bfd_packet(
970             self, pcap_time_min=time.time() - self.vpp_clock_offset)
971         self.assert_equal(self.vpp_session.detect_mult,
972                           p[BFD].detect_mult,
973                           "detect mult")
974         # poll bit must not be set
975         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
976                          "Poll bit not set in BFD packet")
977
978     @unittest.skipUnless(running_extended_tests, "part of extended tests")
979     def test_queued_poll(self):
980         """ test poll sequence queueing """
981         bfd_session_up(self)
982         p = wait_for_bfd_packet(self)
983         self.vpp_session.modify_parameters(
984             required_min_rx=2 * self.vpp_session.required_min_rx)
985         p = wait_for_bfd_packet(self)
986         poll_sequence_start = time.time()
987         poll_sequence_length_min = 0.5
988         send_final_after = time.time() + poll_sequence_length_min
989         # poll bit needs to be set
990         self.assertIn("P", p.sprintf("%BFD.flags%"),
991                       "Poll bit not set in BFD packet")
992         self.assert_equal(p[BFD].required_min_rx_interval,
993                           self.vpp_session.required_min_rx,
994                           "BFD required min rx interval")
995         self.vpp_session.modify_parameters(
996             required_min_rx=2 * self.vpp_session.required_min_rx)
997         # 2nd poll sequence should be queued now
998         # don't send the reply back yet, wait for some time to emulate
999         # longer round-trip time
1000         packet_count = 0
1001         while time.time() < send_final_after:
1002             self.test_session.send_packet()
1003             p = wait_for_bfd_packet(self)
1004             self.assert_equal(len(self.vapi.collect_events()), 0,
1005                               "number of bfd events")
1006             self.assert_equal(p[BFD].required_min_rx_interval,
1007                               self.vpp_session.required_min_rx,
1008                               "BFD required min rx interval")
1009             packet_count += 1
1010             # poll bit must be set
1011             self.assertIn("P", p.sprintf("%BFD.flags%"),
1012                           "Poll bit not set in BFD packet")
1013         final = self.test_session.create_packet()
1014         final[BFD].flags = "F"
1015         self.test_session.send_packet(final)
1016         # finish 1st with final
1017         poll_sequence_length = time.time() - poll_sequence_start
1018         # vpp must wait for some time before starting new poll sequence
1019         poll_no_2_started = False
1020         for dummy in range(2 * packet_count):
1021             p = wait_for_bfd_packet(self)
1022             self.assert_equal(len(self.vapi.collect_events()), 0,
1023                               "number of bfd events")
1024             if "P" in p.sprintf("%BFD.flags%"):
1025                 poll_no_2_started = True
1026                 if time.time() < poll_sequence_start + poll_sequence_length:
1027                     raise Exception("VPP started 2nd poll sequence too soon")
1028                 final = self.test_session.create_packet()
1029                 final[BFD].flags = "F"
1030                 self.test_session.send_packet(final)
1031                 break
1032             else:
1033                 self.test_session.send_packet()
1034         self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
1035         # finish 2nd with final
1036         final = self.test_session.create_packet()
1037         final[BFD].flags = "F"
1038         self.test_session.send_packet(final)
1039         p = wait_for_bfd_packet(self)
1040         # poll bit must not be set
1041         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
1042                          "Poll bit set in BFD packet")
1043
1044     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1045     def test_poll_response(self):
1046         """ test correct response to control frame with poll bit set """
1047         bfd_session_up(self)
1048         poll = self.test_session.create_packet()
1049         poll[BFD].flags = "P"
1050         self.test_session.send_packet(poll)
1051         final = wait_for_bfd_packet(
1052             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1053         self.assertIn("F", final.sprintf("%BFD.flags%"))
1054
1055     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1056     def test_no_periodic_if_remote_demand(self):
1057         """ no periodic frames outside poll sequence if remote demand set """
1058         bfd_session_up(self)
1059         demand = self.test_session.create_packet()
1060         demand[BFD].flags = "D"
1061         self.test_session.send_packet(demand)
1062         transmit_time = 0.9 \
1063             * max(self.vpp_session.required_min_rx,
1064                   self.test_session.desired_min_tx) \
1065             / USEC_IN_SEC
1066         count = 0
1067         for dummy in range(self.test_session.detect_mult * 2):
1068             self.sleep(transmit_time)
1069             self.test_session.send_packet(demand)
1070             try:
1071                 p = wait_for_bfd_packet(self, timeout=0)
1072                 self.logger.error(ppp("Received unexpected packet:", p))
1073                 count += 1
1074             except CaptureTimeoutError:
1075                 pass
1076         events = self.vapi.collect_events()
1077         for e in events:
1078             self.logger.error("Received unexpected event: %s", e)
1079         self.assert_equal(count, 0, "number of packets received")
1080         self.assert_equal(len(events), 0, "number of events received")
1081
1082     def test_echo_looped_back(self):
1083         """ echo packets looped back """
1084         # don't need a session in this case..
1085         self.vpp_session.remove_vpp_config()
1086         self.pg0.enable_capture()
1087         echo_packet_count = 10
1088         # random source port low enough to increment a few times..
1089         udp_sport_tx = randint(1, 50000)
1090         udp_sport_rx = udp_sport_tx
1091         echo_packet = (Ether(src=self.pg0.remote_mac,
1092                              dst=self.pg0.local_mac) /
1093                        IP(src=self.pg0.remote_ip4,
1094                           dst=self.pg0.remote_ip4) /
1095                        UDP(dport=BFD.udp_dport_echo) /
1096                        Raw("this should be looped back"))
1097         for dummy in range(echo_packet_count):
1098             self.sleep(.01, "delay between echo packets")
1099             echo_packet[UDP].sport = udp_sport_tx
1100             udp_sport_tx += 1
1101             self.logger.debug(ppp("Sending packet:", echo_packet))
1102             self.pg0.add_stream(echo_packet)
1103             self.pg_start()
1104         for dummy in range(echo_packet_count):
1105             p = self.pg0.wait_for_packet(1)
1106             self.logger.debug(ppp("Got packet:", p))
1107             ether = p[Ether]
1108             self.assert_equal(self.pg0.remote_mac,
1109                               ether.dst, "Destination MAC")
1110             self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1111             ip = p[IP]
1112             self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1113             self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1114             udp = p[UDP]
1115             self.assert_equal(udp.dport, BFD.udp_dport_echo,
1116                               "UDP destination port")
1117             self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1118             udp_sport_rx += 1
1119             # need to compare the hex payload here, otherwise BFD_vpp_echo
1120             # gets in way
1121             self.assertEqual(str(p[UDP].payload),
1122                              str(echo_packet[UDP].payload),
1123                              "Received packet is not the echo packet sent")
1124         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1125                           "ECHO packet identifier for test purposes)")
1126
1127     def test_echo(self):
1128         """ echo function """
1129         bfd_session_up(self)
1130         self.test_session.update(required_min_echo_rx=150000)
1131         self.test_session.send_packet()
1132         detection_time = self.test_session.detect_mult *\
1133             self.vpp_session.required_min_rx / USEC_IN_SEC
1134         # echo shouldn't work without echo source set
1135         for dummy in range(10):
1136             sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1137             self.sleep(sleep, "delay before sending bfd packet")
1138             self.test_session.send_packet()
1139         p = wait_for_bfd_packet(
1140             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1141         self.assert_equal(p[BFD].required_min_rx_interval,
1142                           self.vpp_session.required_min_rx,
1143                           "BFD required min rx interval")
1144         self.test_session.send_packet()
1145         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1146         echo_seen = False
1147         # should be turned on - loopback echo packets
1148         for dummy in range(3):
1149             loop_until = time.time() + 0.75 * detection_time
1150             while time.time() < loop_until:
1151                 p = self.pg0.wait_for_packet(1)
1152                 self.logger.debug(ppp("Got packet:", p))
1153                 if p[UDP].dport == BFD.udp_dport_echo:
1154                     self.assert_equal(
1155                         p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
1156                     self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
1157                                         "BFD ECHO src IP equal to loopback IP")
1158                     self.logger.debug(ppp("Looping back packet:", p))
1159                     self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1160                                       "ECHO packet destination MAC address")
1161                     p[Ether].dst = self.pg0.local_mac
1162                     self.pg0.add_stream(p)
1163                     self.pg_start()
1164                     echo_seen = True
1165                 elif p.haslayer(BFD):
1166                     if echo_seen:
1167                         self.assertGreaterEqual(
1168                             p[BFD].required_min_rx_interval,
1169                             1000000)
1170                     if "P" in p.sprintf("%BFD.flags%"):
1171                         final = self.test_session.create_packet()
1172                         final[BFD].flags = "F"
1173                         self.test_session.send_packet(final)
1174                 else:
1175                     raise Exception(ppp("Received unknown packet:", p))
1176
1177                 self.assert_equal(len(self.vapi.collect_events()), 0,
1178                                   "number of bfd events")
1179             self.test_session.send_packet()
1180         self.assertTrue(echo_seen, "No echo packets received")
1181
1182     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1183     def test_echo_fail(self):
1184         """ session goes down if echo function fails """
1185         bfd_session_up(self)
1186         self.test_session.update(required_min_echo_rx=150000)
1187         self.test_session.send_packet()
1188         detection_time = self.test_session.detect_mult *\
1189             self.vpp_session.required_min_rx / USEC_IN_SEC
1190         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1191         # echo function should be used now, but we will drop the echo packets
1192         verified_diag = False
1193         for dummy in range(3):
1194             loop_until = time.time() + 0.75 * detection_time
1195             while time.time() < loop_until:
1196                 p = self.pg0.wait_for_packet(1)
1197                 self.logger.debug(ppp("Got packet:", p))
1198                 if p[UDP].dport == BFD.udp_dport_echo:
1199                     # dropped
1200                     pass
1201                 elif p.haslayer(BFD):
1202                     if "P" in p.sprintf("%BFD.flags%"):
1203                         self.assertGreaterEqual(
1204                             p[BFD].required_min_rx_interval,
1205                             1000000)
1206                         final = self.test_session.create_packet()
1207                         final[BFD].flags = "F"
1208                         self.test_session.send_packet(final)
1209                     if p[BFD].state == BFDState.down:
1210                         self.assert_equal(p[BFD].diag,
1211                                           BFDDiagCode.echo_function_failed,
1212                                           BFDDiagCode)
1213                         verified_diag = True
1214                 else:
1215                     raise Exception(ppp("Received unknown packet:", p))
1216             self.test_session.send_packet()
1217         events = self.vapi.collect_events()
1218         self.assert_equal(len(events), 1, "number of bfd events")
1219         self.assert_equal(events[0].state, BFDState.down, BFDState)
1220         self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1221
1222     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1223     def test_echo_stop(self):
1224         """ echo function stops if peer sets required min echo rx zero """
1225         bfd_session_up(self)
1226         self.test_session.update(required_min_echo_rx=150000)
1227         self.test_session.send_packet()
1228         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1229         # wait for first echo packet
1230         while True:
1231             p = self.pg0.wait_for_packet(1)
1232             self.logger.debug(ppp("Got packet:", p))
1233             if p[UDP].dport == BFD.udp_dport_echo:
1234                 self.logger.debug(ppp("Looping back packet:", p))
1235                 p[Ether].dst = self.pg0.local_mac
1236                 self.pg0.add_stream(p)
1237                 self.pg_start()
1238                 break
1239             elif p.haslayer(BFD):
1240                 # ignore BFD
1241                 pass
1242             else:
1243                 raise Exception(ppp("Received unknown packet:", p))
1244         self.test_session.update(required_min_echo_rx=0)
1245         self.test_session.send_packet()
1246         # echo packets shouldn't arrive anymore
1247         for dummy in range(5):
1248             wait_for_bfd_packet(
1249                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1250             self.test_session.send_packet()
1251             events = self.vapi.collect_events()
1252             self.assert_equal(len(events), 0, "number of bfd events")
1253
1254     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1255     def test_echo_source_removed(self):
1256         """ echo function stops if echo source is removed """
1257         bfd_session_up(self)
1258         self.test_session.update(required_min_echo_rx=150000)
1259         self.test_session.send_packet()
1260         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1261         # wait for first echo packet
1262         while True:
1263             p = self.pg0.wait_for_packet(1)
1264             self.logger.debug(ppp("Got packet:", p))
1265             if p[UDP].dport == BFD.udp_dport_echo:
1266                 self.logger.debug(ppp("Looping back packet:", p))
1267                 p[Ether].dst = self.pg0.local_mac
1268                 self.pg0.add_stream(p)
1269                 self.pg_start()
1270                 break
1271             elif p.haslayer(BFD):
1272                 # ignore BFD
1273                 pass
1274             else:
1275                 raise Exception(ppp("Received unknown packet:", p))
1276         self.vapi.bfd_udp_del_echo_source()
1277         self.test_session.send_packet()
1278         # echo packets shouldn't arrive anymore
1279         for dummy in range(5):
1280             wait_for_bfd_packet(
1281                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1282             self.test_session.send_packet()
1283             events = self.vapi.collect_events()
1284             self.assert_equal(len(events), 0, "number of bfd events")
1285
1286     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1287     def test_stale_echo(self):
1288         """ stale echo packets don't keep a session up """
1289         bfd_session_up(self)
1290         self.test_session.update(required_min_echo_rx=150000)
1291         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1292         self.test_session.send_packet()
1293         # should be turned on - loopback echo packets
1294         echo_packet = None
1295         timeout_at = None
1296         timeout_ok = False
1297         for dummy in range(10 * self.vpp_session.detect_mult):
1298             p = self.pg0.wait_for_packet(1)
1299             if p[UDP].dport == BFD.udp_dport_echo:
1300                 if echo_packet is None:
1301                     self.logger.debug(ppp("Got first echo packet:", p))
1302                     echo_packet = p
1303                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1304                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1305                 else:
1306                     self.logger.debug(ppp("Got followup echo packet:", p))
1307                 self.logger.debug(ppp("Looping back first echo packet:", p))
1308                 echo_packet[Ether].dst = self.pg0.local_mac
1309                 self.pg0.add_stream(echo_packet)
1310                 self.pg_start()
1311             elif p.haslayer(BFD):
1312                 self.logger.debug(ppp("Got packet:", p))
1313                 if "P" in p.sprintf("%BFD.flags%"):
1314                     final = self.test_session.create_packet()
1315                     final[BFD].flags = "F"
1316                     self.test_session.send_packet(final)
1317                 if p[BFD].state == BFDState.down:
1318                     self.assertIsNotNone(
1319                         timeout_at,
1320                         "Session went down before first echo packet received")
1321                     now = time.time()
1322                     self.assertGreaterEqual(
1323                         now, timeout_at,
1324                         "Session timeout at %s, but is expected at %s" %
1325                         (now, timeout_at))
1326                     self.assert_equal(p[BFD].diag,
1327                                       BFDDiagCode.echo_function_failed,
1328                                       BFDDiagCode)
1329                     events = self.vapi.collect_events()
1330                     self.assert_equal(len(events), 1, "number of bfd events")
1331                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1332                     timeout_ok = True
1333                     break
1334             else:
1335                 raise Exception(ppp("Received unknown packet:", p))
1336             self.test_session.send_packet()
1337         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1338
1339     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1340     def test_invalid_echo_checksum(self):
1341         """ echo packets with invalid checksum don't keep a session up """
1342         bfd_session_up(self)
1343         self.test_session.update(required_min_echo_rx=150000)
1344         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1345         self.test_session.send_packet()
1346         # should be turned on - loopback echo packets
1347         timeout_at = None
1348         timeout_ok = False
1349         for dummy in range(10 * self.vpp_session.detect_mult):
1350             p = self.pg0.wait_for_packet(1)
1351             if p[UDP].dport == BFD.udp_dport_echo:
1352                 self.logger.debug(ppp("Got echo packet:", p))
1353                 if timeout_at is None:
1354                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1355                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1356                 p[BFD_vpp_echo].checksum = getrandbits(64)
1357                 p[Ether].dst = self.pg0.local_mac
1358                 self.logger.debug(ppp("Looping back modified echo packet:", p))
1359                 self.pg0.add_stream(p)
1360                 self.pg_start()
1361             elif p.haslayer(BFD):
1362                 self.logger.debug(ppp("Got packet:", p))
1363                 if "P" in p.sprintf("%BFD.flags%"):
1364                     final = self.test_session.create_packet()
1365                     final[BFD].flags = "F"
1366                     self.test_session.send_packet(final)
1367                 if p[BFD].state == BFDState.down:
1368                     self.assertIsNotNone(
1369                         timeout_at,
1370                         "Session went down before first echo packet received")
1371                     now = time.time()
1372                     self.assertGreaterEqual(
1373                         now, timeout_at,
1374                         "Session timeout at %s, but is expected at %s" %
1375                         (now, timeout_at))
1376                     self.assert_equal(p[BFD].diag,
1377                                       BFDDiagCode.echo_function_failed,
1378                                       BFDDiagCode)
1379                     events = self.vapi.collect_events()
1380                     self.assert_equal(len(events), 1, "number of bfd events")
1381                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1382                     timeout_ok = True
1383                     break
1384             else:
1385                 raise Exception(ppp("Received unknown packet:", p))
1386             self.test_session.send_packet()
1387         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1388
1389     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1390     def test_admin_up_down(self):
1391         """ put session admin-up and admin-down """
1392         bfd_session_up(self)
1393         self.vpp_session.admin_down()
1394         self.pg0.enable_capture()
1395         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1396         verify_event(self, e, expected_state=BFDState.admin_down)
1397         for dummy in range(2):
1398             p = wait_for_bfd_packet(self)
1399             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1400         # try to bring session up - shouldn't be possible
1401         self.test_session.update(state=BFDState.init)
1402         self.test_session.send_packet()
1403         for dummy in range(2):
1404             p = wait_for_bfd_packet(self)
1405             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1406         self.vpp_session.admin_up()
1407         self.test_session.update(state=BFDState.down)
1408         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1409         verify_event(self, e, expected_state=BFDState.down)
1410         p = wait_for_bfd_packet(
1411             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1412         self.assert_equal(p[BFD].state, BFDState.down, BFDState)
1413         self.test_session.send_packet()
1414         p = wait_for_bfd_packet(
1415             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1416         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1417         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1418         verify_event(self, e, expected_state=BFDState.init)
1419         self.test_session.update(state=BFDState.up)
1420         self.test_session.send_packet()
1421         p = wait_for_bfd_packet(
1422             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1423         self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1424         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1425         verify_event(self, e, expected_state=BFDState.up)
1426
1427     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1428     def test_config_change_remote_demand(self):
1429         """ configuration change while peer in demand mode """
1430         bfd_session_up(self)
1431         demand = self.test_session.create_packet()
1432         demand[BFD].flags = "D"
1433         self.test_session.send_packet(demand)
1434         self.vpp_session.modify_parameters(
1435             required_min_rx=2 * self.vpp_session.required_min_rx)
1436         p = wait_for_bfd_packet(
1437             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1438         # poll bit must be set
1439         self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1440         # terminate poll sequence
1441         final = self.test_session.create_packet()
1442         final[BFD].flags = "D+F"
1443         self.test_session.send_packet(final)
1444         # vpp should be quiet now again
1445         transmit_time = 0.9 \
1446             * max(self.vpp_session.required_min_rx,
1447                   self.test_session.desired_min_tx) \
1448             / USEC_IN_SEC
1449         count = 0
1450         for dummy in range(self.test_session.detect_mult * 2):
1451             self.sleep(transmit_time)
1452             self.test_session.send_packet(demand)
1453             try:
1454                 p = wait_for_bfd_packet(self, timeout=0)
1455                 self.logger.error(ppp("Received unexpected packet:", p))
1456                 count += 1
1457             except CaptureTimeoutError:
1458                 pass
1459         events = self.vapi.collect_events()
1460         for e in events:
1461             self.logger.error("Received unexpected event: %s", e)
1462         self.assert_equal(count, 0, "number of packets received")
1463         self.assert_equal(len(events), 0, "number of events received")
1464
1465     def test_intf_deleted(self):
1466         """ interface with bfd session deleted """
1467         intf = VppLoInterface(self)
1468         intf.config_ip4()
1469         intf.admin_up()
1470         sw_if_index = intf.sw_if_index
1471         vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip4)
1472         vpp_session.add_vpp_config()
1473         vpp_session.admin_up()
1474         intf.remove_vpp_config()
1475         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1476         self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1477         self.assertFalse(vpp_session.query_vpp_config())
1478
1479
1480 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1481 class BFD6TestCase(VppTestCase):
1482     """Bidirectional Forwarding Detection (BFD) (IPv6) """
1483
1484     pg0 = None
1485     vpp_clock_offset = None
1486     vpp_session = None
1487     test_session = None
1488
1489     @classmethod
1490     def setUpClass(cls):
1491         super(BFD6TestCase, cls).setUpClass()
1492         cls.vapi.cli("set log class bfd level debug")
1493         try:
1494             cls.create_pg_interfaces([0])
1495             cls.pg0.config_ip6()
1496             cls.pg0.configure_ipv6_neighbors()
1497             cls.pg0.admin_up()
1498             cls.pg0.resolve_ndp()
1499             cls.create_loopback_interfaces(1)
1500             cls.loopback0 = cls.lo_interfaces[0]
1501             cls.loopback0.config_ip6()
1502             cls.loopback0.admin_up()
1503
1504         except Exception:
1505             super(BFD6TestCase, cls).tearDownClass()
1506             raise
1507
1508     @classmethod
1509     def tearDownClass(cls):
1510         super(BFD6TestCase, cls).tearDownClass()
1511
1512     def setUp(self):
1513         super(BFD6TestCase, self).setUp()
1514         self.factory = AuthKeyFactory()
1515         self.vapi.want_bfd_events()
1516         self.pg0.enable_capture()
1517         try:
1518             self.vpp_session = VppBFDUDPSession(self, self.pg0,
1519                                                 self.pg0.remote_ip6,
1520                                                 af=AF_INET6)
1521             self.vpp_session.add_vpp_config()
1522             self.vpp_session.admin_up()
1523             self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1524             self.logger.debug(self.vapi.cli("show adj nbr"))
1525         except:
1526             self.vapi.want_bfd_events(enable_disable=0)
1527             raise
1528
1529     def tearDown(self):
1530         if not self.vpp_dead:
1531             self.vapi.want_bfd_events(enable_disable=0)
1532         self.vapi.collect_events()  # clear the event queue
1533         super(BFD6TestCase, self).tearDown()
1534
1535     def test_session_up(self):
1536         """ bring BFD session up """
1537         bfd_session_up(self)
1538
1539     def test_session_up_by_ip(self):
1540         """ bring BFD session up - first frame looked up by address pair """
1541         self.logger.info("BFD: Sending Slow control frame")
1542         self.test_session.update(my_discriminator=randint(0, 40000000))
1543         self.test_session.send_packet()
1544         self.pg0.enable_capture()
1545         p = self.pg0.wait_for_packet(1)
1546         self.assert_equal(p[BFD].your_discriminator,
1547                           self.test_session.my_discriminator,
1548                           "BFD - your discriminator")
1549         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1550         self.test_session.update(your_discriminator=p[BFD].my_discriminator,
1551                                  state=BFDState.up)
1552         self.logger.info("BFD: Waiting for event")
1553         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1554         verify_event(self, e, expected_state=BFDState.init)
1555         self.logger.info("BFD: Sending Up")
1556         self.test_session.send_packet()
1557         self.logger.info("BFD: Waiting for event")
1558         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1559         verify_event(self, e, expected_state=BFDState.up)
1560         self.logger.info("BFD: Session is Up")
1561         self.test_session.update(state=BFDState.up)
1562         self.test_session.send_packet()
1563         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1564
1565     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1566     def test_hold_up(self):
1567         """ hold BFD session up """
1568         bfd_session_up(self)
1569         for dummy in range(self.test_session.detect_mult * 2):
1570             wait_for_bfd_packet(self)
1571             self.test_session.send_packet()
1572         self.assert_equal(len(self.vapi.collect_events()), 0,
1573                           "number of bfd events")
1574         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1575
1576     def test_echo_looped_back(self):
1577         """ echo packets looped back """
1578         # don't need a session in this case..
1579         self.vpp_session.remove_vpp_config()
1580         self.pg0.enable_capture()
1581         echo_packet_count = 10
1582         # random source port low enough to increment a few times..
1583         udp_sport_tx = randint(1, 50000)
1584         udp_sport_rx = udp_sport_tx
1585         echo_packet = (Ether(src=self.pg0.remote_mac,
1586                              dst=self.pg0.local_mac) /
1587                        IPv6(src=self.pg0.remote_ip6,
1588                             dst=self.pg0.remote_ip6) /
1589                        UDP(dport=BFD.udp_dport_echo) /
1590                        Raw("this should be looped back"))
1591         for dummy in range(echo_packet_count):
1592             self.sleep(.01, "delay between echo packets")
1593             echo_packet[UDP].sport = udp_sport_tx
1594             udp_sport_tx += 1
1595             self.logger.debug(ppp("Sending packet:", echo_packet))
1596             self.pg0.add_stream(echo_packet)
1597             self.pg_start()
1598         for dummy in range(echo_packet_count):
1599             p = self.pg0.wait_for_packet(1)
1600             self.logger.debug(ppp("Got packet:", p))
1601             ether = p[Ether]
1602             self.assert_equal(self.pg0.remote_mac,
1603                               ether.dst, "Destination MAC")
1604             self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1605             ip = p[IPv6]
1606             self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
1607             self.assert_equal(self.pg0.remote_ip6, ip.src, "Destination IP")
1608             udp = p[UDP]
1609             self.assert_equal(udp.dport, BFD.udp_dport_echo,
1610                               "UDP destination port")
1611             self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1612             udp_sport_rx += 1
1613             # need to compare the hex payload here, otherwise BFD_vpp_echo
1614             # gets in way
1615             self.assertEqual(str(p[UDP].payload),
1616                              str(echo_packet[UDP].payload),
1617                              "Received packet is not the echo packet sent")
1618         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1619                           "ECHO packet identifier for test purposes)")
1620         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1621                           "ECHO packet identifier for test purposes)")
1622
1623     def test_echo(self):
1624         """ echo function """
1625         bfd_session_up(self)
1626         self.test_session.update(required_min_echo_rx=150000)
1627         self.test_session.send_packet()
1628         detection_time = self.test_session.detect_mult *\
1629             self.vpp_session.required_min_rx / USEC_IN_SEC
1630         # echo shouldn't work without echo source set
1631         for dummy in range(10):
1632             sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1633             self.sleep(sleep, "delay before sending bfd packet")
1634             self.test_session.send_packet()
1635         p = wait_for_bfd_packet(
1636             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1637         self.assert_equal(p[BFD].required_min_rx_interval,
1638                           self.vpp_session.required_min_rx,
1639                           "BFD required min rx interval")
1640         self.test_session.send_packet()
1641         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1642         echo_seen = False
1643         # should be turned on - loopback echo packets
1644         for dummy in range(3):
1645             loop_until = time.time() + 0.75 * detection_time
1646             while time.time() < loop_until:
1647                 p = self.pg0.wait_for_packet(1)
1648                 self.logger.debug(ppp("Got packet:", p))
1649                 if p[UDP].dport == BFD.udp_dport_echo:
1650                     self.assert_equal(
1651                         p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
1652                     self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
1653                                         "BFD ECHO src IP equal to loopback IP")
1654                     self.logger.debug(ppp("Looping back packet:", p))
1655                     self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1656                                       "ECHO packet destination MAC address")
1657                     p[Ether].dst = self.pg0.local_mac
1658                     self.pg0.add_stream(p)
1659                     self.pg_start()
1660                     echo_seen = True
1661                 elif p.haslayer(BFD):
1662                     if echo_seen:
1663                         self.assertGreaterEqual(
1664                             p[BFD].required_min_rx_interval,
1665                             1000000)
1666                     if "P" in p.sprintf("%BFD.flags%"):
1667                         final = self.test_session.create_packet()
1668                         final[BFD].flags = "F"
1669                         self.test_session.send_packet(final)
1670                 else:
1671                     raise Exception(ppp("Received unknown packet:", p))
1672
1673                 self.assert_equal(len(self.vapi.collect_events()), 0,
1674                                   "number of bfd events")
1675             self.test_session.send_packet()
1676         self.assertTrue(echo_seen, "No echo packets received")
1677
1678     def test_intf_deleted(self):
1679         """ interface with bfd session deleted """
1680         intf = VppLoInterface(self)
1681         intf.config_ip6()
1682         intf.admin_up()
1683         sw_if_index = intf.sw_if_index
1684         vpp_session = VppBFDUDPSession(
1685             self, intf, intf.remote_ip6, af=AF_INET6)
1686         vpp_session.add_vpp_config()
1687         vpp_session.admin_up()
1688         intf.remove_vpp_config()
1689         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1690         self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1691         self.assertFalse(vpp_session.query_vpp_config())
1692
1693
1694 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1695 class BFDFIBTestCase(VppTestCase):
1696     """ BFD-FIB interactions (IPv6) """
1697
1698     vpp_session = None
1699     test_session = None
1700
1701     @classmethod
1702     def setUpClass(cls):
1703         super(BFDFIBTestCase, cls).setUpClass()
1704
1705     @classmethod
1706     def tearDownClass(cls):
1707         super(BFDFIBTestCase, cls).tearDownClass()
1708
1709     def setUp(self):
1710         super(BFDFIBTestCase, self).setUp()
1711         self.create_pg_interfaces(range(1))
1712
1713         self.vapi.want_bfd_events()
1714         self.pg0.enable_capture()
1715
1716         for i in self.pg_interfaces:
1717             i.admin_up()
1718             i.config_ip6()
1719             i.configure_ipv6_neighbors()
1720
1721     def tearDown(self):
1722         if not self.vpp_dead:
1723             self.vapi.want_bfd_events(enable_disable=0)
1724
1725         super(BFDFIBTestCase, self).tearDown()
1726
1727     @staticmethod
1728     def pkt_is_not_data_traffic(p):
1729         """ not data traffic implies BFD or the usual IPv6 ND/RA"""
1730         if p.haslayer(BFD) or is_ipv6_misc(p):
1731             return True
1732         return False
1733
1734     def test_session_with_fib(self):
1735         """ BFD-FIB interactions """
1736
1737         # packets to match against both of the routes
1738         p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1739               IPv6(src="3001::1", dst="2001::1") /
1740               UDP(sport=1234, dport=1234) /
1741               Raw('\xa5' * 100)),
1742              (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1743               IPv6(src="3001::1", dst="2002::1") /
1744               UDP(sport=1234, dport=1234) /
1745               Raw('\xa5' * 100))]
1746
1747         # A recursive and a non-recursive route via a next-hop that
1748         # will have a BFD session
1749         ip_2001_s_64 = VppIpRoute(self, "2001::", 64,
1750                                   [VppRoutePath(self.pg0.remote_ip6,
1751                                                 self.pg0.sw_if_index,
1752                                                 proto=DpoProto.DPO_PROTO_IP6)],
1753                                   is_ip6=1)
1754         ip_2002_s_64 = VppIpRoute(self, "2002::", 64,
1755                                   [VppRoutePath(self.pg0.remote_ip6,
1756                                                 0xffffffff,
1757                                                 proto=DpoProto.DPO_PROTO_IP6)],
1758                                   is_ip6=1)
1759         ip_2001_s_64.add_vpp_config()
1760         ip_2002_s_64.add_vpp_config()
1761
1762         # bring the session up now the routes are present
1763         self.vpp_session = VppBFDUDPSession(self,
1764                                             self.pg0,
1765                                             self.pg0.remote_ip6,
1766                                             af=AF_INET6)
1767         self.vpp_session.add_vpp_config()
1768         self.vpp_session.admin_up()
1769         self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1770
1771         # session is up - traffic passes
1772         bfd_session_up(self)
1773
1774         self.pg0.add_stream(p)
1775         self.pg_start()
1776         for packet in p:
1777             captured = self.pg0.wait_for_packet(
1778                 1,
1779                 filter_out_fn=self.pkt_is_not_data_traffic)
1780             self.assertEqual(captured[IPv6].dst,
1781                              packet[IPv6].dst)
1782
1783         # session is up - traffic is dropped
1784         bfd_session_down(self)
1785
1786         self.pg0.add_stream(p)
1787         self.pg_start()
1788         with self.assertRaises(CaptureTimeoutError):
1789             self.pg0.wait_for_packet(1, self.pkt_is_not_data_traffic)
1790
1791         # session is up - traffic passes
1792         bfd_session_up(self)
1793
1794         self.pg0.add_stream(p)
1795         self.pg_start()
1796         for packet in p:
1797             captured = self.pg0.wait_for_packet(
1798                 1,
1799                 filter_out_fn=self.pkt_is_not_data_traffic)
1800             self.assertEqual(captured[IPv6].dst,
1801                              packet[IPv6].dst)
1802
1803
1804 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1805 class BFDSHA1TestCase(VppTestCase):
1806     """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1807
1808     pg0 = None
1809     vpp_clock_offset = None
1810     vpp_session = None
1811     test_session = None
1812
1813     @classmethod
1814     def setUpClass(cls):
1815         super(BFDSHA1TestCase, cls).setUpClass()
1816         cls.vapi.cli("set log class bfd level debug")
1817         try:
1818             cls.create_pg_interfaces([0])
1819             cls.pg0.config_ip4()
1820             cls.pg0.admin_up()
1821             cls.pg0.resolve_arp()
1822
1823         except Exception:
1824             super(BFDSHA1TestCase, cls).tearDownClass()
1825             raise
1826
1827     @classmethod
1828     def tearDownClass(cls):
1829         super(BFDSHA1TestCase, cls).tearDownClass()
1830
1831     def setUp(self):
1832         super(BFDSHA1TestCase, self).setUp()
1833         self.factory = AuthKeyFactory()
1834         self.vapi.want_bfd_events()
1835         self.pg0.enable_capture()
1836
1837     def tearDown(self):
1838         if not self.vpp_dead:
1839             self.vapi.want_bfd_events(enable_disable=0)
1840         self.vapi.collect_events()  # clear the event queue
1841         super(BFDSHA1TestCase, self).tearDown()
1842
1843     def test_session_up(self):
1844         """ bring BFD session up """
1845         key = self.factory.create_random_key(self)
1846         key.add_vpp_config()
1847         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1848                                             self.pg0.remote_ip4,
1849                                             sha1_key=key)
1850         self.vpp_session.add_vpp_config()
1851         self.vpp_session.admin_up()
1852         self.test_session = BFDTestSession(
1853             self, self.pg0, AF_INET, sha1_key=key,
1854             bfd_key_id=self.vpp_session.bfd_key_id)
1855         bfd_session_up(self)
1856
1857     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1858     def test_hold_up(self):
1859         """ hold BFD session up """
1860         key = self.factory.create_random_key(self)
1861         key.add_vpp_config()
1862         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1863                                             self.pg0.remote_ip4,
1864                                             sha1_key=key)
1865         self.vpp_session.add_vpp_config()
1866         self.vpp_session.admin_up()
1867         self.test_session = BFDTestSession(
1868             self, self.pg0, AF_INET, sha1_key=key,
1869             bfd_key_id=self.vpp_session.bfd_key_id)
1870         bfd_session_up(self)
1871         for dummy in range(self.test_session.detect_mult * 2):
1872             wait_for_bfd_packet(self)
1873             self.test_session.send_packet()
1874         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1875
1876     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1877     def test_hold_up_meticulous(self):
1878         """ hold BFD session up - meticulous auth """
1879         key = self.factory.create_random_key(
1880             self, BFDAuthType.meticulous_keyed_sha1)
1881         key.add_vpp_config()
1882         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1883                                             self.pg0.remote_ip4, sha1_key=key)
1884         self.vpp_session.add_vpp_config()
1885         self.vpp_session.admin_up()
1886         # specify sequence number so that it wraps
1887         self.test_session = BFDTestSession(
1888             self, self.pg0, AF_INET, sha1_key=key,
1889             bfd_key_id=self.vpp_session.bfd_key_id,
1890             our_seq_number=0xFFFFFFFF - 4)
1891         bfd_session_up(self)
1892         for dummy in range(30):
1893             wait_for_bfd_packet(self)
1894             self.test_session.inc_seq_num()
1895             self.test_session.send_packet()
1896         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1897
1898     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1899     def test_send_bad_seq_number(self):
1900         """ session is not kept alive by msgs with bad sequence numbers"""
1901         key = self.factory.create_random_key(
1902             self, BFDAuthType.meticulous_keyed_sha1)
1903         key.add_vpp_config()
1904         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1905                                             self.pg0.remote_ip4, sha1_key=key)
1906         self.vpp_session.add_vpp_config()
1907         self.test_session = BFDTestSession(
1908             self, self.pg0, AF_INET, sha1_key=key,
1909             bfd_key_id=self.vpp_session.bfd_key_id)
1910         bfd_session_up(self)
1911         detection_time = self.test_session.detect_mult *\
1912             self.vpp_session.required_min_rx / USEC_IN_SEC
1913         send_until = time.time() + 2 * detection_time
1914         while time.time() < send_until:
1915             self.test_session.send_packet()
1916             self.sleep(0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
1917                        "time between bfd packets")
1918         e = self.vapi.collect_events()
1919         # session should be down now, because the sequence numbers weren't
1920         # updated
1921         self.assert_equal(len(e), 1, "number of bfd events")
1922         verify_event(self, e[0], expected_state=BFDState.down)
1923
1924     def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
1925                                        legitimate_test_session,
1926                                        rogue_test_session,
1927                                        rogue_bfd_values=None):
1928         """ execute a rogue session interaction scenario
1929
1930         1. create vpp session, add config
1931         2. bring the legitimate session up
1932         3. copy the bfd values from legitimate session to rogue session
1933         4. apply rogue_bfd_values to rogue session
1934         5. set rogue session state to down
1935         6. send message to take the session down from the rogue session
1936         7. assert that the legitimate session is unaffected
1937         """
1938
1939         self.vpp_session = vpp_bfd_udp_session
1940         self.vpp_session.add_vpp_config()
1941         self.test_session = legitimate_test_session
1942         # bring vpp session up
1943         bfd_session_up(self)
1944         # send packet from rogue session
1945         rogue_test_session.update(
1946             my_discriminator=self.test_session.my_discriminator,
1947             your_discriminator=self.test_session.your_discriminator,
1948             desired_min_tx=self.test_session.desired_min_tx,
1949             required_min_rx=self.test_session.required_min_rx,
1950             detect_mult=self.test_session.detect_mult,
1951             diag=self.test_session.diag,
1952             state=self.test_session.state,
1953             auth_type=self.test_session.auth_type)
1954         if rogue_bfd_values:
1955             rogue_test_session.update(**rogue_bfd_values)
1956         rogue_test_session.update(state=BFDState.down)
1957         rogue_test_session.send_packet()
1958         wait_for_bfd_packet(self)
1959         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1960
1961     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1962     def test_mismatch_auth(self):
1963         """ session is not brought down by unauthenticated msg """
1964         key = self.factory.create_random_key(self)
1965         key.add_vpp_config()
1966         vpp_session = VppBFDUDPSession(
1967             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1968         legitimate_test_session = BFDTestSession(
1969             self, self.pg0, AF_INET, sha1_key=key,
1970             bfd_key_id=vpp_session.bfd_key_id)
1971         rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
1972         self.execute_rogue_session_scenario(vpp_session,
1973                                             legitimate_test_session,
1974                                             rogue_test_session)
1975
1976     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1977     def test_mismatch_bfd_key_id(self):
1978         """ session is not brought down by msg with non-existent key-id """
1979         key = self.factory.create_random_key(self)
1980         key.add_vpp_config()
1981         vpp_session = VppBFDUDPSession(
1982             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1983         # pick a different random bfd key id
1984         x = randint(0, 255)
1985         while x == vpp_session.bfd_key_id:
1986             x = randint(0, 255)
1987         legitimate_test_session = BFDTestSession(
1988             self, self.pg0, AF_INET, sha1_key=key,
1989             bfd_key_id=vpp_session.bfd_key_id)
1990         rogue_test_session = BFDTestSession(
1991             self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
1992         self.execute_rogue_session_scenario(vpp_session,
1993                                             legitimate_test_session,
1994                                             rogue_test_session)
1995
1996     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1997     def test_mismatched_auth_type(self):
1998         """ session is not brought down by msg with wrong auth type """
1999         key = self.factory.create_random_key(self)
2000         key.add_vpp_config()
2001         vpp_session = VppBFDUDPSession(
2002             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2003         legitimate_test_session = BFDTestSession(
2004             self, self.pg0, AF_INET, sha1_key=key,
2005             bfd_key_id=vpp_session.bfd_key_id)
2006         rogue_test_session = BFDTestSession(
2007             self, self.pg0, AF_INET, sha1_key=key,
2008             bfd_key_id=vpp_session.bfd_key_id)
2009         self.execute_rogue_session_scenario(
2010             vpp_session, legitimate_test_session, rogue_test_session,
2011             {'auth_type': BFDAuthType.keyed_md5})
2012
2013     @unittest.skipUnless(running_extended_tests, "part of extended tests")
2014     def test_restart(self):
2015         """ simulate remote peer restart and resynchronization """
2016         key = self.factory.create_random_key(
2017             self, BFDAuthType.meticulous_keyed_sha1)
2018         key.add_vpp_config()
2019         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2020                                             self.pg0.remote_ip4, sha1_key=key)
2021         self.vpp_session.add_vpp_config()
2022         self.test_session = BFDTestSession(
2023             self, self.pg0, AF_INET, sha1_key=key,
2024             bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
2025         bfd_session_up(self)
2026         # don't send any packets for 2*detection_time
2027         detection_time = self.test_session.detect_mult *\
2028             self.vpp_session.required_min_rx / USEC_IN_SEC
2029         self.sleep(2 * detection_time, "simulating peer restart")
2030         events = self.vapi.collect_events()
2031         self.assert_equal(len(events), 1, "number of bfd events")
2032         verify_event(self, events[0], expected_state=BFDState.down)
2033         self.test_session.update(state=BFDState.down)
2034         # reset sequence number
2035         self.test_session.our_seq_number = 0
2036         self.test_session.vpp_seq_number = None
2037         # now throw away any pending packets
2038         self.pg0.enable_capture()
2039         bfd_session_up(self)
2040
2041
2042 @unittest.skipUnless(running_extended_tests, "part of extended tests")
2043 class BFDAuthOnOffTestCase(VppTestCase):
2044     """Bidirectional Forwarding Detection (BFD) (changing auth) """
2045
2046     pg0 = None
2047     vpp_session = None
2048     test_session = None
2049
2050     @classmethod
2051     def setUpClass(cls):
2052         super(BFDAuthOnOffTestCase, cls).setUpClass()
2053         cls.vapi.cli("set log class bfd level debug")
2054         try:
2055             cls.create_pg_interfaces([0])
2056             cls.pg0.config_ip4()
2057             cls.pg0.admin_up()
2058             cls.pg0.resolve_arp()
2059
2060         except Exception:
2061             super(BFDAuthOnOffTestCase, cls).tearDownClass()
2062             raise
2063
2064     @classmethod
2065     def tearDownClass(cls):
2066         super(BFDAuthOnOffTestCase, cls).tearDownClass()
2067
2068     def setUp(self):
2069         super(BFDAuthOnOffTestCase, self).setUp()
2070         self.factory = AuthKeyFactory()
2071         self.vapi.want_bfd_events()
2072         self.pg0.enable_capture()
2073
2074     def tearDown(self):
2075         if not self.vpp_dead:
2076             self.vapi.want_bfd_events(enable_disable=0)
2077         self.vapi.collect_events()  # clear the event queue
2078         super(BFDAuthOnOffTestCase, self).tearDown()
2079
2080     def test_auth_on_immediate(self):
2081         """ turn auth on without disturbing session state (immediate) """
2082         key = self.factory.create_random_key(self)
2083         key.add_vpp_config()
2084         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2085                                             self.pg0.remote_ip4)
2086         self.vpp_session.add_vpp_config()
2087         self.test_session = BFDTestSession(self, self.pg0, AF_INET)
2088         bfd_session_up(self)
2089         for dummy in range(self.test_session.detect_mult * 2):
2090             p = wait_for_bfd_packet(self)
2091             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2092             self.test_session.send_packet()
2093         self.vpp_session.activate_auth(key)
2094         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2095         self.test_session.sha1_key = key
2096         for dummy in range(self.test_session.detect_mult * 2):
2097             p = wait_for_bfd_packet(self)
2098             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2099             self.test_session.send_packet()
2100         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2101         self.assert_equal(len(self.vapi.collect_events()), 0,
2102                           "number of bfd events")
2103
2104     def test_auth_off_immediate(self):
2105         """ turn auth off without disturbing session state (immediate) """
2106         key = self.factory.create_random_key(self)
2107         key.add_vpp_config()
2108         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2109                                             self.pg0.remote_ip4, sha1_key=key)
2110         self.vpp_session.add_vpp_config()
2111         self.test_session = BFDTestSession(
2112             self, self.pg0, AF_INET, sha1_key=key,
2113             bfd_key_id=self.vpp_session.bfd_key_id)
2114         bfd_session_up(self)
2115         # self.vapi.want_bfd_events(enable_disable=0)
2116         for dummy in range(self.test_session.detect_mult * 2):
2117             p = wait_for_bfd_packet(self)
2118             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2119             self.test_session.inc_seq_num()
2120             self.test_session.send_packet()
2121         self.vpp_session.deactivate_auth()
2122         self.test_session.bfd_key_id = None
2123         self.test_session.sha1_key = None
2124         for dummy in range(self.test_session.detect_mult * 2):
2125             p = wait_for_bfd_packet(self)
2126             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2127             self.test_session.inc_seq_num()
2128             self.test_session.send_packet()
2129         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2130         self.assert_equal(len(self.vapi.collect_events()), 0,
2131                           "number of bfd events")
2132
2133     def test_auth_change_key_immediate(self):
2134         """ change auth key without disturbing session state (immediate) """
2135         key1 = self.factory.create_random_key(self)
2136         key1.add_vpp_config()
2137         key2 = self.factory.create_random_key(self)
2138         key2.add_vpp_config()
2139         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2140                                             self.pg0.remote_ip4, sha1_key=key1)
2141         self.vpp_session.add_vpp_config()
2142         self.test_session = BFDTestSession(
2143             self, self.pg0, AF_INET, sha1_key=key1,
2144             bfd_key_id=self.vpp_session.bfd_key_id)
2145         bfd_session_up(self)
2146         for dummy in range(self.test_session.detect_mult * 2):
2147             p = wait_for_bfd_packet(self)
2148             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2149             self.test_session.send_packet()
2150         self.vpp_session.activate_auth(key2)
2151         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2152         self.test_session.sha1_key = key2
2153         for dummy in range(self.test_session.detect_mult * 2):
2154             p = wait_for_bfd_packet(self)
2155             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2156             self.test_session.send_packet()
2157         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2158         self.assert_equal(len(self.vapi.collect_events()), 0,
2159                           "number of bfd events")
2160
2161     def test_auth_on_delayed(self):
2162         """ turn auth on without disturbing session state (delayed) """
2163         key = self.factory.create_random_key(self)
2164         key.add_vpp_config()
2165         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2166                                             self.pg0.remote_ip4)
2167         self.vpp_session.add_vpp_config()
2168         self.test_session = BFDTestSession(self, self.pg0, AF_INET)
2169         bfd_session_up(self)
2170         for dummy in range(self.test_session.detect_mult * 2):
2171             wait_for_bfd_packet(self)
2172             self.test_session.send_packet()
2173         self.vpp_session.activate_auth(key, delayed=True)
2174         for dummy in range(self.test_session.detect_mult * 2):
2175             p = wait_for_bfd_packet(self)
2176             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2177             self.test_session.send_packet()
2178         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2179         self.test_session.sha1_key = key
2180         self.test_session.send_packet()
2181         for dummy in range(self.test_session.detect_mult * 2):
2182             p = wait_for_bfd_packet(self)
2183             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2184             self.test_session.send_packet()
2185         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2186         self.assert_equal(len(self.vapi.collect_events()), 0,
2187                           "number of bfd events")
2188
2189     def test_auth_off_delayed(self):
2190         """ turn auth off without disturbing session state (delayed) """
2191         key = self.factory.create_random_key(self)
2192         key.add_vpp_config()
2193         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2194                                             self.pg0.remote_ip4, sha1_key=key)
2195         self.vpp_session.add_vpp_config()
2196         self.test_session = BFDTestSession(
2197             self, self.pg0, AF_INET, sha1_key=key,
2198             bfd_key_id=self.vpp_session.bfd_key_id)
2199         bfd_session_up(self)
2200         for dummy in range(self.test_session.detect_mult * 2):
2201             p = wait_for_bfd_packet(self)
2202             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2203             self.test_session.send_packet()
2204         self.vpp_session.deactivate_auth(delayed=True)
2205         for dummy in range(self.test_session.detect_mult * 2):
2206             p = wait_for_bfd_packet(self)
2207             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2208             self.test_session.send_packet()
2209         self.test_session.bfd_key_id = None
2210         self.test_session.sha1_key = None
2211         self.test_session.send_packet()
2212         for dummy in range(self.test_session.detect_mult * 2):
2213             p = wait_for_bfd_packet(self)
2214             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2215             self.test_session.send_packet()
2216         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2217         self.assert_equal(len(self.vapi.collect_events()), 0,
2218                           "number of bfd events")
2219
2220     def test_auth_change_key_delayed(self):
2221         """ change auth key without disturbing session state (delayed) """
2222         key1 = self.factory.create_random_key(self)
2223         key1.add_vpp_config()
2224         key2 = self.factory.create_random_key(self)
2225         key2.add_vpp_config()
2226         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2227                                             self.pg0.remote_ip4, sha1_key=key1)
2228         self.vpp_session.add_vpp_config()
2229         self.vpp_session.admin_up()
2230         self.test_session = BFDTestSession(
2231             self, self.pg0, AF_INET, sha1_key=key1,
2232             bfd_key_id=self.vpp_session.bfd_key_id)
2233         bfd_session_up(self)
2234         for dummy in range(self.test_session.detect_mult * 2):
2235             p = wait_for_bfd_packet(self)
2236             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2237             self.test_session.send_packet()
2238         self.vpp_session.activate_auth(key2, delayed=True)
2239         for dummy in range(self.test_session.detect_mult * 2):
2240             p = wait_for_bfd_packet(self)
2241             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2242             self.test_session.send_packet()
2243         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2244         self.test_session.sha1_key = key2
2245         self.test_session.send_packet()
2246         for dummy in range(self.test_session.detect_mult * 2):
2247             p = wait_for_bfd_packet(self)
2248             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2249             self.test_session.send_packet()
2250         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2251         self.assert_equal(len(self.vapi.collect_events()), 0,
2252                           "number of bfd events")
2253
2254
2255 @unittest.skipUnless(running_extended_tests, "part of extended tests")
2256 class BFDCLITestCase(VppTestCase):
2257     """Bidirectional Forwarding Detection (BFD) (CLI) """
2258     pg0 = None
2259
2260     @classmethod
2261     def setUpClass(cls):
2262         super(BFDCLITestCase, cls).setUpClass()
2263         cls.vapi.cli("set log class bfd level debug")
2264         try:
2265             cls.create_pg_interfaces((0,))
2266             cls.pg0.config_ip4()
2267             cls.pg0.config_ip6()
2268             cls.pg0.resolve_arp()
2269             cls.pg0.resolve_ndp()
2270
2271         except Exception:
2272             super(BFDCLITestCase, cls).tearDownClass()
2273             raise
2274
2275     @classmethod
2276     def tearDownClass(cls):
2277         super(BFDCLITestCase, cls).tearDownClass()
2278
2279     def setUp(self):
2280         super(BFDCLITestCase, self).setUp()
2281         self.factory = AuthKeyFactory()
2282         self.pg0.enable_capture()
2283
2284     def tearDown(self):
2285         try:
2286             self.vapi.want_bfd_events(enable_disable=0)
2287         except UnexpectedApiReturnValueError:
2288             # some tests aren't subscribed, so this is not an issue
2289             pass
2290         self.vapi.collect_events()  # clear the event queue
2291         super(BFDCLITestCase, self).tearDown()
2292
2293     def cli_verify_no_response(self, cli):
2294         """ execute a CLI, asserting that the response is empty """
2295         self.assert_equal(self.vapi.cli(cli),
2296                           "",
2297                           "CLI command response")
2298
2299     def cli_verify_response(self, cli, expected):
2300         """ execute a CLI, asserting that the response matches expectation """
2301         self.assert_equal(self.vapi.cli(cli).strip(),
2302                           expected,
2303                           "CLI command response")
2304
2305     def test_show(self):
2306         """ show commands """
2307         k1 = self.factory.create_random_key(self)
2308         k1.add_vpp_config()
2309         k2 = self.factory.create_random_key(
2310             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2311         k2.add_vpp_config()
2312         s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2313         s1.add_vpp_config()
2314         s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2315                               sha1_key=k2)
2316         s2.add_vpp_config()
2317         self.logger.info(self.vapi.ppcli("show bfd keys"))
2318         self.logger.info(self.vapi.ppcli("show bfd sessions"))
2319         self.logger.info(self.vapi.ppcli("show bfd"))
2320
2321     def test_set_del_sha1_key(self):
2322         """ set/delete SHA1 auth key """
2323         k = self.factory.create_random_key(self)
2324         self.registry.register(k, self.logger)
2325         self.cli_verify_no_response(
2326             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2327             (k.conf_key_id,
2328                 "".join("{:02x}".format(ord(c)) for c in k.key)))
2329         self.assertTrue(k.query_vpp_config())
2330         self.vpp_session = VppBFDUDPSession(
2331             self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
2332         self.vpp_session.add_vpp_config()
2333         self.test_session = \
2334             BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
2335                            bfd_key_id=self.vpp_session.bfd_key_id)
2336         self.vapi.want_bfd_events()
2337         bfd_session_up(self)
2338         bfd_session_down(self)
2339         # try to replace the secret for the key - should fail because the key
2340         # is in-use
2341         k2 = self.factory.create_random_key(self)
2342         self.cli_verify_response(
2343             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2344             (k.conf_key_id,
2345                 "".join("{:02x}".format(ord(c)) for c in k2.key)),
2346             "bfd key set: `bfd_auth_set_key' API call failed, "
2347             "rv=-103:BFD object in use")
2348         # manipulating the session using old secret should still work
2349         bfd_session_up(self)
2350         bfd_session_down(self)
2351         self.vpp_session.remove_vpp_config()
2352         self.cli_verify_no_response(
2353             "bfd key del conf-key-id %s" % k.conf_key_id)
2354         self.assertFalse(k.query_vpp_config())
2355
2356     def test_set_del_meticulous_sha1_key(self):
2357         """ set/delete meticulous SHA1 auth key """
2358         k = self.factory.create_random_key(
2359             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2360         self.registry.register(k, self.logger)
2361         self.cli_verify_no_response(
2362             "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
2363             (k.conf_key_id,
2364                 "".join("{:02x}".format(ord(c)) for c in k.key)))
2365         self.assertTrue(k.query_vpp_config())
2366         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2367                                             self.pg0.remote_ip6, af=AF_INET6,
2368                                             sha1_key=k)
2369         self.vpp_session.add_vpp_config()
2370         self.vpp_session.admin_up()
2371         self.test_session = \
2372             BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
2373                            bfd_key_id=self.vpp_session.bfd_key_id)
2374         self.vapi.want_bfd_events()
2375         bfd_session_up(self)
2376         bfd_session_down(self)
2377         # try to replace the secret for the key - should fail because the key
2378         # is in-use
2379         k2 = self.factory.create_random_key(self)
2380         self.cli_verify_response(
2381             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2382             (k.conf_key_id,
2383                 "".join("{:02x}".format(ord(c)) for c in k2.key)),
2384             "bfd key set: `bfd_auth_set_key' API call failed, "
2385             "rv=-103:BFD object in use")
2386         # manipulating the session using old secret should still work
2387         bfd_session_up(self)
2388         bfd_session_down(self)
2389         self.vpp_session.remove_vpp_config()
2390         self.cli_verify_no_response(
2391             "bfd key del conf-key-id %s" % k.conf_key_id)
2392         self.assertFalse(k.query_vpp_config())
2393
2394     def test_add_mod_del_bfd_udp(self):
2395         """ create/modify/delete IPv4 BFD UDP session """
2396         vpp_session = VppBFDUDPSession(
2397             self, self.pg0, self.pg0.remote_ip4)
2398         self.registry.register(vpp_session, self.logger)
2399         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2400             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2401             "detect-mult %s" % (self.pg0.name, self.pg0.local_ip4,
2402                                 self.pg0.remote_ip4,
2403                                 vpp_session.desired_min_tx,
2404                                 vpp_session.required_min_rx,
2405                                 vpp_session.detect_mult)
2406         self.cli_verify_no_response(cli_add_cmd)
2407         # 2nd add should fail
2408         self.cli_verify_response(
2409             cli_add_cmd,
2410             "bfd udp session add: `bfd_add_add_session' API call"
2411             " failed, rv=-101:Duplicate BFD object")
2412         verify_bfd_session_config(self, vpp_session)
2413         mod_session = VppBFDUDPSession(
2414             self, self.pg0, self.pg0.remote_ip4,
2415             required_min_rx=2 * vpp_session.required_min_rx,
2416             desired_min_tx=3 * vpp_session.desired_min_tx,
2417             detect_mult=4 * vpp_session.detect_mult)
2418         self.cli_verify_no_response(
2419             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2420             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2421             (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2422              mod_session.desired_min_tx, mod_session.required_min_rx,
2423              mod_session.detect_mult))
2424         verify_bfd_session_config(self, mod_session)
2425         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2426             "peer-addr %s" % (self.pg0.name,
2427                               self.pg0.local_ip4, self.pg0.remote_ip4)
2428         self.cli_verify_no_response(cli_del_cmd)
2429         # 2nd del is expected to fail
2430         self.cli_verify_response(
2431             cli_del_cmd, "bfd udp session del: `bfd_udp_del_session' API call"
2432             " failed, rv=-102:No such BFD object")
2433         self.assertFalse(vpp_session.query_vpp_config())
2434
2435     def test_add_mod_del_bfd_udp6(self):
2436         """ create/modify/delete IPv6 BFD UDP session """
2437         vpp_session = VppBFDUDPSession(
2438             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
2439         self.registry.register(vpp_session, self.logger)
2440         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2441             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2442             "detect-mult %s" % (self.pg0.name, self.pg0.local_ip6,
2443                                 self.pg0.remote_ip6,
2444                                 vpp_session.desired_min_tx,
2445                                 vpp_session.required_min_rx,
2446                                 vpp_session.detect_mult)
2447         self.cli_verify_no_response(cli_add_cmd)
2448         # 2nd add should fail
2449         self.cli_verify_response(
2450             cli_add_cmd,
2451             "bfd udp session add: `bfd_add_add_session' API call"
2452             " failed, rv=-101:Duplicate BFD object")
2453         verify_bfd_session_config(self, vpp_session)
2454         mod_session = VppBFDUDPSession(
2455             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2456             required_min_rx=2 * vpp_session.required_min_rx,
2457             desired_min_tx=3 * vpp_session.desired_min_tx,
2458             detect_mult=4 * vpp_session.detect_mult)
2459         self.cli_verify_no_response(
2460             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2461             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2462             (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2463              mod_session.desired_min_tx,
2464              mod_session.required_min_rx, mod_session.detect_mult))
2465         verify_bfd_session_config(self, mod_session)
2466         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2467             "peer-addr %s" % (self.pg0.name,
2468                               self.pg0.local_ip6, self.pg0.remote_ip6)
2469         self.cli_verify_no_response(cli_del_cmd)
2470         # 2nd del is expected to fail
2471         self.cli_verify_response(
2472             cli_del_cmd,
2473             "bfd udp session del: `bfd_udp_del_session' API call"
2474             " failed, rv=-102:No such BFD object")
2475         self.assertFalse(vpp_session.query_vpp_config())
2476
2477     def test_add_mod_del_bfd_udp_auth(self):
2478         """ create/modify/delete IPv4 BFD UDP session (authenticated) """
2479         key = self.factory.create_random_key(self)
2480         key.add_vpp_config()
2481         vpp_session = VppBFDUDPSession(
2482             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2483         self.registry.register(vpp_session, self.logger)
2484         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2485             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2486             "detect-mult %s conf-key-id %s bfd-key-id %s"\
2487             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2488                vpp_session.desired_min_tx, vpp_session.required_min_rx,
2489                vpp_session.detect_mult, key.conf_key_id,
2490                vpp_session.bfd_key_id)
2491         self.cli_verify_no_response(cli_add_cmd)
2492         # 2nd add should fail
2493         self.cli_verify_response(
2494             cli_add_cmd,
2495             "bfd udp session add: `bfd_add_add_session' API call"
2496             " failed, rv=-101:Duplicate BFD object")
2497         verify_bfd_session_config(self, vpp_session)
2498         mod_session = VppBFDUDPSession(
2499             self, self.pg0, self.pg0.remote_ip4, sha1_key=key,
2500             bfd_key_id=vpp_session.bfd_key_id,
2501             required_min_rx=2 * vpp_session.required_min_rx,
2502             desired_min_tx=3 * vpp_session.desired_min_tx,
2503             detect_mult=4 * vpp_session.detect_mult)
2504         self.cli_verify_no_response(
2505             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2506             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2507             (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2508              mod_session.desired_min_tx,
2509              mod_session.required_min_rx, mod_session.detect_mult))
2510         verify_bfd_session_config(self, mod_session)
2511         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2512             "peer-addr %s" % (self.pg0.name,
2513                               self.pg0.local_ip4, self.pg0.remote_ip4)
2514         self.cli_verify_no_response(cli_del_cmd)
2515         # 2nd del is expected to fail
2516         self.cli_verify_response(
2517             cli_del_cmd,
2518             "bfd udp session del: `bfd_udp_del_session' API call"
2519             " failed, rv=-102:No such BFD object")
2520         self.assertFalse(vpp_session.query_vpp_config())
2521
2522     def test_add_mod_del_bfd_udp6_auth(self):
2523         """ create/modify/delete IPv6 BFD UDP session (authenticated) """
2524         key = self.factory.create_random_key(
2525             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2526         key.add_vpp_config()
2527         vpp_session = VppBFDUDPSession(
2528             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key)
2529         self.registry.register(vpp_session, self.logger)
2530         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2531             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2532             "detect-mult %s conf-key-id %s bfd-key-id %s" \
2533             % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2534                vpp_session.desired_min_tx, vpp_session.required_min_rx,
2535                vpp_session.detect_mult, key.conf_key_id,
2536                vpp_session.bfd_key_id)
2537         self.cli_verify_no_response(cli_add_cmd)
2538         # 2nd add should fail
2539         self.cli_verify_response(
2540             cli_add_cmd,
2541             "bfd udp session add: `bfd_add_add_session' API call"
2542             " failed, rv=-101:Duplicate BFD object")
2543         verify_bfd_session_config(self, vpp_session)
2544         mod_session = VppBFDUDPSession(
2545             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key,
2546             bfd_key_id=vpp_session.bfd_key_id,
2547             required_min_rx=2 * vpp_session.required_min_rx,
2548             desired_min_tx=3 * vpp_session.desired_min_tx,
2549             detect_mult=4 * vpp_session.detect_mult)
2550         self.cli_verify_no_response(
2551             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2552             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2553             (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2554              mod_session.desired_min_tx,
2555              mod_session.required_min_rx, mod_session.detect_mult))
2556         verify_bfd_session_config(self, mod_session)
2557         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2558             "peer-addr %s" % (self.pg0.name,
2559                               self.pg0.local_ip6, self.pg0.remote_ip6)
2560         self.cli_verify_no_response(cli_del_cmd)
2561         # 2nd del is expected to fail
2562         self.cli_verify_response(
2563             cli_del_cmd,
2564             "bfd udp session del: `bfd_udp_del_session' API call"
2565             " failed, rv=-102:No such BFD object")
2566         self.assertFalse(vpp_session.query_vpp_config())
2567
2568     def test_auth_on_off(self):
2569         """ turn authentication on and off """
2570         key = self.factory.create_random_key(
2571             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2572         key.add_vpp_config()
2573         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2574         auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2575                                         sha1_key=key)
2576         session.add_vpp_config()
2577         cli_activate = \
2578             "bfd udp session auth activate interface %s local-addr %s "\
2579             "peer-addr %s conf-key-id %s bfd-key-id %s"\
2580             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2581                key.conf_key_id, auth_session.bfd_key_id)
2582         self.cli_verify_no_response(cli_activate)
2583         verify_bfd_session_config(self, auth_session)
2584         self.cli_verify_no_response(cli_activate)
2585         verify_bfd_session_config(self, auth_session)
2586         cli_deactivate = \
2587             "bfd udp session auth deactivate interface %s local-addr %s "\
2588             "peer-addr %s "\
2589             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2590         self.cli_verify_no_response(cli_deactivate)
2591         verify_bfd_session_config(self, session)
2592         self.cli_verify_no_response(cli_deactivate)
2593         verify_bfd_session_config(self, session)
2594
2595     def test_auth_on_off_delayed(self):
2596         """ turn authentication on and off (delayed) """
2597         key = self.factory.create_random_key(
2598             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2599         key.add_vpp_config()
2600         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2601         auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2602                                         sha1_key=key)
2603         session.add_vpp_config()
2604         cli_activate = \
2605             "bfd udp session auth activate interface %s local-addr %s "\
2606             "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"\
2607             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2608                key.conf_key_id, auth_session.bfd_key_id)
2609         self.cli_verify_no_response(cli_activate)
2610         verify_bfd_session_config(self, auth_session)
2611         self.cli_verify_no_response(cli_activate)
2612         verify_bfd_session_config(self, auth_session)
2613         cli_deactivate = \
2614             "bfd udp session auth deactivate interface %s local-addr %s "\
2615             "peer-addr %s delayed yes"\
2616             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2617         self.cli_verify_no_response(cli_deactivate)
2618         verify_bfd_session_config(self, session)
2619         self.cli_verify_no_response(cli_deactivate)
2620         verify_bfd_session_config(self, session)
2621
2622     def test_admin_up_down(self):
2623         """ put session admin-up and admin-down """
2624         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2625         session.add_vpp_config()
2626         cli_down = \
2627             "bfd udp session set-flags admin down interface %s local-addr %s "\
2628             "peer-addr %s "\
2629             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2630         cli_up = \
2631             "bfd udp session set-flags admin up interface %s local-addr %s "\
2632             "peer-addr %s "\
2633             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2634         self.cli_verify_no_response(cli_down)
2635         verify_bfd_session_config(self, session, state=BFDState.admin_down)
2636         self.cli_verify_no_response(cli_up)
2637         verify_bfd_session_config(self, session, state=BFDState.down)
2638
2639     def test_set_del_udp_echo_source(self):
2640         """ set/del udp echo source """
2641         self.create_loopback_interfaces(1)
2642         self.loopback0 = self.lo_interfaces[0]
2643         self.loopback0.admin_up()
2644         self.cli_verify_response("show bfd echo-source",
2645                                  "UDP echo source is not set.")
2646         cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
2647         self.cli_verify_no_response(cli_set)
2648         self.cli_verify_response("show bfd echo-source",
2649                                  "UDP echo source is: %s\n"
2650                                  "IPv4 address usable as echo source: none\n"
2651                                  "IPv6 address usable as echo source: none" %
2652                                  self.loopback0.name)
2653         self.loopback0.config_ip4()
2654         unpacked = unpack("!L", self.loopback0.local_ip4n)
2655         echo_ip4 = inet_ntop(AF_INET, pack("!L", unpacked[0] ^ 1))
2656         self.cli_verify_response("show bfd echo-source",
2657                                  "UDP echo source is: %s\n"
2658                                  "IPv4 address usable as echo source: %s\n"
2659                                  "IPv6 address usable as echo source: none" %
2660                                  (self.loopback0.name, echo_ip4))
2661         unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
2662         echo_ip6 = inet_ntop(AF_INET6, pack("!LLLL", unpacked[0], unpacked[1],
2663                                             unpacked[2], unpacked[3] ^ 1))
2664         self.loopback0.config_ip6()
2665         self.cli_verify_response("show bfd echo-source",
2666                                  "UDP echo source is: %s\n"
2667                                  "IPv4 address usable as echo source: %s\n"
2668                                  "IPv6 address usable as echo source: %s" %
2669                                  (self.loopback0.name, echo_ip4, echo_ip6))
2670         cli_del = "bfd udp echo-source del"
2671         self.cli_verify_no_response(cli_del)
2672         self.cli_verify_response("show bfd echo-source",
2673                                  "UDP echo source is not set.")
2674
2675 if __name__ == '__main__':
2676     unittest.main(testRunner=VppTestRunner)