VPP-1508: Use scapy.compat to manage packet level library differences.
[vpp.git] / test / test_bfd.py
1 #!/usr/bin/env python
2 """ BFD tests """
3
4 from __future__ import division
5
6 import binascii
7 import hashlib
8 import time
9 import unittest
10 from random import randint, shuffle, getrandbits
11 from socket import AF_INET, AF_INET6, inet_ntop
12 from struct import pack, unpack
13
14 from six import moves
15 import scapy.compat
16 from scapy.layers.inet import UDP, IP
17 from scapy.layers.inet6 import IPv6
18 from scapy.layers.l2 import Ether
19 from scapy.packet import Raw
20
21 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
22     BFDDiagCode, BFDState, BFD_vpp_echo
23 from framework import VppTestCase, VppTestRunner, running_extended_tests
24 from util import ppp
25 from vpp_ip import DpoProto
26 from vpp_ip_route import VppIpRoute, VppRoutePath
27 from vpp_lo_interface import VppLoInterface
28 from vpp_papi_provider import UnexpectedApiReturnValueError
29 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
30
31 USEC_IN_SEC = 1000000
32
33
class AuthKeyFactory(object):
    """Hand out random BFD auth keys whose conf key IDs never repeat"""

    def __init__(self):
        # conf key IDs already issued by this factory (dict used as a set)
        self._conf_key_ids = {}

    def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
        """ build a key of random content/length with an unused conf key id

        :param test: test case the key object is bound to
        :param auth_type: authentication type stored in the key object
        :returns: VppBFDAuthKey carrying 1-20 random bytes
        """
        # keep drawing until we hit an ID this factory has not issued yet
        while True:
            candidate_id = randint(0, 0xFFFFFFFF)
            if candidate_id not in self._conf_key_ids:
                break
        self._conf_key_ids[candidate_id] = 1
        key_length = randint(1, 20)
        key_octets = bytearray(randint(0, 255) for _ in range(key_length))
        return VppBFDAuthKey(test=test, auth_type=auth_type,
                             conf_key_id=candidate_id,
                             key=scapy.compat.raw(key_octets))
50
51
@unittest.skipUnless(running_extended_tests, "part of extended tests")
class BFDAPITestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) - API"""
    # Exercises the BFD configuration API only (sessions, auth keys, echo
    # source); no BFD packets are exchanged by the tests in this class.

    pg0 = None
    pg1 = None

    @classmethod
    def setUpClass(cls):
        super(BFDAPITestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces(range(2))
            for i in cls.pg_interfaces:
                i.config_ip4()
                i.config_ip6()
                i.resolve_arp()

        except Exception:
            # undo the parent class setup before propagating the failure
            super(BFDAPITestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        super(BFDAPITestCase, cls).tearDownClass()

    def setUp(self):
        super(BFDAPITestCase, self).setUp()
        # fresh factory per test - conf key IDs are unique within one test
        self.factory = AuthKeyFactory()

    def test_add_bfd(self):
        """ create a BFD session """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        # add/remove twice to check that a removed session can be re-added
        session.add_vpp_config()
        self.logger.debug("Session state is %s", session.state)
        session.remove_vpp_config()
        session.add_vpp_config()
        self.logger.debug("Session state is %s", session.state)
        session.remove_vpp_config()

    def test_double_add(self):
        """ create the same BFD session twice (negative case) """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()

        with self.vapi.assert_negative_api_retval():
            session.add_vpp_config()

        session.remove_vpp_config()

    def test_add_bfd6(self):
        """ create IPv6 BFD session """
        session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
        # add/remove twice to check that a removed session can be re-added
        session.add_vpp_config()
        self.logger.debug("Session state is %s", session.state)
        session.remove_vpp_config()
        session.add_vpp_config()
        self.logger.debug("Session state is %s", session.state)
        session.remove_vpp_config()

    def test_mod_bfd(self):
        """ modify BFD session parameters """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   desired_min_tx=50000,
                                   required_min_rx=10000,
                                   detect_mult=1)
        session.add_vpp_config()
        # the dump must reflect the parameters the session was created with
        s = session.get_bfd_udp_session_dump_entry()
        self.assert_equal(session.desired_min_tx,
                          s.desired_min_tx,
                          "desired min transmit interval")
        self.assert_equal(session.required_min_rx,
                          s.required_min_rx,
                          "required min receive interval")
        self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
        session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
                                  required_min_rx=session.required_min_rx * 2,
                                  detect_mult=session.detect_mult * 2)
        # ... and must follow the session object after the modification
        s = session.get_bfd_udp_session_dump_entry()
        self.assert_equal(session.desired_min_tx,
                          s.desired_min_tx,
                          "desired min transmit interval")
        self.assert_equal(session.required_min_rx,
                          s.required_min_rx,
                          "required min receive interval")
        self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")

    def test_add_sha1_keys(self):
        """ add SHA1 keys """
        key_count = 10
        keys = [self.factory.create_random_key(self)
                for _ in range(key_count)]
        for key in keys:
            self.assertFalse(key.query_vpp_config())
        for key in keys:
            key.add_vpp_config()
        for key in keys:
            self.assertTrue(key.query_vpp_config())
        # remove randomly
        # shuffle() needs a mutable sequence; on Python 3 range() is an
        # immutable object, so materialize the indexes into a list first
        indexes = list(range(key_count))
        shuffle(indexes)
        removed = set()
        for i in indexes:
            keys[i].remove_vpp_config()
            removed.add(i)
            # after every removal, exactly the removed keys must be gone
            for j, key in enumerate(keys):
                if j in removed:
                    self.assertFalse(key.query_vpp_config())
                else:
                    self.assertTrue(key.query_vpp_config())
        # should be removed now
        for key in keys:
            self.assertFalse(key.query_vpp_config())
        # add back and remove again
        for key in keys:
            key.add_vpp_config()
        for key in keys:
            self.assertTrue(key.query_vpp_config())
        for key in keys:
            key.remove_vpp_config()
        for key in keys:
            self.assertFalse(key.query_vpp_config())

    def test_add_bfd_sha1(self):
        """ create a BFD session (SHA1) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=key)
        # add/remove twice to check that a removed session can be re-added
        session.add_vpp_config()
        self.logger.debug("Session state is %s", session.state)
        session.remove_vpp_config()
        session.add_vpp_config()
        self.logger.debug("Session state is %s", session.state)
        session.remove_vpp_config()

    def test_double_add_sha1(self):
        """ create the same BFD session twice (negative case) (SHA1) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=key)
        session.add_vpp_config()
        with self.assertRaises(Exception):
            session.add_vpp_config()

    def test_add_auth_nonexistent_key(self):
        """ create BFD session using non-existent SHA1 (negative case) """
        # the key object is created but never added to the vpp config
        session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4,
            sha1_key=self.factory.create_random_key(self))
        with self.assertRaises(Exception):
            session.add_vpp_config()

    def test_shared_sha1_key(self):
        """ share single SHA1 key between multiple BFD sessions """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        sessions = [
            VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                             sha1_key=key),
            VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
                             sha1_key=key, af=AF_INET6),
            VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
                             sha1_key=key),
            VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
                             sha1_key=key, af=AF_INET6)]
        for s in sessions:
            s.add_vpp_config()
        # the key's use count must drop by one with every removed session
        removed = 0
        for s in sessions:
            e = key.get_bfd_auth_keys_dump_entry()
            self.assert_equal(e.use_count, len(sessions) - removed,
                              "Use count for shared key")
            s.remove_vpp_config()
            removed += 1
        e = key.get_bfd_auth_keys_dump_entry()
        self.assert_equal(e.use_count, len(sessions) - removed,
                          "Use count for shared key")

    def test_activate_auth(self):
        """ activate SHA1 authentication """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        session.activate_auth(key)

    def test_deactivate_auth(self):
        """ deactivate SHA1 authentication """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        session.activate_auth(key)
        session.deactivate_auth()

    def test_change_key(self):
        """ change SHA1 key """
        key1 = self.factory.create_random_key(self)
        key2 = self.factory.create_random_key(self)
        while key2.conf_key_id == key1.conf_key_id:
            key2 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=key1)
        session.add_vpp_config()
        session.activate_auth(key2)

    def test_set_del_udp_echo_source(self):
        """ set/del udp echo source """
        self.create_loopback_interfaces(1)
        self.loopback0 = self.lo_interfaces[0]
        self.loopback0.admin_up()
        # nothing configured yet
        echo_source = self.vapi.bfd_udp_get_echo_source()
        self.assertFalse(echo_source.is_set)
        self.assertFalse(echo_source.have_usable_ip4)
        self.assertFalse(echo_source.have_usable_ip6)

        # echo source set, but the loopback has no addresses yet
        self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
        echo_source = self.vapi.bfd_udp_get_echo_source()
        self.assertTrue(echo_source.is_set)
        self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
        self.assertFalse(echo_source.have_usable_ip4)
        self.assertFalse(echo_source.have_usable_ip6)

        # expected echo address is the loopback address with the lowest
        # bit flipped
        self.loopback0.config_ip4()
        unpacked = unpack("!L", self.loopback0.local_ip4n)
        echo_ip4 = pack("!L", unpacked[0] ^ 1)
        echo_source = self.vapi.bfd_udp_get_echo_source()
        self.assertTrue(echo_source.is_set)
        self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
        self.assertTrue(echo_source.have_usable_ip4)
        self.assertEqual(echo_source.ip4_addr, echo_ip4)
        self.assertFalse(echo_source.have_usable_ip6)

        # same for IPv6 - only the lowest bit of the last word is flipped
        self.loopback0.config_ip6()
        unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
        echo_ip6 = pack("!LLLL", unpacked[0], unpacked[1], unpacked[2],
                        unpacked[3] ^ 1)
        echo_source = self.vapi.bfd_udp_get_echo_source()
        self.assertTrue(echo_source.is_set)
        self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
        self.assertTrue(echo_source.have_usable_ip4)
        self.assertEqual(echo_source.ip4_addr, echo_ip4)
        self.assertTrue(echo_source.have_usable_ip6)
        self.assertEqual(echo_source.ip6_addr, echo_ip6)

        # after deletion everything is reported as unset again
        self.vapi.bfd_udp_del_echo_source()
        echo_source = self.vapi.bfd_udp_get_echo_source()
        self.assertFalse(echo_source.is_set)
        self.assertFalse(echo_source.have_usable_ip4)
        self.assertFalse(echo_source.have_usable_ip6)
309
310
class BFDTestSession(object):
    """ BFD session as seen from test framework side """

    def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
                 bfd_key_id=None, our_seq_number=None):
        """
        :param test: test case owning the session (logger, asserts, pg0)
        :param interface: interface whose MAC/IP addresses go into packets
        :param af: address family; AF_INET6 selects IPv6, otherwise IPv4
        :param detect_mult: detect multiplier put into created packets
        :param sha1_key: auth key; when set, created packets carry a SHA1
            auth section and verified packets must carry a valid one
        :param bfd_key_id: value used for the auth_key_id packet field
        :param our_seq_number: initial auth sequence number, random if None
        """
        self.test = test
        self.af = af
        self.sha1_key = sha1_key
        self.bfd_key_id = bfd_key_id
        self.interface = interface
        self.udp_sport = randint(49152, 65535)
        if our_seq_number is None:
            self.our_seq_number = randint(0, 40000000)
        else:
            self.our_seq_number = our_seq_number
        # last auth sequence number seen from vpp (None until first packet)
        self.vpp_seq_number = None
        self.my_discriminator = 0
        self.desired_min_tx = 300000
        self.required_min_rx = 300000
        self.required_min_echo_rx = None
        self.detect_mult = detect_mult
        self.diag = BFDDiagCode.no_diagnostic
        self.your_discriminator = None
        self.state = BFDState.down
        self.auth_type = BFDAuthType.no_auth

    @staticmethod
    def _pad_sha1_key(key):
        """ return key right-padded with zero bytes to 20 bytes

        The padding is a bytes literal so that it can be concatenated with
        raw packet bytes on both Python 2 and Python 3.
        """
        return key + b"\0" * (20 - len(key))

    def inc_seq_num(self):
        """ increment sequence number, wrapping if needed """
        if self.our_seq_number == 0xFFFFFFFF:
            self.our_seq_number = 0
        else:
            self.our_seq_number += 1

    def update(self, my_discriminator=None, your_discriminator=None,
               desired_min_tx=None, required_min_rx=None,
               required_min_echo_rx=None, detect_mult=None,
               diag=None, state=None, auth_type=None):
        """ update BFD parameters associated with session

        Only arguments which are not None are stored; everything else
        keeps its current value.
        """
        if my_discriminator is not None:
            self.my_discriminator = my_discriminator
        if your_discriminator is not None:
            self.your_discriminator = your_discriminator
        if required_min_rx is not None:
            self.required_min_rx = required_min_rx
        if required_min_echo_rx is not None:
            self.required_min_echo_rx = required_min_echo_rx
        if desired_min_tx is not None:
            self.desired_min_tx = desired_min_tx
        if detect_mult is not None:
            self.detect_mult = detect_mult
        if diag is not None:
            self.diag = diag
        if state is not None:
            self.state = state
        if auth_type is not None:
            self.auth_type = auth_type

    def fill_packet_fields(self, packet):
        """ set packet fields with known values in packet

        Session values which are falsy (None, 0) are skipped, leaving the
        corresponding BFD field at whatever the packet already holds.
        """
        bfd = packet[BFD]
        if self.my_discriminator:
            self.test.logger.debug("BFD: setting packet.my_discriminator=%s",
                                   self.my_discriminator)
            bfd.my_discriminator = self.my_discriminator
        if self.your_discriminator:
            self.test.logger.debug("BFD: setting packet.your_discriminator=%s",
                                   self.your_discriminator)
            bfd.your_discriminator = self.your_discriminator
        if self.required_min_rx:
            self.test.logger.debug(
                "BFD: setting packet.required_min_rx_interval=%s",
                self.required_min_rx)
            bfd.required_min_rx_interval = self.required_min_rx
        if self.required_min_echo_rx:
            self.test.logger.debug(
                "BFD: setting packet.required_min_echo_rx=%s",
                self.required_min_echo_rx)
            bfd.required_min_echo_rx_interval = self.required_min_echo_rx
        if self.desired_min_tx:
            self.test.logger.debug(
                "BFD: setting packet.desired_min_tx_interval=%s",
                self.desired_min_tx)
            bfd.desired_min_tx_interval = self.desired_min_tx
        if self.detect_mult:
            self.test.logger.debug(
                "BFD: setting packet.detect_mult=%s", self.detect_mult)
            bfd.detect_mult = self.detect_mult
        if self.diag:
            self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
            bfd.diag = self.diag
        if self.state:
            self.test.logger.debug("BFD: setting packet.state=%s", self.state)
            bfd.state = self.state
        if self.auth_type:
            # this is used by a negative test-case
            self.test.logger.debug("BFD: setting packet.auth_type=%s",
                                   self.auth_type)
            bfd.auth_type = self.auth_type

    def create_packet(self):
        """ create a BFD packet, reflecting the current state of session

        :returns: Ether / IP or IPv6 / UDP / BFD packet addressed from the
            remote side of the interface to its local side; when a SHA1 key
            is configured, the auth section including the hash is filled in
        """
        if self.sha1_key:
            bfd = BFD(flags="A")
            bfd.auth_type = self.sha1_key.auth_type
            bfd.auth_len = BFD.sha1_auth_len
            bfd.auth_key_id = self.bfd_key_id
            bfd.auth_seq_num = self.our_seq_number
            bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
        else:
            bfd = BFD()
        if self.af == AF_INET6:
            packet = (Ether(src=self.interface.remote_mac,
                            dst=self.interface.local_mac) /
                      IPv6(src=self.interface.remote_ip6,
                           dst=self.interface.local_ip6,
                           hlim=255) /
                      UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
                      bfd)
        else:
            packet = (Ether(src=self.interface.remote_mac,
                            dst=self.interface.local_mac) /
                      IP(src=self.interface.remote_ip4,
                         dst=self.interface.local_ip4,
                         ttl=255) /
                      UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
                      bfd)
        self.test.logger.debug("BFD: Creating packet")
        self.fill_packet_fields(packet)
        if self.sha1_key:
            # hash covers the first 32 bytes of the BFD layer followed by
            # the zero-padded key
            hash_material = scapy.compat.raw(packet[BFD])[:32] + \
                self._pad_sha1_key(self.sha1_key.key)
            self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
                                   hashlib.sha1(hash_material).hexdigest())
            packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
        return packet

    def send_packet(self, packet=None, interface=None):
        """ send packet on interface, creating the packet if needed

        :param packet: packet to send, create_packet() is used if None
        :param interface: interface to send on, test.pg0 is used if None
        """
        if packet is None:
            packet = self.create_packet()
        if interface is None:
            interface = self.test.pg0
        self.test.logger.debug(ppp("Sending packet:", packet))
        interface.add_stream(packet)
        self.test.pg_start()

    def verify_sha1_auth(self, packet):
        """ Verify correctness of authentication in BFD layer. """
        bfd = packet[BFD]
        self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
        self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
                               BFDAuthType)
        self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
        self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
        if self.vpp_seq_number is None:
            self.vpp_seq_number = bfd.auth_seq_num
            self.test.logger.debug("Received initial sequence number: %s" %
                                   self.vpp_seq_number)
        else:
            recvd_seq_num = bfd.auth_seq_num
            self.test.logger.debug("Received followup sequence number: %s" %
                                   recvd_seq_num)
            # meticulous mode must advance by exactly one per packet, the
            # non-meticulous mode may also repeat the previous number
            if self.vpp_seq_number < 0xffffffff:
                if self.sha1_key.auth_type == \
                        BFDAuthType.meticulous_keyed_sha1:
                    self.test.assert_equal(recvd_seq_num,
                                           self.vpp_seq_number + 1,
                                           "BFD sequence number")
                else:
                    self.test.assert_in_range(recvd_seq_num,
                                              self.vpp_seq_number,
                                              self.vpp_seq_number + 1,
                                              "BFD sequence number")
            else:
                # previous number was the maximum - next one wraps to zero
                if self.sha1_key.auth_type == \
                        BFDAuthType.meticulous_keyed_sha1:
                    self.test.assert_equal(recvd_seq_num, 0,
                                           "BFD sequence number")
                else:
                    self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
                                       "BFD sequence number not one of "
                                       "(%s, 0)" % self.vpp_seq_number)
            self.vpp_seq_number = recvd_seq_num
        # last 20 bytes represent the hash - so replace them with the key,
        # pad the result with zeros and hash the result
        hash_material = bfd.original[:-20] + \
            self._pad_sha1_key(self.sha1_key.key)
        # hexlify both sides so that they have the same type on Python 2
        # and Python 3 (hexdigest() is text, hexlify() is bytes on Python 3)
        expected_hash = binascii.hexlify(
            hashlib.sha1(hash_material).digest())
        self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
                               expected_hash, "Auth key hash")

    def verify_bfd(self, packet):
        """ Verify correctness of BFD layer. """
        bfd = packet[BFD]
        self.test.assert_equal(bfd.version, 1, "BFD version")
        self.test.assert_equal(bfd.your_discriminator,
                               self.my_discriminator,
                               "BFD - your discriminator")
        if self.sha1_key:
            self.verify_sha1_auth(packet)
512
513
def bfd_session_up(test):
    """ Bring BFD session up

    Waits for a packet from vpp, answers it with Init, expects the Up
    event from vpp and finally sends Up from the test side as well.
    """
    session = test.test_session

    def advance_seq_if_meticulous():
        # meticulous keyed SHA1 needs a new sequence number for each packet
        key = session.sha1_key
        if key and key.auth_type == BFDAuthType.meticulous_keyed_sha1:
            session.inc_seq_num()

    test.logger.info("BFD: Waiting for slow hello")
    p = wait_for_bfd_packet(test, 2)
    # remember the offset computed by a previous call (if any) so that the
    # freshly computed one can be checked against it
    old_offset = getattr(test, 'vpp_clock_offset', None)
    test.vpp_clock_offset = time.time() - p.time
    test.logger.debug("BFD: Calculated vpp clock offset: %s",
                      test.vpp_clock_offset)
    if old_offset:
        test.assertAlmostEqual(
            old_offset, test.vpp_clock_offset, delta=0.5,
            msg="vpp clock offset not stable (new: %s, old: %s)" %
            (test.vpp_clock_offset, old_offset))

    test.logger.info("BFD: Sending Init")
    session.update(my_discriminator=randint(0, 40000000),
                   your_discriminator=p[BFD].my_discriminator,
                   state=BFDState.init)
    advance_seq_if_meticulous()
    session.send_packet()

    test.logger.info("BFD: Waiting for event")
    e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, e, expected_state=BFDState.up)

    test.logger.info("BFD: Session is Up")
    session.update(state=BFDState.up)
    advance_seq_if_meticulous()
    session.send_packet()
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
547
548
def bfd_session_down(test):
    """ Bring BFD session down

    Requires the vpp session to be Up; sends Down from the test side and
    expects vpp to report the Down state via event.
    """
    session = test.test_session
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
    session.update(state=BFDState.down)
    key = session.sha1_key
    # meticulous keyed SHA1 needs a new sequence number for each packet
    if key and key.auth_type == BFDAuthType.meticulous_keyed_sha1:
        session.inc_seq_num()
    session.send_packet()
    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, event, expected_state=BFDState.down)
    test.logger.info("BFD: Session is Down")
    test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
562
563
def verify_bfd_session_config(test, session, state=None):
    """ Verify that the session dump matches the session object

    :param test: test case providing the assert helpers
    :param session: session object whose dump entry is compared against
        its own attributes
    :param state: expected session state; the state is not checked when
        this is None
    """
    dump = session.get_bfd_udp_session_dump_entry()
    test.assertIsNotNone(dump)
    # since dump is not none, we have verified that sw_if_index and addresses
    # are valid (in get_bfd_udp_session_dump_entry)
    # compare against None explicitly - a plain truth test would silently
    # skip the check for an expected state which happens to be falsy
    if state is not None:
        test.assert_equal(dump.state, state, "session state")
    test.assert_equal(dump.required_min_rx, session.required_min_rx,
                      "required min rx interval")
    test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
                      "desired min tx interval")
    test.assert_equal(dump.detect_mult, session.detect_mult,
                      "detect multiplier")
    if session.sha1_key is None:
        test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
    else:
        test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
        test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
                          "bfd key id")
        test.assert_equal(dump.conf_key_id,
                          session.sha1_key.conf_key_id,
                          "config key id")
586
587
def verify_ip(test, packet):
    """ Check hop limit/TTL and addresses of the IP layer of a packet
    which is expected to go from pg0's local to pg0's remote address. """
    pg = test.pg0
    if test.vpp_session.af == AF_INET6:
        ip = packet[IPv6]
        expected_src, expected_dst = pg.local_ip6, pg.remote_ip6
        test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
    else:
        ip = packet[IP]
        expected_src, expected_dst = pg.local_ip4, pg.remote_ip4
        test.assert_equal(ip.ttl, 255, "IPv4 TTL")
    test.assert_equal(ip.src, expected_src, "IP source address")
    test.assert_equal(ip.dst, expected_dst, "IP destination address")
602
603
def verify_udp(test, packet):
    """ Check that the UDP layer uses the BFD destination port and a
    source port within the range allowed for BFD. """
    udp_layer = packet[UDP]
    test.assert_equal(
        udp_layer.dport, BFD.udp_dport, "UDP destination port")
    test.assert_in_range(
        udp_layer.sport, BFD.udp_sport_min, BFD.udp_sport_max,
        "UDP source port")
610
611
def verify_event(test, event, expected_state):
    """ Check that an event describes test.vpp_session (interface, address
    family, addresses) and reports the expected state. """
    vpp_session = test.vpp_session
    test.logger.debug("BFD: Event: %s" % moves.reprlib.repr(event))
    test.assert_equal(event.sw_if_index,
                      vpp_session.interface.sw_if_index,
                      "BFD interface index")
    expected_is_ipv6 = 1 if vpp_session.af == AF_INET6 else 0
    test.assert_equal(event.is_ipv6, expected_is_ipv6, "is_ipv6")
    if vpp_session.af != AF_INET:
        test.assert_equal(event.local_addr, vpp_session.local_addr_n,
                          "Local IPv6 address")
        test.assert_equal(event.peer_addr, vpp_session.peer_addr_n,
                          "Peer IPv6 address")
    else:
        # only the first four bytes of the address fields are compared
        # for IPv4
        test.assert_equal(event.local_addr[:4], vpp_session.local_addr_n,
                          "Local IPv4 address")
        test.assert_equal(event.peer_addr[:4], vpp_session.peer_addr_n,
                          "Peer IPv4 address")
    test.assert_equal(event.state, expected_state, BFDState)
634
635
def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None):
    """ wait for BFD packet and verify its correctness

    :param test: test case providing logger, pg0 and test_session
    :param timeout: how long to wait (added to time.time(), i.e. seconds)
    :param pcap_time_min: ignore packets with pcap timestamp lower than this

    :returns: the received packet
    :raises CaptureTimeoutError: if the deadline passes while packets are
        still being ignored
    """
    test.logger.info("BFD: Waiting for BFD packet")
    # one overall deadline, shared by all capture attempts below
    deadline = time.time() + timeout
    counter = 0
    while True:
        counter += 1
        # sanity check
        test.assert_in_range(counter, 0, 100, "number of packets ignored")
        time_left = deadline - time.time()
        if time_left < 0:
            raise CaptureTimeoutError("Packet did not arrive within timeout")
        p = test.pg0.wait_for_packet(timeout=time_left)
        test.logger.debug(ppp("BFD: Got packet:", p))
        if pcap_time_min is not None and p.time < pcap_time_min:
            # packet is older than requested - skip it and keep waiting
            test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
                                  "pcap time min %s):" %
                                  (p.time, pcap_time_min), p))
        else:
            break
    # NOTE(review): scapy layer lookup by index may raise instead of
    # returning None when the layer is missing - confirm whether the
    # None check below can ever trigger
    bfd = p[BFD]
    if bfd is None:
        raise Exception(ppp("Unexpected or invalid BFD packet:", p))
    if bfd.payload:
        raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
    verify_ip(test, p)
    verify_udp(test, p)
    test.test_session.verify_bfd(p)
    return p
671
672
673 @unittest.skipUnless(running_extended_tests, "part of extended tests")
674 class BFD4TestCase(VppTestCase):
675     """Bidirectional Forwarding Detection (BFD)"""
676
677     pg0 = None
678     vpp_clock_offset = None
679     vpp_session = None
680     test_session = None
681
    @classmethod
    def setUpClass(cls):
        # Class-wide fixture: bfd debug logging, one pg interface and one
        # loopback interface, both with IPv4 configured and brought up.
        super(BFD4TestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            cls.create_loopback_interfaces(1)
            cls.loopback0 = cls.lo_interfaces[0]
            cls.loopback0.config_ip4()
            cls.loopback0.admin_up()
            cls.pg0.config_ip4()
            cls.pg0.configure_ipv4_neighbors()
            cls.pg0.admin_up()
            cls.pg0.resolve_arp()

        except Exception:
            # undo the parent class setup before propagating the failure
            super(BFD4TestCase, cls).tearDownClass()
            raise
700
    @classmethod
    def tearDownClass(cls):
        # nothing class-specific to clean up - defer to the parent class
        super(BFD4TestCase, cls).tearDownClass()
704
    def setUp(self):
        # Per-test fixture: subscribe to BFD events, start capturing on pg0
        # and create the vpp-side session (IPv4, towards pg0's remote
        # address) together with the matching test-side session.
        super(BFD4TestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()
        try:
            self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                                self.pg0.remote_ip4)
            self.vpp_session.add_vpp_config()
            self.vpp_session.admin_up()
            self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        except:
            # whatever went wrong, cancel the event subscription made
            # above and then propagate the original error
            self.vapi.want_bfd_events(enable_disable=0)
            raise
719
720     def tearDown(self):
721         if not self.vpp_dead:
722             self.vapi.want_bfd_events(enable_disable=0)
723         self.vapi.collect_events()  # clear the event queue
724         super(BFD4TestCase, self).tearDown()
725
726     def test_session_up(self):
727         """ bring BFD session up """
728         bfd_session_up(self)
729
730     def test_session_up_by_ip(self):
731         """ bring BFD session up - first frame looked up by address pair """
732         self.logger.info("BFD: Sending Slow control frame")
733         self.test_session.update(my_discriminator=randint(0, 40000000))
734         self.test_session.send_packet()
735         self.pg0.enable_capture()
736         p = self.pg0.wait_for_packet(1)
737         self.assert_equal(p[BFD].your_discriminator,
738                           self.test_session.my_discriminator,
739                           "BFD - your discriminator")
740         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
741         self.test_session.update(your_discriminator=p[BFD].my_discriminator,
742                                  state=BFDState.up)
743         self.logger.info("BFD: Waiting for event")
744         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
745         verify_event(self, e, expected_state=BFDState.init)
746         self.logger.info("BFD: Sending Up")
747         self.test_session.send_packet()
748         self.logger.info("BFD: Waiting for event")
749         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
750         verify_event(self, e, expected_state=BFDState.up)
751         self.logger.info("BFD: Session is Up")
752         self.test_session.update(state=BFDState.up)
753         self.test_session.send_packet()
754         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
755
756     def test_session_down(self):
757         """ bring BFD session down """
758         bfd_session_up(self)
759         bfd_session_down(self)
760
761     @unittest.skipUnless(running_extended_tests, "part of extended tests")
762     def test_hold_up(self):
763         """ hold BFD session up """
764         bfd_session_up(self)
765         for dummy in range(self.test_session.detect_mult * 2):
766             wait_for_bfd_packet(self)
767             self.test_session.send_packet()
768         self.assert_equal(len(self.vapi.collect_events()), 0,
769                           "number of bfd events")
770
771     @unittest.skipUnless(running_extended_tests, "part of extended tests")
772     def test_slow_timer(self):
773         """ verify slow periodic control frames while session down """
774         packet_count = 3
775         self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
776         prev_packet = wait_for_bfd_packet(self, 2)
777         for dummy in range(packet_count):
778             next_packet = wait_for_bfd_packet(self, 2)
779             time_diff = next_packet.time - prev_packet.time
780             # spec says the range should be <0.75, 1>, allow extra 0.05 margin
781             # to work around timing issues
782             self.assert_in_range(
783                 time_diff, 0.70, 1.05, "time between slow packets")
784             prev_packet = next_packet
785
786     @unittest.skipUnless(running_extended_tests, "part of extended tests")
787     def test_zero_remote_min_rx(self):
788         """ no packets when zero remote required min rx interval """
789         bfd_session_up(self)
790         self.test_session.update(required_min_rx=0)
791         self.test_session.send_packet()
792         for dummy in range(self.test_session.detect_mult):
793             self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
794                        "sleep before transmitting bfd packet")
795             self.test_session.send_packet()
796             try:
797                 p = wait_for_bfd_packet(self, timeout=0)
798                 self.logger.error(ppp("Received unexpected packet:", p))
799             except CaptureTimeoutError:
800                 pass
801         self.assert_equal(
802             len(self.vapi.collect_events()), 0, "number of bfd events")
803         self.test_session.update(required_min_rx=300000)
804         for dummy in range(3):
805             self.test_session.send_packet()
806             wait_for_bfd_packet(
807                 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
808         self.assert_equal(
809             len(self.vapi.collect_events()), 0, "number of bfd events")
810
811     @unittest.skipUnless(running_extended_tests, "part of extended tests")
812     def test_conn_down(self):
813         """ verify session goes down after inactivity """
814         bfd_session_up(self)
815         detection_time = self.test_session.detect_mult *\
816             self.vpp_session.required_min_rx / USEC_IN_SEC
817         self.sleep(detection_time, "waiting for BFD session time-out")
818         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
819         verify_event(self, e, expected_state=BFDState.down)
820
821     @unittest.skipUnless(running_extended_tests, "part of extended tests")
822     def test_large_required_min_rx(self):
823         """ large remote required min rx interval """
824         bfd_session_up(self)
825         p = wait_for_bfd_packet(self)
826         interval = 3000000
827         self.test_session.update(required_min_rx=interval)
828         self.test_session.send_packet()
829         time_mark = time.time()
830         count = 0
831         # busy wait here, trying to collect a packet or event, vpp is not
832         # allowed to send packets and the session will timeout first - so the
833         # Up->Down event must arrive before any packets do
834         while time.time() < time_mark + interval / USEC_IN_SEC:
835             try:
836                 p = wait_for_bfd_packet(self, timeout=0)
837                 # if vpp managed to send a packet before we did the session
838                 # session update, then that's fine, ignore it
839                 if p.time < time_mark - self.vpp_clock_offset:
840                     continue
841                 self.logger.error(ppp("Received unexpected packet:", p))
842                 count += 1
843             except CaptureTimeoutError:
844                 pass
845             events = self.vapi.collect_events()
846             if len(events) > 0:
847                 verify_event(self, events[0], BFDState.down)
848                 break
849         self.assert_equal(count, 0, "number of packets received")
850
851     @unittest.skipUnless(running_extended_tests, "part of extended tests")
852     def test_immediate_remote_min_rx_reduction(self):
853         """ immediately honor remote required min rx reduction """
854         self.vpp_session.remove_vpp_config()
855         self.vpp_session = VppBFDUDPSession(
856             self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
857         self.pg0.enable_capture()
858         self.vpp_session.add_vpp_config()
859         self.test_session.update(desired_min_tx=1000000,
860                                  required_min_rx=1000000)
861         bfd_session_up(self)
862         reference_packet = wait_for_bfd_packet(self)
863         time_mark = time.time()
864         interval = 300000
865         self.test_session.update(required_min_rx=interval)
866         self.test_session.send_packet()
867         extra_time = time.time() - time_mark
868         p = wait_for_bfd_packet(self)
869         # first packet is allowed to be late by time we spent doing the update
870         # calculated in extra_time
871         self.assert_in_range(p.time - reference_packet.time,
872                              .95 * 0.75 * interval / USEC_IN_SEC,
873                              1.05 * interval / USEC_IN_SEC + extra_time,
874                              "time between BFD packets")
875         reference_packet = p
876         for dummy in range(3):
877             p = wait_for_bfd_packet(self)
878             diff = p.time - reference_packet.time
879             self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
880                                  1.05 * interval / USEC_IN_SEC,
881                                  "time between BFD packets")
882             reference_packet = p
883
884     @unittest.skipUnless(running_extended_tests, "part of extended tests")
885     def test_modify_req_min_rx_double(self):
886         """ modify session - double required min rx """
887         bfd_session_up(self)
888         p = wait_for_bfd_packet(self)
889         self.test_session.update(desired_min_tx=10000,
890                                  required_min_rx=10000)
891         self.test_session.send_packet()
892         # double required min rx
893         self.vpp_session.modify_parameters(
894             required_min_rx=2 * self.vpp_session.required_min_rx)
895         p = wait_for_bfd_packet(
896             self, pcap_time_min=time.time() - self.vpp_clock_offset)
897         # poll bit needs to be set
898         self.assertIn("P", p.sprintf("%BFD.flags%"),
899                       "Poll bit not set in BFD packet")
900         # finish poll sequence with final packet
901         final = self.test_session.create_packet()
902         final[BFD].flags = "F"
903         timeout = self.test_session.detect_mult * \
904             max(self.test_session.desired_min_tx,
905                 self.vpp_session.required_min_rx) / USEC_IN_SEC
906         self.test_session.send_packet(final)
907         time_mark = time.time()
908         e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
909         verify_event(self, e, expected_state=BFDState.down)
910         time_to_event = time.time() - time_mark
911         self.assert_in_range(time_to_event, .9 * timeout,
912                              1.1 * timeout, "session timeout")
913
914     @unittest.skipUnless(running_extended_tests, "part of extended tests")
915     def test_modify_req_min_rx_halve(self):
916         """ modify session - halve required min rx """
917         self.vpp_session.modify_parameters(
918             required_min_rx=2 * self.vpp_session.required_min_rx)
919         bfd_session_up(self)
920         p = wait_for_bfd_packet(self)
921         self.test_session.update(desired_min_tx=10000,
922                                  required_min_rx=10000)
923         self.test_session.send_packet()
924         p = wait_for_bfd_packet(
925             self, pcap_time_min=time.time() - self.vpp_clock_offset)
926         # halve required min rx
927         old_required_min_rx = self.vpp_session.required_min_rx
928         self.vpp_session.modify_parameters(
929             required_min_rx=self.vpp_session.required_min_rx // 2)
930         # now we wait 0.8*3*old-req-min-rx and the session should still be up
931         self.sleep(0.8 * self.vpp_session.detect_mult *
932                    old_required_min_rx / USEC_IN_SEC,
933                    "wait before finishing poll sequence")
934         self.assert_equal(len(self.vapi.collect_events()), 0,
935                           "number of bfd events")
936         p = wait_for_bfd_packet(self)
937         # poll bit needs to be set
938         self.assertIn("P", p.sprintf("%BFD.flags%"),
939                       "Poll bit not set in BFD packet")
940         # finish poll sequence with final packet
941         final = self.test_session.create_packet()
942         final[BFD].flags = "F"
943         self.test_session.send_packet(final)
944         # now the session should time out under new conditions
945         detection_time = self.test_session.detect_mult *\
946             self.vpp_session.required_min_rx / USEC_IN_SEC
947         before = time.time()
948         e = self.vapi.wait_for_event(
949             2 * detection_time, "bfd_udp_session_details")
950         after = time.time()
951         self.assert_in_range(after - before,
952                              0.9 * detection_time,
953                              1.1 * detection_time,
954                              "time before bfd session goes down")
955         verify_event(self, e, expected_state=BFDState.down)
956
957     @unittest.skipUnless(running_extended_tests, "part of extended tests")
958     def test_modify_detect_mult(self):
959         """ modify detect multiplier """
960         bfd_session_up(self)
961         p = wait_for_bfd_packet(self)
962         self.vpp_session.modify_parameters(detect_mult=1)
963         p = wait_for_bfd_packet(
964             self, pcap_time_min=time.time() - self.vpp_clock_offset)
965         self.assert_equal(self.vpp_session.detect_mult,
966                           p[BFD].detect_mult,
967                           "detect mult")
968         # poll bit must not be set
969         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
970                          "Poll bit not set in BFD packet")
971         self.vpp_session.modify_parameters(detect_mult=10)
972         p = wait_for_bfd_packet(
973             self, pcap_time_min=time.time() - self.vpp_clock_offset)
974         self.assert_equal(self.vpp_session.detect_mult,
975                           p[BFD].detect_mult,
976                           "detect mult")
977         # poll bit must not be set
978         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
979                          "Poll bit not set in BFD packet")
980
    @unittest.skipUnless(running_extended_tests, "part of extended tests")
    def test_queued_poll(self):
        """ test poll sequence queueing

        VPP's required min rx is doubled twice in a row.  The first change
        starts a poll sequence which the test deliberately leaves
        unanswered for at least poll_sequence_length_min seconds.  Only
        after the test finishes it with a Final packet is a second poll
        sequence expected; that one is answered as well, and the packet
        following it must no longer carry the Poll bit.  No BFD event may
        be reported along the way.
        """
        bfd_session_up(self)
        p = wait_for_bfd_packet(self)
        # first parameter change - starts poll sequence no. 1
        self.vpp_session.modify_parameters(
            required_min_rx=2 * self.vpp_session.required_min_rx)
        p = wait_for_bfd_packet(self)
        poll_sequence_start = time.time()
        # minimum time (seconds) the first poll sequence is kept unanswered
        poll_sequence_length_min = 0.5
        send_final_after = time.time() + poll_sequence_length_min
        # poll bit needs to be set
        self.assertIn("P", p.sprintf("%BFD.flags%"),
                      "Poll bit not set in BFD packet")
        self.assert_equal(p[BFD].required_min_rx_interval,
                          self.vpp_session.required_min_rx,
                          "BFD required min rx interval")
        # second parameter change while poll sequence no. 1 is in progress
        self.vpp_session.modify_parameters(
            required_min_rx=2 * self.vpp_session.required_min_rx)
        # 2nd poll sequence should be queued now
        # don't send the reply back yet, wait for some time to emulate
        # longer round-trip time
        packet_count = 0
        while time.time() < send_final_after:
            # keep the session alive without answering the poll
            self.test_session.send_packet()
            p = wait_for_bfd_packet(self)
            self.assert_equal(len(self.vapi.collect_events()), 0,
                              "number of bfd events")
            self.assert_equal(p[BFD].required_min_rx_interval,
                              self.vpp_session.required_min_rx,
                              "BFD required min rx interval")
            packet_count += 1
            # poll bit must be set
            self.assertIn("P", p.sprintf("%BFD.flags%"),
                          "Poll bit not set in BFD packet")
        # finish 1st with final
        final = self.test_session.create_packet()
        final[BFD].flags = "F"
        self.test_session.send_packet(final)
        poll_sequence_length = time.time() - poll_sequence_start
        # vpp must wait for some time before starting new poll sequence
        poll_no_2_started = False
        # look at up to twice as many packets as were seen during poll
        # sequence no. 1
        for dummy in range(2 * packet_count):
            p = wait_for_bfd_packet(self)
            self.assert_equal(len(self.vapi.collect_events()), 0,
                              "number of bfd events")
            if "P" in p.sprintf("%BFD.flags%"):
                poll_no_2_started = True
                # NOTE(review): poll_sequence_start + poll_sequence_length
                # equals the time recorded right after the first Final was
                # sent, so time.time() is not expected to be below it here;
                # confirm whether a later bound was intended
                if time.time() < poll_sequence_start + poll_sequence_length:
                    raise Exception("VPP started 2nd poll sequence too soon")
                final = self.test_session.create_packet()
                final[BFD].flags = "F"
                self.test_session.send_packet(final)
                break
            else:
                self.test_session.send_packet()
        self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
        # finish 2nd with final
        final = self.test_session.create_packet()
        final[BFD].flags = "F"
        self.test_session.send_packet(final)
        p = wait_for_bfd_packet(self)
        # poll bit must not be set
        self.assertNotIn("P", p.sprintf("%BFD.flags%"),
                         "Poll bit set in BFD packet")
1046
1047     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1048     def test_poll_response(self):
1049         """ test correct response to control frame with poll bit set """
1050         bfd_session_up(self)
1051         poll = self.test_session.create_packet()
1052         poll[BFD].flags = "P"
1053         self.test_session.send_packet(poll)
1054         final = wait_for_bfd_packet(
1055             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1056         self.assertIn("F", final.sprintf("%BFD.flags%"))
1057
1058     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1059     def test_no_periodic_if_remote_demand(self):
1060         """ no periodic frames outside poll sequence if remote demand set """
1061         bfd_session_up(self)
1062         demand = self.test_session.create_packet()
1063         demand[BFD].flags = "D"
1064         self.test_session.send_packet(demand)
1065         transmit_time = 0.9 \
1066             * max(self.vpp_session.required_min_rx,
1067                   self.test_session.desired_min_tx) \
1068             / USEC_IN_SEC
1069         count = 0
1070         for dummy in range(self.test_session.detect_mult * 2):
1071             self.sleep(transmit_time)
1072             self.test_session.send_packet(demand)
1073             try:
1074                 p = wait_for_bfd_packet(self, timeout=0)
1075                 self.logger.error(ppp("Received unexpected packet:", p))
1076                 count += 1
1077             except CaptureTimeoutError:
1078                 pass
1079         events = self.vapi.collect_events()
1080         for e in events:
1081             self.logger.error("Received unexpected event: %s", e)
1082         self.assert_equal(count, 0, "number of packets received")
1083         self.assert_equal(len(events), 0, "number of events received")
1084
1085     def test_echo_looped_back(self):
1086         """ echo packets looped back """
1087         # don't need a session in this case..
1088         self.vpp_session.remove_vpp_config()
1089         self.pg0.enable_capture()
1090         echo_packet_count = 10
1091         # random source port low enough to increment a few times..
1092         udp_sport_tx = randint(1, 50000)
1093         udp_sport_rx = udp_sport_tx
1094         echo_packet = (Ether(src=self.pg0.remote_mac,
1095                              dst=self.pg0.local_mac) /
1096                        IP(src=self.pg0.remote_ip4,
1097                           dst=self.pg0.remote_ip4) /
1098                        UDP(dport=BFD.udp_dport_echo) /
1099                        Raw("this should be looped back"))
1100         for dummy in range(echo_packet_count):
1101             self.sleep(.01, "delay between echo packets")
1102             echo_packet[UDP].sport = udp_sport_tx
1103             udp_sport_tx += 1
1104             self.logger.debug(ppp("Sending packet:", echo_packet))
1105             self.pg0.add_stream(echo_packet)
1106             self.pg_start()
1107         for dummy in range(echo_packet_count):
1108             p = self.pg0.wait_for_packet(1)
1109             self.logger.debug(ppp("Got packet:", p))
1110             ether = p[Ether]
1111             self.assert_equal(self.pg0.remote_mac,
1112                               ether.dst, "Destination MAC")
1113             self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1114             ip = p[IP]
1115             self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1116             self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1117             udp = p[UDP]
1118             self.assert_equal(udp.dport, BFD.udp_dport_echo,
1119                               "UDP destination port")
1120             self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1121             udp_sport_rx += 1
1122             # need to compare the hex payload here, otherwise BFD_vpp_echo
1123             # gets in way
1124             self.assertEqual(scapy.compat.raw(p[UDP].payload),
1125                              scapy.compat.raw(echo_packet[UDP].payload),
1126                              "Received packet is not the echo packet sent")
1127         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1128                           "ECHO packet identifier for test purposes)")
1129
1130     def test_echo(self):
1131         """ echo function """
1132         bfd_session_up(self)
1133         self.test_session.update(required_min_echo_rx=150000)
1134         self.test_session.send_packet()
1135         detection_time = self.test_session.detect_mult *\
1136             self.vpp_session.required_min_rx / USEC_IN_SEC
1137         # echo shouldn't work without echo source set
1138         for dummy in range(10):
1139             sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1140             self.sleep(sleep, "delay before sending bfd packet")
1141             self.test_session.send_packet()
1142         p = wait_for_bfd_packet(
1143             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1144         self.assert_equal(p[BFD].required_min_rx_interval,
1145                           self.vpp_session.required_min_rx,
1146                           "BFD required min rx interval")
1147         self.test_session.send_packet()
1148         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1149         echo_seen = False
1150         # should be turned on - loopback echo packets
1151         for dummy in range(3):
1152             loop_until = time.time() + 0.75 * detection_time
1153             while time.time() < loop_until:
1154                 p = self.pg0.wait_for_packet(1)
1155                 self.logger.debug(ppp("Got packet:", p))
1156                 if p[UDP].dport == BFD.udp_dport_echo:
1157                     self.assert_equal(
1158                         p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
1159                     self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
1160                                         "BFD ECHO src IP equal to loopback IP")
1161                     self.logger.debug(ppp("Looping back packet:", p))
1162                     self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1163                                       "ECHO packet destination MAC address")
1164                     p[Ether].dst = self.pg0.local_mac
1165                     self.pg0.add_stream(p)
1166                     self.pg_start()
1167                     echo_seen = True
1168                 elif p.haslayer(BFD):
1169                     if echo_seen:
1170                         self.assertGreaterEqual(
1171                             p[BFD].required_min_rx_interval,
1172                             1000000)
1173                     if "P" in p.sprintf("%BFD.flags%"):
1174                         final = self.test_session.create_packet()
1175                         final[BFD].flags = "F"
1176                         self.test_session.send_packet(final)
1177                 else:
1178                     raise Exception(ppp("Received unknown packet:", p))
1179
1180                 self.assert_equal(len(self.vapi.collect_events()), 0,
1181                                   "number of bfd events")
1182             self.test_session.send_packet()
1183         self.assertTrue(echo_seen, "No echo packets received")
1184
1185     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1186     def test_echo_fail(self):
1187         """ session goes down if echo function fails """
1188         bfd_session_up(self)
1189         self.test_session.update(required_min_echo_rx=150000)
1190         self.test_session.send_packet()
1191         detection_time = self.test_session.detect_mult *\
1192             self.vpp_session.required_min_rx / USEC_IN_SEC
1193         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1194         # echo function should be used now, but we will drop the echo packets
1195         verified_diag = False
1196         for dummy in range(3):
1197             loop_until = time.time() + 0.75 * detection_time
1198             while time.time() < loop_until:
1199                 p = self.pg0.wait_for_packet(1)
1200                 self.logger.debug(ppp("Got packet:", p))
1201                 if p[UDP].dport == BFD.udp_dport_echo:
1202                     # dropped
1203                     pass
1204                 elif p.haslayer(BFD):
1205                     if "P" in p.sprintf("%BFD.flags%"):
1206                         self.assertGreaterEqual(
1207                             p[BFD].required_min_rx_interval,
1208                             1000000)
1209                         final = self.test_session.create_packet()
1210                         final[BFD].flags = "F"
1211                         self.test_session.send_packet(final)
1212                     if p[BFD].state == BFDState.down:
1213                         self.assert_equal(p[BFD].diag,
1214                                           BFDDiagCode.echo_function_failed,
1215                                           BFDDiagCode)
1216                         verified_diag = True
1217                 else:
1218                     raise Exception(ppp("Received unknown packet:", p))
1219             self.test_session.send_packet()
1220         events = self.vapi.collect_events()
1221         self.assert_equal(len(events), 1, "number of bfd events")
1222         self.assert_equal(events[0].state, BFDState.down, BFDState)
1223         self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1224
1225     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1226     def test_echo_stop(self):
1227         """ echo function stops if peer sets required min echo rx zero """
1228         bfd_session_up(self)
1229         self.test_session.update(required_min_echo_rx=150000)
1230         self.test_session.send_packet()
1231         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1232         # wait for first echo packet
1233         while True:
1234             p = self.pg0.wait_for_packet(1)
1235             self.logger.debug(ppp("Got packet:", p))
1236             if p[UDP].dport == BFD.udp_dport_echo:
1237                 self.logger.debug(ppp("Looping back packet:", p))
1238                 p[Ether].dst = self.pg0.local_mac
1239                 self.pg0.add_stream(p)
1240                 self.pg_start()
1241                 break
1242             elif p.haslayer(BFD):
1243                 # ignore BFD
1244                 pass
1245             else:
1246                 raise Exception(ppp("Received unknown packet:", p))
1247         self.test_session.update(required_min_echo_rx=0)
1248         self.test_session.send_packet()
1249         # echo packets shouldn't arrive anymore
1250         for dummy in range(5):
1251             wait_for_bfd_packet(
1252                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1253             self.test_session.send_packet()
1254             events = self.vapi.collect_events()
1255             self.assert_equal(len(events), 0, "number of bfd events")
1256
1257     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1258     def test_echo_source_removed(self):
1259         """ echo function stops if echo source is removed """
1260         bfd_session_up(self)
1261         self.test_session.update(required_min_echo_rx=150000)
1262         self.test_session.send_packet()
1263         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1264         # wait for first echo packet
1265         while True:
1266             p = self.pg0.wait_for_packet(1)
1267             self.logger.debug(ppp("Got packet:", p))
1268             if p[UDP].dport == BFD.udp_dport_echo:
1269                 self.logger.debug(ppp("Looping back packet:", p))
1270                 p[Ether].dst = self.pg0.local_mac
1271                 self.pg0.add_stream(p)
1272                 self.pg_start()
1273                 break
1274             elif p.haslayer(BFD):
1275                 # ignore BFD
1276                 pass
1277             else:
1278                 raise Exception(ppp("Received unknown packet:", p))
1279         self.vapi.bfd_udp_del_echo_source()
1280         self.test_session.send_packet()
1281         # echo packets shouldn't arrive anymore
1282         for dummy in range(5):
1283             wait_for_bfd_packet(
1284                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1285             self.test_session.send_packet()
1286             events = self.vapi.collect_events()
1287             self.assert_equal(len(events), 0, "number of bfd events")
1288
1289     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1290     def test_stale_echo(self):
1291         """ stale echo packets don't keep a session up """
1292         bfd_session_up(self)
1293         self.test_session.update(required_min_echo_rx=150000)
1294         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1295         self.test_session.send_packet()
1296         # should be turned on - loopback echo packets
1297         echo_packet = None
1298         timeout_at = None
1299         timeout_ok = False
1300         for dummy in range(10 * self.vpp_session.detect_mult):
1301             p = self.pg0.wait_for_packet(1)
1302             if p[UDP].dport == BFD.udp_dport_echo:
1303                 if echo_packet is None:
1304                     self.logger.debug(ppp("Got first echo packet:", p))
1305                     echo_packet = p
1306                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1307                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1308                 else:
1309                     self.logger.debug(ppp("Got followup echo packet:", p))
1310                 self.logger.debug(ppp("Looping back first echo packet:", p))
1311                 echo_packet[Ether].dst = self.pg0.local_mac
1312                 self.pg0.add_stream(echo_packet)
1313                 self.pg_start()
1314             elif p.haslayer(BFD):
1315                 self.logger.debug(ppp("Got packet:", p))
1316                 if "P" in p.sprintf("%BFD.flags%"):
1317                     final = self.test_session.create_packet()
1318                     final[BFD].flags = "F"
1319                     self.test_session.send_packet(final)
1320                 if p[BFD].state == BFDState.down:
1321                     self.assertIsNotNone(
1322                         timeout_at,
1323                         "Session went down before first echo packet received")
1324                     now = time.time()
1325                     self.assertGreaterEqual(
1326                         now, timeout_at,
1327                         "Session timeout at %s, but is expected at %s" %
1328                         (now, timeout_at))
1329                     self.assert_equal(p[BFD].diag,
1330                                       BFDDiagCode.echo_function_failed,
1331                                       BFDDiagCode)
1332                     events = self.vapi.collect_events()
1333                     self.assert_equal(len(events), 1, "number of bfd events")
1334                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1335                     timeout_ok = True
1336                     break
1337             else:
1338                 raise Exception(ppp("Received unknown packet:", p))
1339             self.test_session.send_packet()
1340         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1341
1342     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1343     def test_invalid_echo_checksum(self):
1344         """ echo packets with invalid checksum don't keep a session up """
1345         bfd_session_up(self)
1346         self.test_session.update(required_min_echo_rx=150000)
1347         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1348         self.test_session.send_packet()
1349         # should be turned on - loopback echo packets
1350         timeout_at = None
1351         timeout_ok = False
1352         for dummy in range(10 * self.vpp_session.detect_mult):
1353             p = self.pg0.wait_for_packet(1)
1354             if p[UDP].dport == BFD.udp_dport_echo:
1355                 self.logger.debug(ppp("Got echo packet:", p))
1356                 if timeout_at is None:
1357                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1358                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1359                 p[BFD_vpp_echo].checksum = getrandbits(64)
1360                 p[Ether].dst = self.pg0.local_mac
1361                 self.logger.debug(ppp("Looping back modified echo packet:", p))
1362                 self.pg0.add_stream(p)
1363                 self.pg_start()
1364             elif p.haslayer(BFD):
1365                 self.logger.debug(ppp("Got packet:", p))
1366                 if "P" in p.sprintf("%BFD.flags%"):
1367                     final = self.test_session.create_packet()
1368                     final[BFD].flags = "F"
1369                     self.test_session.send_packet(final)
1370                 if p[BFD].state == BFDState.down:
1371                     self.assertIsNotNone(
1372                         timeout_at,
1373                         "Session went down before first echo packet received")
1374                     now = time.time()
1375                     self.assertGreaterEqual(
1376                         now, timeout_at,
1377                         "Session timeout at %s, but is expected at %s" %
1378                         (now, timeout_at))
1379                     self.assert_equal(p[BFD].diag,
1380                                       BFDDiagCode.echo_function_failed,
1381                                       BFDDiagCode)
1382                     events = self.vapi.collect_events()
1383                     self.assert_equal(len(events), 1, "number of bfd events")
1384                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1385                     timeout_ok = True
1386                     break
1387             else:
1388                 raise Exception(ppp("Received unknown packet:", p))
1389             self.test_session.send_packet()
1390         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1391
1392     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1393     def test_admin_up_down(self):
1394         """ put session admin-up and admin-down """
1395         bfd_session_up(self)
1396         self.vpp_session.admin_down()
1397         self.pg0.enable_capture()
1398         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1399         verify_event(self, e, expected_state=BFDState.admin_down)
1400         for dummy in range(2):
1401             p = wait_for_bfd_packet(self)
1402             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1403         # try to bring session up - shouldn't be possible
1404         self.test_session.update(state=BFDState.init)
1405         self.test_session.send_packet()
1406         for dummy in range(2):
1407             p = wait_for_bfd_packet(self)
1408             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1409         self.vpp_session.admin_up()
1410         self.test_session.update(state=BFDState.down)
1411         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1412         verify_event(self, e, expected_state=BFDState.down)
1413         p = wait_for_bfd_packet(
1414             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1415         self.assert_equal(p[BFD].state, BFDState.down, BFDState)
1416         self.test_session.send_packet()
1417         p = wait_for_bfd_packet(
1418             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1419         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1420         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1421         verify_event(self, e, expected_state=BFDState.init)
1422         self.test_session.update(state=BFDState.up)
1423         self.test_session.send_packet()
1424         p = wait_for_bfd_packet(
1425             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1426         self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1427         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1428         verify_event(self, e, expected_state=BFDState.up)
1429
1430     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1431     def test_config_change_remote_demand(self):
1432         """ configuration change while peer in demand mode """
1433         bfd_session_up(self)
1434         demand = self.test_session.create_packet()
1435         demand[BFD].flags = "D"
1436         self.test_session.send_packet(demand)
1437         self.vpp_session.modify_parameters(
1438             required_min_rx=2 * self.vpp_session.required_min_rx)
1439         p = wait_for_bfd_packet(
1440             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1441         # poll bit must be set
1442         self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1443         # terminate poll sequence
1444         final = self.test_session.create_packet()
1445         final[BFD].flags = "D+F"
1446         self.test_session.send_packet(final)
1447         # vpp should be quiet now again
1448         transmit_time = 0.9 \
1449             * max(self.vpp_session.required_min_rx,
1450                   self.test_session.desired_min_tx) \
1451             / USEC_IN_SEC
1452         count = 0
1453         for dummy in range(self.test_session.detect_mult * 2):
1454             self.sleep(transmit_time)
1455             self.test_session.send_packet(demand)
1456             try:
1457                 p = wait_for_bfd_packet(self, timeout=0)
1458                 self.logger.error(ppp("Received unexpected packet:", p))
1459                 count += 1
1460             except CaptureTimeoutError:
1461                 pass
1462         events = self.vapi.collect_events()
1463         for e in events:
1464             self.logger.error("Received unexpected event: %s", e)
1465         self.assert_equal(count, 0, "number of packets received")
1466         self.assert_equal(len(events), 0, "number of events received")
1467
1468     def test_intf_deleted(self):
1469         """ interface with bfd session deleted """
1470         intf = VppLoInterface(self)
1471         intf.config_ip4()
1472         intf.admin_up()
1473         sw_if_index = intf.sw_if_index
1474         vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip4)
1475         vpp_session.add_vpp_config()
1476         vpp_session.admin_up()
1477         intf.remove_vpp_config()
1478         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1479         self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1480         self.assertFalse(vpp_session.query_vpp_config())
1481
1482
@unittest.skipUnless(running_extended_tests, "part of extended tests")
class BFD6TestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (IPv6) """

    # placeholders overwritten at runtime (vpp_session and test_session
    # are assigned in setUp())
    pg0 = None
    vpp_clock_offset = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """ configure pg0 and one loopback interface for IPv6 """
        super(BFD6TestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip6()
            cls.pg0.configure_ipv6_neighbors()
            cls.pg0.admin_up()
            cls.pg0.resolve_ndp()
            cls.create_loopback_interfaces(1)
            cls.loopback0 = cls.lo_interfaces[0]
            cls.loopback0.config_ip6()
            cls.loopback0.admin_up()

        except Exception:
            # tear the class down explicitly before propagating the failure
            super(BFD6TestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """ delegate class teardown to the framework """
        super(BFD6TestCase, cls).tearDownClass()

    def setUp(self):
        """ subscribe to BFD events, create the vpp and the test session """
        super(BFD6TestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()
        try:
            self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                                self.pg0.remote_ip6,
                                                af=AF_INET6)
            self.vpp_session.add_vpp_config()
            self.vpp_session.admin_up()
            self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
            self.logger.debug(self.vapi.cli("show adj nbr"))
        except BaseException:
            # deliberately broad - whatever went wrong, unsubscribe from
            # BFD events first and then propagate the error unchanged
            self.vapi.want_bfd_events(enable_disable=0)
            raise

    def tearDown(self):
        """ unsubscribe from BFD events and drain the event queue """
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFD6TestCase, self).tearDown()

    def test_session_up(self):
        """ bring BFD session up """
        bfd_session_up(self)

    def test_session_up_by_ip(self):
        """ bring BFD session up - first frame looked up by address pair """
        self.logger.info("BFD: Sending Slow control frame")
        self.test_session.update(my_discriminator=randint(0, 40000000))
        self.test_session.send_packet()
        self.pg0.enable_capture()
        p = self.pg0.wait_for_packet(1)
        self.assert_equal(p[BFD].your_discriminator,
                          self.test_session.my_discriminator,
                          "BFD - your discriminator")
        self.assert_equal(p[BFD].state, BFDState.init, BFDState)
        self.test_session.update(your_discriminator=p[BFD].my_discriminator,
                                 state=BFDState.up)
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        verify_event(self, e, expected_state=BFDState.init)
        self.logger.info("BFD: Sending Up")
        self.test_session.send_packet()
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        verify_event(self, e, expected_state=BFDState.up)
        self.logger.info("BFD: Session is Up")
        self.test_session.update(state=BFDState.up)
        self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    @unittest.skipUnless(running_extended_tests, "part of extended tests")
    def test_hold_up(self):
        """ hold BFD session up """
        bfd_session_up(self)
        for _ in range(self.test_session.detect_mult * 2):
            wait_for_bfd_packet(self)
            self.test_session.send_packet()
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_echo_looped_back(self):
        """ echo packets looped back """
        # don't need a session in this case..
        self.vpp_session.remove_vpp_config()
        self.pg0.enable_capture()
        echo_packet_count = 10
        # random source port low enough to increment a few times..
        udp_sport_tx = randint(1, 50000)
        udp_sport_rx = udp_sport_tx
        echo_packet = (Ether(src=self.pg0.remote_mac,
                             dst=self.pg0.local_mac) /
                       IPv6(src=self.pg0.remote_ip6,
                            dst=self.pg0.remote_ip6) /
                       UDP(dport=BFD.udp_dport_echo) /
                       Raw("this should be looped back"))
        for _ in range(echo_packet_count):
            self.sleep(.01, "delay between echo packets")
            # the source port doubles as a per-packet identifier
            echo_packet[UDP].sport = udp_sport_tx
            udp_sport_tx += 1
            self.logger.debug(ppp("Sending packet:", echo_packet))
            self.pg0.add_stream(echo_packet)
            self.pg_start()
        for _ in range(echo_packet_count):
            p = self.pg0.wait_for_packet(1)
            self.logger.debug(ppp("Got packet:", p))
            ether = p[Ether]
            self.assert_equal(self.pg0.remote_mac,
                              ether.dst, "Destination MAC")
            self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
            ip = p[IPv6]
            self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
            self.assert_equal(self.pg0.remote_ip6, ip.src, "Source IP")
            udp = p[UDP]
            self.assert_equal(udp.dport, BFD.udp_dport_echo,
                              "UDP destination port")
            self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
            udp_sport_rx += 1
            # need to compare the hex payload here, otherwise BFD_vpp_echo
            # gets in way
            self.assertEqual(scapy.compat.raw(p[UDP].payload),
                             scapy.compat.raw(echo_packet[UDP].payload),
                             "Received packet is not the echo packet sent")
        # every packet sent was received exactly once
        self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
                          "ECHO packet identifier for test purposes)")

    def test_echo(self):
        """ echo function """
        bfd_session_up(self)
        self.test_session.update(required_min_echo_rx=150000)
        self.test_session.send_packet()
        detection_time = self.test_session.detect_mult *\
            self.vpp_session.required_min_rx / USEC_IN_SEC
        # echo shouldn't work without echo source set
        tx_interval = self.vpp_session.required_min_rx / USEC_IN_SEC
        for _ in range(10):
            self.sleep(tx_interval, "delay before sending bfd packet")
            self.test_session.send_packet()
        p = wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.assert_equal(p[BFD].required_min_rx_interval,
                          self.vpp_session.required_min_rx,
                          "BFD required min rx interval")
        self.test_session.send_packet()
        self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
        echo_seen = False
        # should be turned on - loopback echo packets
        for _ in range(3):
            loop_until = time.time() + 0.75 * detection_time
            while time.time() < loop_until:
                p = self.pg0.wait_for_packet(1)
                self.logger.debug(ppp("Got packet:", p))
                if p[UDP].dport == BFD.udp_dport_echo:
                    self.assert_equal(
                        p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
                    self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
                                        "BFD ECHO src IP equal to loopback IP")
                    self.logger.debug(ppp("Looping back packet:", p))
                    self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
                                      "ECHO packet destination MAC address")
                    p[Ether].dst = self.pg0.local_mac
                    self.pg0.add_stream(p)
                    self.pg_start()
                    echo_seen = True
                elif p.haslayer(BFD):
                    if echo_seen:
                        # once echo is running, control packets must
                        # advertise a required min rx of at least 1 second
                        self.assertGreaterEqual(
                            p[BFD].required_min_rx_interval,
                            1000000)
                    if "P" in p.sprintf("%BFD.flags%"):
                        final = self.test_session.create_packet()
                        final[BFD].flags = "F"
                        self.test_session.send_packet(final)
                else:
                    raise Exception(ppp("Received unknown packet:", p))

                self.assert_equal(len(self.vapi.collect_events()), 0,
                                  "number of bfd events")
            self.test_session.send_packet()
        self.assertTrue(echo_seen, "No echo packets received")

    def test_intf_deleted(self):
        """ interface with bfd session deleted """
        intf = VppLoInterface(self)
        intf.config_ip6()
        intf.admin_up()
        sw_if_index = intf.sw_if_index
        vpp_session = VppBFDUDPSession(
            self, intf, intf.remote_ip6, af=AF_INET6)
        vpp_session.add_vpp_config()
        vpp_session.admin_up()
        intf.remove_vpp_config()
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
        self.assertFalse(vpp_session.query_vpp_config())
1695
1696
@unittest.skipUnless(running_extended_tests, "part of extended tests")
class BFDFIBTestCase(VppTestCase):
    """ BFD-FIB interactions (IPv6) """

    # assigned in test_session_with_fib()
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """ delegate class setup to the framework """
        super(BFDFIBTestCase, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        """ delegate class teardown to the framework """
        super(BFDFIBTestCase, cls).tearDownClass()

    def setUp(self):
        """ create pg0, subscribe to BFD events and configure IPv6 """
        super(BFDFIBTestCase, self).setUp()
        self.create_pg_interfaces(range(1))

        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            i.configure_ipv6_neighbors()

    def tearDown(self):
        """ unsubscribe from BFD events unless vpp died """
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)

        super(BFDFIBTestCase, self).tearDown()

    @staticmethod
    def pkt_is_not_data_traffic(p):
        """ not data traffic implies BFD or the usual IPv6 ND/RA"""
        return bool(p.haslayer(BFD) or is_ipv6_misc(p))

    def _assert_traffic_passes(self, pkts):
        """ send pkts via pg0 and expect each of them back, in order """
        self.pg0.add_stream(pkts)
        self.pg_start()
        for packet in pkts:
            captured = self.pg0.wait_for_packet(
                1,
                filter_out_fn=self.pkt_is_not_data_traffic)
            self.assertEqual(captured[IPv6].dst,
                             packet[IPv6].dst)

    def test_session_with_fib(self):
        """ BFD-FIB interactions """

        # packets to match against both of the routes
        # (bytes literal, so the payload is 100 bytes on python2 and python3)
        p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
              IPv6(src="3001::1", dst="2001::1") /
              UDP(sport=1234, dport=1234) /
              Raw(b'\xa5' * 100)),
             (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
              IPv6(src="3001::1", dst="2002::1") /
              UDP(sport=1234, dport=1234) /
              Raw(b'\xa5' * 100))]

        # A recursive and a non-recursive route via a next-hop that
        # will have a BFD session
        ip_2001_s_64 = VppIpRoute(self, "2001::", 64,
                                  [VppRoutePath(self.pg0.remote_ip6,
                                                self.pg0.sw_if_index,
                                                proto=DpoProto.DPO_PROTO_IP6)],
                                  is_ip6=1)
        ip_2002_s_64 = VppIpRoute(self, "2002::", 64,
                                  [VppRoutePath(self.pg0.remote_ip6,
                                                0xffffffff,
                                                proto=DpoProto.DPO_PROTO_IP6)],
                                  is_ip6=1)
        ip_2001_s_64.add_vpp_config()
        ip_2002_s_64.add_vpp_config()

        # bring the session up now the routes are present
        self.vpp_session = VppBFDUDPSession(self,
                                            self.pg0,
                                            self.pg0.remote_ip6,
                                            af=AF_INET6)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET6)

        # session is up - traffic passes
        bfd_session_up(self)
        self._assert_traffic_passes(p)

        # session is down - traffic is dropped
        bfd_session_down(self)

        self.pg0.add_stream(p)
        self.pg_start()
        with self.assertRaises(CaptureTimeoutError):
            self.pg0.wait_for_packet(
                1,
                filter_out_fn=self.pkt_is_not_data_traffic)

        # session is up again - traffic passes
        bfd_session_up(self)
        self._assert_traffic_passes(p)
1805
1806
1807 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1808 class BFDSHA1TestCase(VppTestCase):
1809     """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1810
1811     pg0 = None
1812     vpp_clock_offset = None
1813     vpp_session = None
1814     test_session = None
1815
    @classmethod
    def setUpClass(cls):
        """ create pg0, configure it for IPv4 and resolve ARP """
        super(BFDSHA1TestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip4()
            cls.pg0.admin_up()
            cls.pg0.resolve_arp()

        except Exception:
            # tear the class down explicitly before propagating the failure
            super(BFDSHA1TestCase, cls).tearDownClass()
            raise
1829
    @classmethod
    def tearDownClass(cls):
        """ delegate class teardown to the framework """
        super(BFDSHA1TestCase, cls).tearDownClass()
1833
    def setUp(self):
        """ create a key factory, subscribe to BFD events, start capture """
        super(BFDSHA1TestCase, self).setUp()
        # sessions are created by the individual tests, each with its own key
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()
1839
    def tearDown(self):
        """ unsubscribe from BFD events and drain the event queue """
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFDSHA1TestCase, self).tearDown()
1845
1846     def test_session_up(self):
1847         """ bring BFD session up """
1848         key = self.factory.create_random_key(self)
1849         key.add_vpp_config()
1850         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1851                                             self.pg0.remote_ip4,
1852                                             sha1_key=key)
1853         self.vpp_session.add_vpp_config()
1854         self.vpp_session.admin_up()
1855         self.test_session = BFDTestSession(
1856             self, self.pg0, AF_INET, sha1_key=key,
1857             bfd_key_id=self.vpp_session.bfd_key_id)
1858         bfd_session_up(self)
1859
1860     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1861     def test_hold_up(self):
1862         """ hold BFD session up """
1863         key = self.factory.create_random_key(self)
1864         key.add_vpp_config()
1865         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1866                                             self.pg0.remote_ip4,
1867                                             sha1_key=key)
1868         self.vpp_session.add_vpp_config()
1869         self.vpp_session.admin_up()
1870         self.test_session = BFDTestSession(
1871             self, self.pg0, AF_INET, sha1_key=key,
1872             bfd_key_id=self.vpp_session.bfd_key_id)
1873         bfd_session_up(self)
1874         for dummy in range(self.test_session.detect_mult * 2):
1875             wait_for_bfd_packet(self)
1876             self.test_session.send_packet()
1877         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1878
1879     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1880     def test_hold_up_meticulous(self):
1881         """ hold BFD session up - meticulous auth """
1882         key = self.factory.create_random_key(
1883             self, BFDAuthType.meticulous_keyed_sha1)
1884         key.add_vpp_config()
1885         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1886                                             self.pg0.remote_ip4, sha1_key=key)
1887         self.vpp_session.add_vpp_config()
1888         self.vpp_session.admin_up()
1889         # specify sequence number so that it wraps
1890         self.test_session = BFDTestSession(
1891             self, self.pg0, AF_INET, sha1_key=key,
1892             bfd_key_id=self.vpp_session.bfd_key_id,
1893             our_seq_number=0xFFFFFFFF - 4)
1894         bfd_session_up(self)
1895         for dummy in range(30):
1896             wait_for_bfd_packet(self)
1897             self.test_session.inc_seq_num()
1898             self.test_session.send_packet()
1899         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1900
1901     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1902     def test_send_bad_seq_number(self):
1903         """ session is not kept alive by msgs with bad sequence numbers"""
1904         key = self.factory.create_random_key(
1905             self, BFDAuthType.meticulous_keyed_sha1)
1906         key.add_vpp_config()
1907         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1908                                             self.pg0.remote_ip4, sha1_key=key)
1909         self.vpp_session.add_vpp_config()
1910         self.test_session = BFDTestSession(
1911             self, self.pg0, AF_INET, sha1_key=key,
1912             bfd_key_id=self.vpp_session.bfd_key_id)
1913         bfd_session_up(self)
1914         detection_time = self.test_session.detect_mult *\
1915             self.vpp_session.required_min_rx / USEC_IN_SEC
1916         send_until = time.time() + 2 * detection_time
1917         while time.time() < send_until:
1918             self.test_session.send_packet()
1919             self.sleep(0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
1920                        "time between bfd packets")
1921         e = self.vapi.collect_events()
1922         # session should be down now, because the sequence numbers weren't
1923         # updated
1924         self.assert_equal(len(e), 1, "number of bfd events")
1925         verify_event(self, e[0], expected_state=BFDState.down)
1926
1927     def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
1928                                        legitimate_test_session,
1929                                        rogue_test_session,
1930                                        rogue_bfd_values=None):
1931         """ execute a rogue session interaction scenario
1932
1933         1. create vpp session, add config
1934         2. bring the legitimate session up
1935         3. copy the bfd values from legitimate session to rogue session
1936         4. apply rogue_bfd_values to rogue session
1937         5. set rogue session state to down
1938         6. send message to take the session down from the rogue session
1939         7. assert that the legitimate session is unaffected
1940         """
1941
1942         self.vpp_session = vpp_bfd_udp_session
1943         self.vpp_session.add_vpp_config()
1944         self.test_session = legitimate_test_session
1945         # bring vpp session up
1946         bfd_session_up(self)
1947         # send packet from rogue session
1948         rogue_test_session.update(
1949             my_discriminator=self.test_session.my_discriminator,
1950             your_discriminator=self.test_session.your_discriminator,
1951             desired_min_tx=self.test_session.desired_min_tx,
1952             required_min_rx=self.test_session.required_min_rx,
1953             detect_mult=self.test_session.detect_mult,
1954             diag=self.test_session.diag,
1955             state=self.test_session.state,
1956             auth_type=self.test_session.auth_type)
1957         if rogue_bfd_values:
1958             rogue_test_session.update(**rogue_bfd_values)
1959         rogue_test_session.update(state=BFDState.down)
1960         rogue_test_session.send_packet()
1961         wait_for_bfd_packet(self)
1962         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1963
1964     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1965     def test_mismatch_auth(self):
1966         """ session is not brought down by unauthenticated msg """
1967         key = self.factory.create_random_key(self)
1968         key.add_vpp_config()
1969         vpp_session = VppBFDUDPSession(
1970             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1971         legitimate_test_session = BFDTestSession(
1972             self, self.pg0, AF_INET, sha1_key=key,
1973             bfd_key_id=vpp_session.bfd_key_id)
1974         rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
1975         self.execute_rogue_session_scenario(vpp_session,
1976                                             legitimate_test_session,
1977                                             rogue_test_session)
1978
1979     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1980     def test_mismatch_bfd_key_id(self):
1981         """ session is not brought down by msg with non-existent key-id """
1982         key = self.factory.create_random_key(self)
1983         key.add_vpp_config()
1984         vpp_session = VppBFDUDPSession(
1985             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1986         # pick a different random bfd key id
1987         x = randint(0, 255)
1988         while x == vpp_session.bfd_key_id:
1989             x = randint(0, 255)
1990         legitimate_test_session = BFDTestSession(
1991             self, self.pg0, AF_INET, sha1_key=key,
1992             bfd_key_id=vpp_session.bfd_key_id)
1993         rogue_test_session = BFDTestSession(
1994             self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
1995         self.execute_rogue_session_scenario(vpp_session,
1996                                             legitimate_test_session,
1997                                             rogue_test_session)
1998
1999     @unittest.skipUnless(running_extended_tests, "part of extended tests")
2000     def test_mismatched_auth_type(self):
2001         """ session is not brought down by msg with wrong auth type """
2002         key = self.factory.create_random_key(self)
2003         key.add_vpp_config()
2004         vpp_session = VppBFDUDPSession(
2005             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2006         legitimate_test_session = BFDTestSession(
2007             self, self.pg0, AF_INET, sha1_key=key,
2008             bfd_key_id=vpp_session.bfd_key_id)
2009         rogue_test_session = BFDTestSession(
2010             self, self.pg0, AF_INET, sha1_key=key,
2011             bfd_key_id=vpp_session.bfd_key_id)
2012         self.execute_rogue_session_scenario(
2013             vpp_session, legitimate_test_session, rogue_test_session,
2014             {'auth_type': BFDAuthType.keyed_md5})
2015
2016     @unittest.skipUnless(running_extended_tests, "part of extended tests")
2017     def test_restart(self):
2018         """ simulate remote peer restart and resynchronization """
2019         key = self.factory.create_random_key(
2020             self, BFDAuthType.meticulous_keyed_sha1)
2021         key.add_vpp_config()
2022         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2023                                             self.pg0.remote_ip4, sha1_key=key)
2024         self.vpp_session.add_vpp_config()
2025         self.test_session = BFDTestSession(
2026             self, self.pg0, AF_INET, sha1_key=key,
2027             bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
2028         bfd_session_up(self)
2029         # don't send any packets for 2*detection_time
2030         detection_time = self.test_session.detect_mult *\
2031             self.vpp_session.required_min_rx / USEC_IN_SEC
2032         self.sleep(2 * detection_time, "simulating peer restart")
2033         events = self.vapi.collect_events()
2034         self.assert_equal(len(events), 1, "number of bfd events")
2035         verify_event(self, events[0], expected_state=BFDState.down)
2036         self.test_session.update(state=BFDState.down)
2037         # reset sequence number
2038         self.test_session.our_seq_number = 0
2039         self.test_session.vpp_seq_number = None
2040         # now throw away any pending packets
2041         self.pg0.enable_capture()
2042         bfd_session_up(self)
2043
2044
2045 @unittest.skipUnless(running_extended_tests, "part of extended tests")
class BFDAuthOnOffTestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (changing auth) """

    # populated by setUpClass / the individual tests
    pg0 = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """ create pg0 with IPv4 addressing and enable BFD debug logs """
        super(BFDAuthOnOffTestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip4()
            cls.pg0.admin_up()
            cls.pg0.resolve_arp()

        except Exception:
            # undo the base class setup before propagating the failure
            super(BFDAuthOnOffTestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """ delegate class-level cleanup to the base class """
        super(BFDAuthOnOffTestCase, cls).tearDownClass()

    def setUp(self):
        """ fresh key factory, BFD event subscription and capture on pg0 """
        super(BFDAuthOnOffTestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

    def tearDown(self):
        """ unsubscribe from BFD events (if VPP is alive), drop the rest """
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFDAuthOnOffTestCase, self).tearDown()

    def _exchange_packets(self, check_state=True, bump_seq=False):
        """ trade 2 * detect_mult packets with VPP

        Each round waits for one BFD packet from VPP and answers it with
        one packet from the test session.

        :param check_state: assert that every received packet carries
            the "up" state
        :param bump_seq: call inc_seq_num() on the test session before
            each transmitted packet
        """
        for _ in range(self.test_session.detect_mult * 2):
            p = wait_for_bfd_packet(self)
            if check_state:
                self.assert_equal(p[BFD].state, BFDState.up, BFDState)
            if bump_seq:
                self.test_session.inc_seq_num()
            self.test_session.send_packet()

    def _assert_session_undisturbed(self):
        """ VPP session must be up and no BFD event may be pending """
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")

    def test_auth_on_immediate(self):
        """ turn auth on without disturbing session state (immediate) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        self._exchange_packets()
        self.vpp_session.activate_auth(key)
        # the test peer switches to the key together with VPP
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key
        self._exchange_packets()
        self._assert_session_undisturbed()

    def test_auth_off_immediate(self):
        """ turn auth off without disturbing session state (immediate) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        # this test advances the sequence number before every packet
        self._exchange_packets(bump_seq=True)
        self.vpp_session.deactivate_auth()
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self._exchange_packets(bump_seq=True)
        self._assert_session_undisturbed()

    def test_auth_change_key_immediate(self):
        """ change auth key without disturbing session state (immediate) """
        key1 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2 = self.factory.create_random_key(self)
        key2.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key1)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key1,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets()
        self.vpp_session.activate_auth(key2)
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key2
        self._exchange_packets()
        self._assert_session_undisturbed()

    def test_auth_on_delayed(self):
        """ turn auth on without disturbing session state (delayed) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        # the warm-up round does not inspect the received state
        self._exchange_packets(check_state=False)
        self.vpp_session.activate_auth(key, delayed=True)
        # the test peer keeps sending unauthenticated packets for a while
        self._exchange_packets()
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key
        self.test_session.send_packet()
        self._exchange_packets()
        self._assert_session_undisturbed()

    def test_auth_off_delayed(self):
        """ turn auth off without disturbing session state (delayed) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets()
        self.vpp_session.deactivate_auth(delayed=True)
        # the test peer keeps sending authenticated packets for a while
        self._exchange_packets()
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self.test_session.send_packet()
        self._exchange_packets()
        self._assert_session_undisturbed()

    def test_auth_change_key_delayed(self):
        """ change auth key without disturbing session state (delayed) """
        key1 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2 = self.factory.create_random_key(self)
        key2.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key1)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key1,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets()
        self.vpp_session.activate_auth(key2, delayed=True)
        # the test peer keeps using the first key for a while
        self._exchange_packets()
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key2
        self.test_session.send_packet()
        self._exchange_packets()
        self._assert_session_undisturbed()
2256
2257
2258 @unittest.skipUnless(running_extended_tests, "part of extended tests")
2259 class BFDCLITestCase(VppTestCase):
2260     """Bidirectional Forwarding Detection (BFD) (CLI) """
2261     pg0 = None
2262
2263     @classmethod
2264     def setUpClass(cls):
2265         super(BFDCLITestCase, cls).setUpClass()
2266         cls.vapi.cli("set log class bfd level debug")
2267         try:
2268             cls.create_pg_interfaces((0,))
2269             cls.pg0.config_ip4()
2270             cls.pg0.config_ip6()
2271             cls.pg0.resolve_arp()
2272             cls.pg0.resolve_ndp()
2273
2274         except Exception:
2275             super(BFDCLITestCase, cls).tearDownClass()
2276             raise
2277
2278     @classmethod
2279     def tearDownClass(cls):
2280         super(BFDCLITestCase, cls).tearDownClass()
2281
2282     def setUp(self):
2283         super(BFDCLITestCase, self).setUp()
2284         self.factory = AuthKeyFactory()
2285         self.pg0.enable_capture()
2286
2287     def tearDown(self):
2288         try:
2289             self.vapi.want_bfd_events(enable_disable=0)
2290         except UnexpectedApiReturnValueError:
2291             # some tests aren't subscribed, so this is not an issue
2292             pass
2293         self.vapi.collect_events()  # clear the event queue
2294         super(BFDCLITestCase, self).tearDown()
2295
2296     def cli_verify_no_response(self, cli):
2297         """ execute a CLI, asserting that the response is empty """
2298         self.assert_equal(self.vapi.cli(cli),
2299                           b"",
2300                           "CLI command response")
2301
2302     def cli_verify_response(self, cli, expected):
2303         """ execute a CLI, asserting that the response matches expectation """
2304         self.assert_equal(self.vapi.cli(cli).strip(),
2305                           expected,
2306                           "CLI command response")
2307
2308     def test_show(self):
2309         """ show commands """
2310         k1 = self.factory.create_random_key(self)
2311         k1.add_vpp_config()
2312         k2 = self.factory.create_random_key(
2313             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2314         k2.add_vpp_config()
2315         s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2316         s1.add_vpp_config()
2317         s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2318                               sha1_key=k2)
2319         s2.add_vpp_config()
2320         self.logger.info(self.vapi.ppcli("show bfd keys"))
2321         self.logger.info(self.vapi.ppcli("show bfd sessions"))
2322         self.logger.info(self.vapi.ppcli("show bfd"))
2323
2324     def test_set_del_sha1_key(self):
2325         """ set/delete SHA1 auth key """
2326         k = self.factory.create_random_key(self)
2327         self.registry.register(k, self.logger)
2328         self.cli_verify_no_response(
2329             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2330             (k.conf_key_id,
2331                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2332         self.assertTrue(k.query_vpp_config())
2333         self.vpp_session = VppBFDUDPSession(
2334             self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
2335         self.vpp_session.add_vpp_config()
2336         self.test_session = \
2337             BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
2338                            bfd_key_id=self.vpp_session.bfd_key_id)
2339         self.vapi.want_bfd_events()
2340         bfd_session_up(self)
2341         bfd_session_down(self)
2342         # try to replace the secret for the key - should fail because the key
2343         # is in-use
2344         k2 = self.factory.create_random_key(self)
2345         self.cli_verify_response(
2346             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2347             (k.conf_key_id,
2348                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2349             "bfd key set: `bfd_auth_set_key' API call failed, "
2350             "rv=-103:BFD object in use")
2351         # manipulating the session using old secret should still work
2352         bfd_session_up(self)
2353         bfd_session_down(self)
2354         self.vpp_session.remove_vpp_config()
2355         self.cli_verify_no_response(
2356             "bfd key del conf-key-id %s" % k.conf_key_id)
2357         self.assertFalse(k.query_vpp_config())
2358
2359     def test_set_del_meticulous_sha1_key(self):
2360         """ set/delete meticulous SHA1 auth key """
2361         k = self.factory.create_random_key(
2362             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2363         self.registry.register(k, self.logger)
2364         self.cli_verify_no_response(
2365             "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
2366             (k.conf_key_id,
2367                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2368         self.assertTrue(k.query_vpp_config())
2369         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2370                                             self.pg0.remote_ip6, af=AF_INET6,
2371                                             sha1_key=k)
2372         self.vpp_session.add_vpp_config()
2373         self.vpp_session.admin_up()
2374         self.test_session = \
2375             BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
2376                            bfd_key_id=self.vpp_session.bfd_key_id)
2377         self.vapi.want_bfd_events()
2378         bfd_session_up(self)
2379         bfd_session_down(self)
2380         # try to replace the secret for the key - should fail because the key
2381         # is in-use
2382         k2 = self.factory.create_random_key(self)
2383         self.cli_verify_response(
2384             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2385             (k.conf_key_id,
2386                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2387             "bfd key set: `bfd_auth_set_key' API call failed, "
2388             "rv=-103:BFD object in use")
2389         # manipulating the session using old secret should still work
2390         bfd_session_up(self)
2391         bfd_session_down(self)
2392         self.vpp_session.remove_vpp_config()
2393         self.cli_verify_no_response(
2394             "bfd key del conf-key-id %s" % k.conf_key_id)
2395         self.assertFalse(k.query_vpp_config())
2396
2397     def test_add_mod_del_bfd_udp(self):
2398         """ create/modify/delete IPv4 BFD UDP session """
2399         vpp_session = VppBFDUDPSession(
2400             self, self.pg0, self.pg0.remote_ip4)
2401         self.registry.register(vpp_session, self.logger)
2402         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2403             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2404             "detect-mult %s" % (self.pg0.name, self.pg0.local_ip4,
2405                                 self.pg0.remote_ip4,
2406                                 vpp_session.desired_min_tx,
2407                                 vpp_session.required_min_rx,
2408                                 vpp_session.detect_mult)
2409         self.cli_verify_no_response(cli_add_cmd)
2410         # 2nd add should fail
2411         self.cli_verify_response(
2412             cli_add_cmd,
2413             "bfd udp session add: `bfd_add_add_session' API call"
2414             " failed, rv=-101:Duplicate BFD object")
2415         verify_bfd_session_config(self, vpp_session)
2416         mod_session = VppBFDUDPSession(
2417             self, self.pg0, self.pg0.remote_ip4,
2418             required_min_rx=2 * vpp_session.required_min_rx,
2419             desired_min_tx=3 * vpp_session.desired_min_tx,
2420             detect_mult=4 * vpp_session.detect_mult)
2421         self.cli_verify_no_response(
2422             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2423             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2424             (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2425              mod_session.desired_min_tx, mod_session.required_min_rx,
2426              mod_session.detect_mult))
2427         verify_bfd_session_config(self, mod_session)
2428         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2429             "peer-addr %s" % (self.pg0.name,
2430                               self.pg0.local_ip4, self.pg0.remote_ip4)
2431         self.cli_verify_no_response(cli_del_cmd)
2432         # 2nd del is expected to fail
2433         self.cli_verify_response(
2434             cli_del_cmd, "bfd udp session del: `bfd_udp_del_session' API call"
2435             " failed, rv=-102:No such BFD object")
2436         self.assertFalse(vpp_session.query_vpp_config())
2437
2438     def test_add_mod_del_bfd_udp6(self):
2439         """ create/modify/delete IPv6 BFD UDP session """
2440         vpp_session = VppBFDUDPSession(
2441             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
2442         self.registry.register(vpp_session, self.logger)
2443         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2444             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2445             "detect-mult %s" % (self.pg0.name, self.pg0.local_ip6,
2446                                 self.pg0.remote_ip6,
2447                                 vpp_session.desired_min_tx,
2448                                 vpp_session.required_min_rx,
2449                                 vpp_session.detect_mult)
2450         self.cli_verify_no_response(cli_add_cmd)
2451         # 2nd add should fail
2452         self.cli_verify_response(
2453             cli_add_cmd,
2454             "bfd udp session add: `bfd_add_add_session' API call"
2455             " failed, rv=-101:Duplicate BFD object")
2456         verify_bfd_session_config(self, vpp_session)
2457         mod_session = VppBFDUDPSession(
2458             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2459             required_min_rx=2 * vpp_session.required_min_rx,
2460             desired_min_tx=3 * vpp_session.desired_min_tx,
2461             detect_mult=4 * vpp_session.detect_mult)
2462         self.cli_verify_no_response(
2463             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2464             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2465             (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2466              mod_session.desired_min_tx,
2467              mod_session.required_min_rx, mod_session.detect_mult))
2468         verify_bfd_session_config(self, mod_session)
2469         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2470             "peer-addr %s" % (self.pg0.name,
2471                               self.pg0.local_ip6, self.pg0.remote_ip6)
2472         self.cli_verify_no_response(cli_del_cmd)
2473         # 2nd del is expected to fail
2474         self.cli_verify_response(
2475             cli_del_cmd,
2476             "bfd udp session del: `bfd_udp_del_session' API call"
2477             " failed, rv=-102:No such BFD object")
2478         self.assertFalse(vpp_session.query_vpp_config())
2479
2480     def test_add_mod_del_bfd_udp_auth(self):
2481         """ create/modify/delete IPv4 BFD UDP session (authenticated) """
2482         key = self.factory.create_random_key(self)
2483         key.add_vpp_config()
2484         vpp_session = VppBFDUDPSession(
2485             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2486         self.registry.register(vpp_session, self.logger)
2487         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2488             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2489             "detect-mult %s conf-key-id %s bfd-key-id %s"\
2490             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2491                vpp_session.desired_min_tx, vpp_session.required_min_rx,
2492                vpp_session.detect_mult, key.conf_key_id,
2493                vpp_session.bfd_key_id)
2494         self.cli_verify_no_response(cli_add_cmd)
2495         # 2nd add should fail
2496         self.cli_verify_response(
2497             cli_add_cmd,
2498             "bfd udp session add: `bfd_add_add_session' API call"
2499             " failed, rv=-101:Duplicate BFD object")
2500         verify_bfd_session_config(self, vpp_session)
2501         mod_session = VppBFDUDPSession(
2502             self, self.pg0, self.pg0.remote_ip4, sha1_key=key,
2503             bfd_key_id=vpp_session.bfd_key_id,
2504             required_min_rx=2 * vpp_session.required_min_rx,
2505             desired_min_tx=3 * vpp_session.desired_min_tx,
2506             detect_mult=4 * vpp_session.detect_mult)
2507         self.cli_verify_no_response(
2508             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2509             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2510             (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2511              mod_session.desired_min_tx,
2512              mod_session.required_min_rx, mod_session.detect_mult))
2513         verify_bfd_session_config(self, mod_session)
2514         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2515             "peer-addr %s" % (self.pg0.name,
2516                               self.pg0.local_ip4, self.pg0.remote_ip4)
2517         self.cli_verify_no_response(cli_del_cmd)
2518         # 2nd del is expected to fail
2519         self.cli_verify_response(
2520             cli_del_cmd,
2521             "bfd udp session del: `bfd_udp_del_session' API call"
2522             " failed, rv=-102:No such BFD object")
2523         self.assertFalse(vpp_session.query_vpp_config())
2524
2525     def test_add_mod_del_bfd_udp6_auth(self):
2526         """ create/modify/delete IPv6 BFD UDP session (authenticated) """
2527         key = self.factory.create_random_key(
2528             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2529         key.add_vpp_config()
2530         vpp_session = VppBFDUDPSession(
2531             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key)
2532         self.registry.register(vpp_session, self.logger)
2533         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2534             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2535             "detect-mult %s conf-key-id %s bfd-key-id %s" \
2536             % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2537                vpp_session.desired_min_tx, vpp_session.required_min_rx,
2538                vpp_session.detect_mult, key.conf_key_id,
2539                vpp_session.bfd_key_id)
2540         self.cli_verify_no_response(cli_add_cmd)
2541         # 2nd add should fail
2542         self.cli_verify_response(
2543             cli_add_cmd,
2544             "bfd udp session add: `bfd_add_add_session' API call"
2545             " failed, rv=-101:Duplicate BFD object")
2546         verify_bfd_session_config(self, vpp_session)
2547         mod_session = VppBFDUDPSession(
2548             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key,
2549             bfd_key_id=vpp_session.bfd_key_id,
2550             required_min_rx=2 * vpp_session.required_min_rx,
2551             desired_min_tx=3 * vpp_session.desired_min_tx,
2552             detect_mult=4 * vpp_session.detect_mult)
2553         self.cli_verify_no_response(
2554             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2555             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2556             (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2557              mod_session.desired_min_tx,
2558              mod_session.required_min_rx, mod_session.detect_mult))
2559         verify_bfd_session_config(self, mod_session)
2560         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2561             "peer-addr %s" % (self.pg0.name,
2562                               self.pg0.local_ip6, self.pg0.remote_ip6)
2563         self.cli_verify_no_response(cli_del_cmd)
2564         # 2nd del is expected to fail
2565         self.cli_verify_response(
2566             cli_del_cmd,
2567             "bfd udp session del: `bfd_udp_del_session' API call"
2568             " failed, rv=-102:No such BFD object")
2569         self.assertFalse(vpp_session.query_vpp_config())
2570
2571     def test_auth_on_off(self):
2572         """ turn authentication on and off """
2573         key = self.factory.create_random_key(
2574             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2575         key.add_vpp_config()
2576         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2577         auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2578                                         sha1_key=key)
2579         session.add_vpp_config()
2580         cli_activate = \
2581             "bfd udp session auth activate interface %s local-addr %s "\
2582             "peer-addr %s conf-key-id %s bfd-key-id %s"\
2583             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2584                key.conf_key_id, auth_session.bfd_key_id)
2585         self.cli_verify_no_response(cli_activate)
2586         verify_bfd_session_config(self, auth_session)
2587         self.cli_verify_no_response(cli_activate)
2588         verify_bfd_session_config(self, auth_session)
2589         cli_deactivate = \
2590             "bfd udp session auth deactivate interface %s local-addr %s "\
2591             "peer-addr %s "\
2592             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2593         self.cli_verify_no_response(cli_deactivate)
2594         verify_bfd_session_config(self, session)
2595         self.cli_verify_no_response(cli_deactivate)
2596         verify_bfd_session_config(self, session)
2597
2598     def test_auth_on_off_delayed(self):
2599         """ turn authentication on and off (delayed) """
2600         key = self.factory.create_random_key(
2601             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2602         key.add_vpp_config()
2603         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2604         auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2605                                         sha1_key=key)
2606         session.add_vpp_config()
2607         cli_activate = \
2608             "bfd udp session auth activate interface %s local-addr %s "\
2609             "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"\
2610             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2611                key.conf_key_id, auth_session.bfd_key_id)
2612         self.cli_verify_no_response(cli_activate)
2613         verify_bfd_session_config(self, auth_session)
2614         self.cli_verify_no_response(cli_activate)
2615         verify_bfd_session_config(self, auth_session)
2616         cli_deactivate = \
2617             "bfd udp session auth deactivate interface %s local-addr %s "\
2618             "peer-addr %s delayed yes"\
2619             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2620         self.cli_verify_no_response(cli_deactivate)
2621         verify_bfd_session_config(self, session)
2622         self.cli_verify_no_response(cli_deactivate)
2623         verify_bfd_session_config(self, session)
2624
2625     def test_admin_up_down(self):
2626         """ put session admin-up and admin-down """
2627         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2628         session.add_vpp_config()
2629         cli_down = \
2630             "bfd udp session set-flags admin down interface %s local-addr %s "\
2631             "peer-addr %s "\
2632             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2633         cli_up = \
2634             "bfd udp session set-flags admin up interface %s local-addr %s "\
2635             "peer-addr %s "\
2636             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2637         self.cli_verify_no_response(cli_down)
2638         verify_bfd_session_config(self, session, state=BFDState.admin_down)
2639         self.cli_verify_no_response(cli_up)
2640         verify_bfd_session_config(self, session, state=BFDState.down)
2641
2642     def test_set_del_udp_echo_source(self):
2643         """ set/del udp echo source """
2644         self.create_loopback_interfaces(1)
2645         self.loopback0 = self.lo_interfaces[0]
2646         self.loopback0.admin_up()
2647         self.cli_verify_response("show bfd echo-source",
2648                                  "UDP echo source is not set.")
2649         cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
2650         self.cli_verify_no_response(cli_set)
2651         self.cli_verify_response("show bfd echo-source",
2652                                  "UDP echo source is: %s\n"
2653                                  "IPv4 address usable as echo source: none\n"
2654                                  "IPv6 address usable as echo source: none" %
2655                                  self.loopback0.name)
2656         self.loopback0.config_ip4()
2657         unpacked = unpack("!L", self.loopback0.local_ip4n)
2658         echo_ip4 = inet_ntop(AF_INET, pack("!L", unpacked[0] ^ 1))
2659         self.cli_verify_response("show bfd echo-source",
2660                                  "UDP echo source is: %s\n"
2661                                  "IPv4 address usable as echo source: %s\n"
2662                                  "IPv6 address usable as echo source: none" %
2663                                  (self.loopback0.name, echo_ip4))
2664         unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
2665         echo_ip6 = inet_ntop(AF_INET6, pack("!LLLL", unpacked[0], unpacked[1],
2666                                             unpacked[2], unpacked[3] ^ 1))
2667         self.loopback0.config_ip6()
2668         self.cli_verify_response("show bfd echo-source",
2669                                  "UDP echo source is: %s\n"
2670                                  "IPv4 address usable as echo source: %s\n"
2671                                  "IPv6 address usable as echo source: %s" %
2672                                  (self.loopback0.name, echo_ip4, echo_ip6))
2673         cli_del = "bfd udp echo-source del"
2674         self.cli_verify_no_response(cli_del)
2675         self.cli_verify_response("show bfd echo-source",
2676                                  "UDP echo source is not set.")
2677
# When executed as a script, run this module's tests with VppTestRunner
# as the unittest test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)