tests: add generalized tags for tests, use them for run-solo tests
[vpp.git] / src / vnet / bfd / test / test_bfd.py
1 #!/usr/bin/env python3
2 """ BFD tests """
3
4 from __future__ import division
5
6 import binascii
7 import hashlib
8 import ipaddress
9 import reprlib
10 import time
11 import unittest
12 from random import randint, shuffle, getrandbits
13 from socket import AF_INET, AF_INET6, inet_ntop
14 from struct import pack, unpack
15
16 import scapy.compat
17 from scapy.layers.inet import UDP, IP
18 from scapy.layers.inet6 import IPv6
19 from scapy.layers.l2 import Ether, GRE
20 from scapy.packet import Raw
21
22 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
23     BFDDiagCode, BFDState, BFD_vpp_echo
24 from framework import VppTestCase, VppTestRunner, running_extended_tests
25 from framework import tag_run_solo
26 from util import ppp
27 from vpp_ip import DpoProto
28 from vpp_ip_route import VppIpRoute, VppRoutePath
29 from vpp_lo_interface import VppLoInterface
30 from vpp_papi_provider import UnexpectedApiReturnValueError, \
31     CliFailedCommandError
32 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
33 from vpp_gre_interface import VppGreInterface
34 from vpp_papi import VppEnum
35
36 USEC_IN_SEC = 1000000
37
38
39 class AuthKeyFactory(object):
40     """Factory class for creating auth keys with unique conf key ID"""
41
42     def __init__(self):
43         self._conf_key_ids = {}
44
45     def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
46         """ create a random key with unique conf key id """
47         conf_key_id = randint(0, 0xFFFFFFFF)
48         while conf_key_id in self._conf_key_ids:
49             conf_key_id = randint(0, 0xFFFFFFFF)
50         self._conf_key_ids[conf_key_id] = 1
51         key = scapy.compat.raw(
52             bytearray([randint(0, 255) for _ in range(randint(1, 20))]))
53         return VppBFDAuthKey(test=test, auth_type=auth_type,
54                              conf_key_id=conf_key_id, key=key)
55
56
57 class BFDAPITestCase(VppTestCase):
58     """Bidirectional Forwarding Detection (BFD) - API"""
59
60     pg0 = None
61     pg1 = None
62
63     @classmethod
64     def setUpClass(cls):
65         super(BFDAPITestCase, cls).setUpClass()
66         cls.vapi.cli("set log class bfd level debug")
67         try:
68             cls.create_pg_interfaces(range(2))
69             for i in cls.pg_interfaces:
70                 i.config_ip4()
71                 i.config_ip6()
72                 i.resolve_arp()
73
74         except Exception:
75             super(BFDAPITestCase, cls).tearDownClass()
76             raise
77
78     @classmethod
79     def tearDownClass(cls):
80         super(BFDAPITestCase, cls).tearDownClass()
81
82     def setUp(self):
83         super(BFDAPITestCase, self).setUp()
84         self.factory = AuthKeyFactory()
85
86     def test_add_bfd(self):
87         """ create a BFD session """
88         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
89         session.add_vpp_config()
90         self.logger.debug("Session state is %s", session.state)
91         session.remove_vpp_config()
92         session.add_vpp_config()
93         self.logger.debug("Session state is %s", session.state)
94         session.remove_vpp_config()
95
96     def test_double_add(self):
97         """ create the same BFD session twice (negative case) """
98         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
99         session.add_vpp_config()
100
101         with self.vapi.assert_negative_api_retval():
102             session.add_vpp_config()
103
104         session.remove_vpp_config()
105
106     def test_add_bfd6(self):
107         """ create IPv6 BFD session """
108         session = VppBFDUDPSession(
109             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
110         session.add_vpp_config()
111         self.logger.debug("Session state is %s", session.state)
112         session.remove_vpp_config()
113         session.add_vpp_config()
114         self.logger.debug("Session state is %s", session.state)
115         session.remove_vpp_config()
116
117     def test_mod_bfd(self):
118         """ modify BFD session parameters """
119         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
120                                    desired_min_tx=50000,
121                                    required_min_rx=10000,
122                                    detect_mult=1)
123         session.add_vpp_config()
124         s = session.get_bfd_udp_session_dump_entry()
125         self.assert_equal(session.desired_min_tx,
126                           s.desired_min_tx,
127                           "desired min transmit interval")
128         self.assert_equal(session.required_min_rx,
129                           s.required_min_rx,
130                           "required min receive interval")
131         self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
132         session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
133                                   required_min_rx=session.required_min_rx * 2,
134                                   detect_mult=session.detect_mult * 2)
135         s = session.get_bfd_udp_session_dump_entry()
136         self.assert_equal(session.desired_min_tx,
137                           s.desired_min_tx,
138                           "desired min transmit interval")
139         self.assert_equal(session.required_min_rx,
140                           s.required_min_rx,
141                           "required min receive interval")
142         self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
143
144     def test_add_sha1_keys(self):
145         """ add SHA1 keys """
146         key_count = 10
147         keys = [self.factory.create_random_key(
148             self) for i in range(0, key_count)]
149         for key in keys:
150             self.assertFalse(key.query_vpp_config())
151         for key in keys:
152             key.add_vpp_config()
153         for key in keys:
154             self.assertTrue(key.query_vpp_config())
155         # remove randomly
156         indexes = list(range(key_count))
157         shuffle(indexes)
158         removed = []
159         for i in indexes:
160             key = keys[i]
161             key.remove_vpp_config()
162             removed.append(i)
163             for j in range(key_count):
164                 key = keys[j]
165                 if j in removed:
166                     self.assertFalse(key.query_vpp_config())
167                 else:
168                     self.assertTrue(key.query_vpp_config())
169         # should be removed now
170         for key in keys:
171             self.assertFalse(key.query_vpp_config())
172         # add back and remove again
173         for key in keys:
174             key.add_vpp_config()
175         for key in keys:
176             self.assertTrue(key.query_vpp_config())
177         for key in keys:
178             key.remove_vpp_config()
179         for key in keys:
180             self.assertFalse(key.query_vpp_config())
181
182     def test_add_bfd_sha1(self):
183         """ create a BFD session (SHA1) """
184         key = self.factory.create_random_key(self)
185         key.add_vpp_config()
186         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
187                                    sha1_key=key)
188         session.add_vpp_config()
189         self.logger.debug("Session state is %s", session.state)
190         session.remove_vpp_config()
191         session.add_vpp_config()
192         self.logger.debug("Session state is %s", session.state)
193         session.remove_vpp_config()
194
195     def test_double_add_sha1(self):
196         """ create the same BFD session twice (negative case) (SHA1) """
197         key = self.factory.create_random_key(self)
198         key.add_vpp_config()
199         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
200                                    sha1_key=key)
201         session.add_vpp_config()
202         with self.assertRaises(Exception):
203             session.add_vpp_config()
204
205     def test_add_auth_nonexistent_key(self):
206         """ create BFD session using non-existent SHA1 (negative case) """
207         session = VppBFDUDPSession(
208             self, self.pg0, self.pg0.remote_ip4,
209             sha1_key=self.factory.create_random_key(self))
210         with self.assertRaises(Exception):
211             session.add_vpp_config()
212
213     def test_shared_sha1_key(self):
214         """ share single SHA1 key between multiple BFD sessions """
215         key = self.factory.create_random_key(self)
216         key.add_vpp_config()
217         sessions = [
218             VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
219                              sha1_key=key),
220             VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
221                              sha1_key=key, af=AF_INET6),
222             VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
223                              sha1_key=key),
224             VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
225                              sha1_key=key, af=AF_INET6)]
226         for s in sessions:
227             s.add_vpp_config()
228         removed = 0
229         for s in sessions:
230             e = key.get_bfd_auth_keys_dump_entry()
231             self.assert_equal(e.use_count, len(sessions) - removed,
232                               "Use count for shared key")
233             s.remove_vpp_config()
234             removed += 1
235         e = key.get_bfd_auth_keys_dump_entry()
236         self.assert_equal(e.use_count, len(sessions) - removed,
237                           "Use count for shared key")
238
239     def test_activate_auth(self):
240         """ activate SHA1 authentication """
241         key = self.factory.create_random_key(self)
242         key.add_vpp_config()
243         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
244         session.add_vpp_config()
245         session.activate_auth(key)
246
247     def test_deactivate_auth(self):
248         """ deactivate SHA1 authentication """
249         key = self.factory.create_random_key(self)
250         key.add_vpp_config()
251         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
252         session.add_vpp_config()
253         session.activate_auth(key)
254         session.deactivate_auth()
255
256     def test_change_key(self):
257         """ change SHA1 key """
258         key1 = self.factory.create_random_key(self)
259         key2 = self.factory.create_random_key(self)
260         while key2.conf_key_id == key1.conf_key_id:
261             key2 = self.factory.create_random_key(self)
262         key1.add_vpp_config()
263         key2.add_vpp_config()
264         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
265                                    sha1_key=key1)
266         session.add_vpp_config()
267         session.activate_auth(key2)
268
269     def test_set_del_udp_echo_source(self):
270         """ set/del udp echo source """
271         self.create_loopback_interfaces(1)
272         self.loopback0 = self.lo_interfaces[0]
273         self.loopback0.admin_up()
274         echo_source = self.vapi.bfd_udp_get_echo_source()
275         self.assertFalse(echo_source.is_set)
276         self.assertFalse(echo_source.have_usable_ip4)
277         self.assertFalse(echo_source.have_usable_ip6)
278
279         self.vapi.bfd_udp_set_echo_source(
280             sw_if_index=self.loopback0.sw_if_index)
281         echo_source = self.vapi.bfd_udp_get_echo_source()
282         self.assertTrue(echo_source.is_set)
283         self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
284         self.assertFalse(echo_source.have_usable_ip4)
285         self.assertFalse(echo_source.have_usable_ip6)
286
287         self.loopback0.config_ip4()
288         echo_ip4 = ipaddress.IPv4Address(int(ipaddress.IPv4Address(
289             self.loopback0.local_ip4)) ^ 1).packed
290         echo_source = self.vapi.bfd_udp_get_echo_source()
291         self.assertTrue(echo_source.is_set)
292         self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
293         self.assertTrue(echo_source.have_usable_ip4)
294         self.assertEqual(echo_source.ip4_addr.packed, echo_ip4)
295         self.assertFalse(echo_source.have_usable_ip6)
296
297         self.loopback0.config_ip6()
298         echo_ip6 = ipaddress.IPv6Address(int(ipaddress.IPv6Address(
299             self.loopback0.local_ip6)) ^ 1).packed
300
301         echo_source = self.vapi.bfd_udp_get_echo_source()
302         self.assertTrue(echo_source.is_set)
303         self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
304         self.assertTrue(echo_source.have_usable_ip4)
305         self.assertEqual(echo_source.ip4_addr.packed, echo_ip4)
306         self.assertTrue(echo_source.have_usable_ip6)
307         self.assertEqual(echo_source.ip6_addr.packed, echo_ip6)
308
309         self.vapi.bfd_udp_del_echo_source()
310         echo_source = self.vapi.bfd_udp_get_echo_source()
311         self.assertFalse(echo_source.is_set)
312         self.assertFalse(echo_source.have_usable_ip4)
313         self.assertFalse(echo_source.have_usable_ip6)
314
315
316 class BFDTestSession(object):
317     """ BFD session as seen from test framework side """
318
319     def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
320                  bfd_key_id=None, our_seq_number=None,
321                  tunnel_header=None, phy_interface=None):
322         self.test = test
323         self.af = af
324         self.sha1_key = sha1_key
325         self.bfd_key_id = bfd_key_id
326         self.interface = interface
327         if phy_interface:
328             self.phy_interface = phy_interface
329         else:
330             self.phy_interface = self.interface
331         self.udp_sport = randint(49152, 65535)
332         if our_seq_number is None:
333             self.our_seq_number = randint(0, 40000000)
334         else:
335             self.our_seq_number = our_seq_number
336         self.vpp_seq_number = None
337         self.my_discriminator = 0
338         self.desired_min_tx = 300000
339         self.required_min_rx = 300000
340         self.required_min_echo_rx = None
341         self.detect_mult = detect_mult
342         self.diag = BFDDiagCode.no_diagnostic
343         self.your_discriminator = None
344         self.state = BFDState.down
345         self.auth_type = BFDAuthType.no_auth
346         self.tunnel_header = tunnel_header
347
348     def inc_seq_num(self):
349         """ increment sequence number, wrapping if needed """
350         if self.our_seq_number == 0xFFFFFFFF:
351             self.our_seq_number = 0
352         else:
353             self.our_seq_number += 1
354
355     def update(self, my_discriminator=None, your_discriminator=None,
356                desired_min_tx=None, required_min_rx=None,
357                required_min_echo_rx=None, detect_mult=None,
358                diag=None, state=None, auth_type=None):
359         """ update BFD parameters associated with session """
360         if my_discriminator is not None:
361             self.my_discriminator = my_discriminator
362         if your_discriminator is not None:
363             self.your_discriminator = your_discriminator
364         if required_min_rx is not None:
365             self.required_min_rx = required_min_rx
366         if required_min_echo_rx is not None:
367             self.required_min_echo_rx = required_min_echo_rx
368         if desired_min_tx is not None:
369             self.desired_min_tx = desired_min_tx
370         if detect_mult is not None:
371             self.detect_mult = detect_mult
372         if diag is not None:
373             self.diag = diag
374         if state is not None:
375             self.state = state
376         if auth_type is not None:
377             self.auth_type = auth_type
378
379     def fill_packet_fields(self, packet):
380         """ set packet fields with known values in packet """
381         bfd = packet[BFD]
382         if self.my_discriminator:
383             self.test.logger.debug("BFD: setting packet.my_discriminator=%s",
384                                    self.my_discriminator)
385             bfd.my_discriminator = self.my_discriminator
386         if self.your_discriminator:
387             self.test.logger.debug("BFD: setting packet.your_discriminator=%s",
388                                    self.your_discriminator)
389             bfd.your_discriminator = self.your_discriminator
390         if self.required_min_rx:
391             self.test.logger.debug(
392                 "BFD: setting packet.required_min_rx_interval=%s",
393                 self.required_min_rx)
394             bfd.required_min_rx_interval = self.required_min_rx
395         if self.required_min_echo_rx:
396             self.test.logger.debug(
397                 "BFD: setting packet.required_min_echo_rx=%s",
398                 self.required_min_echo_rx)
399             bfd.required_min_echo_rx_interval = self.required_min_echo_rx
400         if self.desired_min_tx:
401             self.test.logger.debug(
402                 "BFD: setting packet.desired_min_tx_interval=%s",
403                 self.desired_min_tx)
404             bfd.desired_min_tx_interval = self.desired_min_tx
405         if self.detect_mult:
406             self.test.logger.debug(
407                 "BFD: setting packet.detect_mult=%s", self.detect_mult)
408             bfd.detect_mult = self.detect_mult
409         if self.diag:
410             self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
411             bfd.diag = self.diag
412         if self.state:
413             self.test.logger.debug("BFD: setting packet.state=%s", self.state)
414             bfd.state = self.state
415         if self.auth_type:
416             # this is used by a negative test-case
417             self.test.logger.debug("BFD: setting packet.auth_type=%s",
418                                    self.auth_type)
419             bfd.auth_type = self.auth_type
420
421     def create_packet(self):
422         """ create a BFD packet, reflecting the current state of session """
423         if self.sha1_key:
424             bfd = BFD(flags="A")
425             bfd.auth_type = self.sha1_key.auth_type
426             bfd.auth_len = BFD.sha1_auth_len
427             bfd.auth_key_id = self.bfd_key_id
428             bfd.auth_seq_num = self.our_seq_number
429             bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
430         else:
431             bfd = BFD()
432         packet = Ether(src=self.phy_interface.remote_mac,
433                        dst=self.phy_interface.local_mac)
434         if self.tunnel_header:
435             packet = packet / self.tunnel_header
436         if self.af == AF_INET6:
437             packet = (packet /
438                       IPv6(src=self.interface.remote_ip6,
439                            dst=self.interface.local_ip6,
440                            hlim=255) /
441                       UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
442                       bfd)
443         else:
444             packet = (packet /
445                       IP(src=self.interface.remote_ip4,
446                          dst=self.interface.local_ip4,
447                          ttl=255) /
448                       UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
449                       bfd)
450         self.test.logger.debug("BFD: Creating packet")
451         self.fill_packet_fields(packet)
452         if self.sha1_key:
453             hash_material = scapy.compat.raw(
454                 packet[BFD])[:32] + self.sha1_key.key + \
455                 b"\0" * (20 - len(self.sha1_key.key))
456             self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
457                                    hashlib.sha1(hash_material).hexdigest())
458             packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
459         return packet
460
461     def send_packet(self, packet=None, interface=None):
462         """ send packet on interface, creating the packet if needed """
463         if packet is None:
464             packet = self.create_packet()
465         if interface is None:
466             interface = self.phy_interface
467         self.test.logger.debug(ppp("Sending packet:", packet))
468         interface.add_stream(packet)
469         self.test.pg_start()
470
471     def verify_sha1_auth(self, packet):
472         """ Verify correctness of authentication in BFD layer. """
473         bfd = packet[BFD]
474         self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
475         self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
476                                BFDAuthType)
477         self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
478         self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
479         if self.vpp_seq_number is None:
480             self.vpp_seq_number = bfd.auth_seq_num
481             self.test.logger.debug("Received initial sequence number: %s" %
482                                    self.vpp_seq_number)
483         else:
484             recvd_seq_num = bfd.auth_seq_num
485             self.test.logger.debug("Received followup sequence number: %s" %
486                                    recvd_seq_num)
487             if self.vpp_seq_number < 0xffffffff:
488                 if self.sha1_key.auth_type == \
489                         BFDAuthType.meticulous_keyed_sha1:
490                     self.test.assert_equal(recvd_seq_num,
491                                            self.vpp_seq_number + 1,
492                                            "BFD sequence number")
493                 else:
494                     self.test.assert_in_range(recvd_seq_num,
495                                               self.vpp_seq_number,
496                                               self.vpp_seq_number + 1,
497                                               "BFD sequence number")
498             else:
499                 if self.sha1_key.auth_type == \
500                         BFDAuthType.meticulous_keyed_sha1:
501                     self.test.assert_equal(recvd_seq_num, 0,
502                                            "BFD sequence number")
503                 else:
504                     self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
505                                        "BFD sequence number not one of "
506                                        "(%s, 0)" % self.vpp_seq_number)
507             self.vpp_seq_number = recvd_seq_num
508         # last 20 bytes represent the hash - so replace them with the key,
509         # pad the result with zeros and hash the result
510         hash_material = bfd.original[:-20] + self.sha1_key.key + \
511             b"\0" * (20 - len(self.sha1_key.key))
512         expected_hash = hashlib.sha1(hash_material).hexdigest()
513         self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
514                                expected_hash.encode(), "Auth key hash")
515
516     def verify_bfd(self, packet):
517         """ Verify correctness of BFD layer. """
518         bfd = packet[BFD]
519         self.test.assert_equal(bfd.version, 1, "BFD version")
520         self.test.assert_equal(bfd.your_discriminator,
521                                self.my_discriminator,
522                                "BFD - your discriminator")
523         if self.sha1_key:
524             self.verify_sha1_auth(packet)
525
526
527 def bfd_session_up(test):
528     """ Bring BFD session up """
529     test.logger.info("BFD: Waiting for slow hello")
530     p = wait_for_bfd_packet(test, 2, is_tunnel=test.vpp_session.is_tunnel)
531     old_offset = None
532     if hasattr(test, 'vpp_clock_offset'):
533         old_offset = test.vpp_clock_offset
534     test.vpp_clock_offset = time.time() - float(p.time)
535     test.logger.debug("BFD: Calculated vpp clock offset: %s",
536                       test.vpp_clock_offset)
537     if old_offset:
538         test.assertAlmostEqual(
539             old_offset, test.vpp_clock_offset, delta=0.5,
540             msg="vpp clock offset not stable (new: %s, old: %s)" %
541             (test.vpp_clock_offset, old_offset))
542     test.logger.info("BFD: Sending Init")
543     test.test_session.update(my_discriminator=randint(0, 40000000),
544                              your_discriminator=p[BFD].my_discriminator,
545                              state=BFDState.init)
546     if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
547             BFDAuthType.meticulous_keyed_sha1:
548         test.test_session.inc_seq_num()
549     test.test_session.send_packet()
550     test.logger.info("BFD: Waiting for event")
551     e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
552     verify_event(test, e, expected_state=BFDState.up)
553     test.logger.info("BFD: Session is Up")
554     test.test_session.update(state=BFDState.up)
555     if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
556             BFDAuthType.meticulous_keyed_sha1:
557         test.test_session.inc_seq_num()
558     test.test_session.send_packet()
559     test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
560
561
562 def bfd_session_down(test):
563     """ Bring BFD session down """
564     test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
565     test.test_session.update(state=BFDState.down)
566     if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
567             BFDAuthType.meticulous_keyed_sha1:
568         test.test_session.inc_seq_num()
569     test.test_session.send_packet()
570     test.logger.info("BFD: Waiting for event")
571     e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
572     verify_event(test, e, expected_state=BFDState.down)
573     test.logger.info("BFD: Session is Down")
574     test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
575
576
577 def verify_bfd_session_config(test, session, state=None):
578     dump = session.get_bfd_udp_session_dump_entry()
579     test.assertIsNotNone(dump)
580     # since dump is not none, we have verified that sw_if_index and addresses
581     # are valid (in get_bfd_udp_session_dump_entry)
582     if state:
583         test.assert_equal(dump.state, state, "session state")
584     test.assert_equal(dump.required_min_rx, session.required_min_rx,
585                       "required min rx interval")
586     test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
587                       "desired min tx interval")
588     test.assert_equal(dump.detect_mult, session.detect_mult,
589                       "detect multiplier")
590     if session.sha1_key is None:
591         test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
592     else:
593         test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
594         test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
595                           "bfd key id")
596         test.assert_equal(dump.conf_key_id,
597                           session.sha1_key.conf_key_id,
598                           "config key id")
599
600
601 def verify_ip(test, packet):
602     """ Verify correctness of IP layer. """
603     if test.vpp_session.af == AF_INET6:
604         ip = packet[IPv6]
605         local_ip = test.vpp_session.interface.local_ip6
606         remote_ip = test.vpp_session.interface.remote_ip6
607         test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
608     else:
609         ip = packet[IP]
610         local_ip = test.vpp_session.interface.local_ip4
611         remote_ip = test.vpp_session.interface.remote_ip4
612         test.assert_equal(ip.ttl, 255, "IPv4 TTL")
613     test.assert_equal(ip.src, local_ip, "IP source address")
614     test.assert_equal(ip.dst, remote_ip, "IP destination address")
615
616
617 def verify_udp(test, packet):
618     """ Verify correctness of UDP layer. """
619     udp = packet[UDP]
620     test.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
621     test.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
622                          "UDP source port")
623
624
625 def verify_event(test, event, expected_state):
626     """ Verify correctness of event values. """
627     e = event
628     test.logger.debug("BFD: Event: %s" % reprlib.repr(e))
629     test.assert_equal(e.sw_if_index,
630                       test.vpp_session.interface.sw_if_index,
631                       "BFD interface index")
632
633     test.assert_equal(str(e.local_addr), test.vpp_session.local_addr,
634                       "Local IPv6 address")
635     test.assert_equal(str(e.peer_addr), test.vpp_session.peer_addr,
636                       "Peer IPv6 address")
637     test.assert_equal(e.state, expected_state, BFDState)
638
639
640 def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None, is_tunnel=False):
641     """ wait for BFD packet and verify its correctness
642
643     :param timeout: how long to wait
644     :param pcap_time_min: ignore packets with pcap timestamp lower than this
645
646     :returns: tuple (packet, time spent waiting for packet)
647     """
648     test.logger.info("BFD: Waiting for BFD packet")
649     deadline = time.time() + timeout
650     counter = 0
651     while True:
652         counter += 1
653         # sanity check
654         test.assert_in_range(counter, 0, 100, "number of packets ignored")
655         time_left = deadline - time.time()
656         if time_left < 0:
657             raise CaptureTimeoutError("Packet did not arrive within timeout")
658         p = test.pg0.wait_for_packet(timeout=time_left)
659         test.logger.debug(ppp("BFD: Got packet:", p))
660         if pcap_time_min is not None and p.time < pcap_time_min:
661             test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
662                                   "pcap time min %s):" %
663                                   (p.time, pcap_time_min), p))
664         else:
665             break
666     if is_tunnel:
667         # strip an IP layer and move to the next
668         p = p[IP].payload
669
670     bfd = p[BFD]
671     if bfd is None:
672         raise Exception(ppp("Unexpected or invalid BFD packet:", p))
673     if bfd.payload:
674         raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
675     verify_ip(test, p)
676     verify_udp(test, p)
677     test.test_session.verify_bfd(p)
678     return p
679
680
681 @tag_run_solo
682 class BFD4TestCase(VppTestCase):
683     """Bidirectional Forwarding Detection (BFD)"""
684
685     pg0 = None
686     vpp_clock_offset = None
687     vpp_session = None
688     test_session = None
689
690     @classmethod
691     def setUpClass(cls):
692         super(BFD4TestCase, cls).setUpClass()
693         cls.vapi.cli("set log class bfd level debug")
694         try:
695             cls.create_pg_interfaces([0])
696             cls.create_loopback_interfaces(1)
697             cls.loopback0 = cls.lo_interfaces[0]
698             cls.loopback0.config_ip4()
699             cls.loopback0.admin_up()
700             cls.pg0.config_ip4()
701             cls.pg0.configure_ipv4_neighbors()
702             cls.pg0.admin_up()
703             cls.pg0.resolve_arp()
704
705         except Exception:
706             super(BFD4TestCase, cls).tearDownClass()
707             raise
708
709     @classmethod
710     def tearDownClass(cls):
711         super(BFD4TestCase, cls).tearDownClass()
712
713     def setUp(self):
714         super(BFD4TestCase, self).setUp()
715         self.factory = AuthKeyFactory()
716         self.vapi.want_bfd_events()
717         self.pg0.enable_capture()
718         try:
719             self.vpp_session = VppBFDUDPSession(self, self.pg0,
720                                                 self.pg0.remote_ip4)
721             self.vpp_session.add_vpp_config()
722             self.vpp_session.admin_up()
723             self.test_session = BFDTestSession(self, self.pg0, AF_INET)
724         except BaseException:
725             self.vapi.want_bfd_events(enable_disable=0)
726             raise
727
728     def tearDown(self):
729         if not self.vpp_dead:
730             self.vapi.want_bfd_events(enable_disable=0)
731         self.vapi.collect_events()  # clear the event queue
732         super(BFD4TestCase, self).tearDown()
733
734     def test_session_up(self):
735         """ bring BFD session up """
736         bfd_session_up(self)
737
738     def test_session_up_by_ip(self):
739         """ bring BFD session up - first frame looked up by address pair """
740         self.logger.info("BFD: Sending Slow control frame")
741         self.test_session.update(my_discriminator=randint(0, 40000000))
742         self.test_session.send_packet()
743         self.pg0.enable_capture()
744         p = self.pg0.wait_for_packet(1)
745         self.assert_equal(p[BFD].your_discriminator,
746                           self.test_session.my_discriminator,
747                           "BFD - your discriminator")
748         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
749         self.test_session.update(your_discriminator=p[BFD].my_discriminator,
750                                  state=BFDState.up)
751         self.logger.info("BFD: Waiting for event")
752         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
753         verify_event(self, e, expected_state=BFDState.init)
754         self.logger.info("BFD: Sending Up")
755         self.test_session.send_packet()
756         self.logger.info("BFD: Waiting for event")
757         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
758         verify_event(self, e, expected_state=BFDState.up)
759         self.logger.info("BFD: Session is Up")
760         self.test_session.update(state=BFDState.up)
761         self.test_session.send_packet()
762         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
763
764     def test_session_down(self):
765         """ bring BFD session down """
766         bfd_session_up(self)
767         bfd_session_down(self)
768
769     def test_hold_up(self):
770         """ hold BFD session up """
771         bfd_session_up(self)
772         for dummy in range(self.test_session.detect_mult * 2):
773             wait_for_bfd_packet(self)
774             self.test_session.send_packet()
775         self.assert_equal(len(self.vapi.collect_events()), 0,
776                           "number of bfd events")
777
778     def test_slow_timer(self):
779         """ verify slow periodic control frames while session down """
780         packet_count = 3
781         self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
782         prev_packet = wait_for_bfd_packet(self, 2)
783         for dummy in range(packet_count):
784             next_packet = wait_for_bfd_packet(self, 2)
785             time_diff = next_packet.time - prev_packet.time
786             # spec says the range should be <0.75, 1>, allow extra 0.05 margin
787             # to work around timing issues
788             self.assert_in_range(
789                 time_diff, 0.70, 1.05, "time between slow packets")
790             prev_packet = next_packet
791
792     def test_zero_remote_min_rx(self):
793         """ no packets when zero remote required min rx interval """
794         bfd_session_up(self)
795         self.test_session.update(required_min_rx=0)
796         self.test_session.send_packet()
797         for dummy in range(self.test_session.detect_mult):
798             self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
799                        "sleep before transmitting bfd packet")
800             self.test_session.send_packet()
801             try:
802                 p = wait_for_bfd_packet(self, timeout=0)
803                 self.logger.error(ppp("Received unexpected packet:", p))
804             except CaptureTimeoutError:
805                 pass
806         self.assert_equal(
807             len(self.vapi.collect_events()), 0, "number of bfd events")
808         self.test_session.update(required_min_rx=300000)
809         for dummy in range(3):
810             self.test_session.send_packet()
811             wait_for_bfd_packet(
812                 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
813         self.assert_equal(
814             len(self.vapi.collect_events()), 0, "number of bfd events")
815
816     def test_conn_down(self):
817         """ verify session goes down after inactivity """
818         bfd_session_up(self)
819         detection_time = self.test_session.detect_mult *\
820             self.vpp_session.required_min_rx / USEC_IN_SEC
821         self.sleep(detection_time, "waiting for BFD session time-out")
822         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
823         verify_event(self, e, expected_state=BFDState.down)
824
825     def test_peer_discr_reset_sess_down(self):
826         """ peer discriminator reset after session goes down """
827         bfd_session_up(self)
828         detection_time = self.test_session.detect_mult *\
829             self.vpp_session.required_min_rx / USEC_IN_SEC
830         self.sleep(detection_time, "waiting for BFD session time-out")
831         self.test_session.my_discriminator = 0
832         wait_for_bfd_packet(self,
833                             pcap_time_min=time.time() - self.vpp_clock_offset)
834
835     def test_large_required_min_rx(self):
836         """ large remote required min rx interval """
837         bfd_session_up(self)
838         p = wait_for_bfd_packet(self)
839         interval = 3000000
840         self.test_session.update(required_min_rx=interval)
841         self.test_session.send_packet()
842         time_mark = time.time()
843         count = 0
844         # busy wait here, trying to collect a packet or event, vpp is not
845         # allowed to send packets and the session will timeout first - so the
846         # Up->Down event must arrive before any packets do
847         while time.time() < time_mark + interval / USEC_IN_SEC:
848             try:
849                 p = wait_for_bfd_packet(self, timeout=0)
850                 # if vpp managed to send a packet before we did the session
851                 # session update, then that's fine, ignore it
852                 if p.time < time_mark - self.vpp_clock_offset:
853                     continue
854                 self.logger.error(ppp("Received unexpected packet:", p))
855                 count += 1
856             except CaptureTimeoutError:
857                 pass
858             events = self.vapi.collect_events()
859             if len(events) > 0:
860                 verify_event(self, events[0], BFDState.down)
861                 break
862         self.assert_equal(count, 0, "number of packets received")
863
864     def test_immediate_remote_min_rx_reduction(self):
865         """ immediately honor remote required min rx reduction """
866         self.vpp_session.remove_vpp_config()
867         self.vpp_session = VppBFDUDPSession(
868             self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
869         self.pg0.enable_capture()
870         self.vpp_session.add_vpp_config()
871         self.test_session.update(desired_min_tx=1000000,
872                                  required_min_rx=1000000)
873         bfd_session_up(self)
874         reference_packet = wait_for_bfd_packet(self)
875         time_mark = time.time()
876         interval = 300000
877         self.test_session.update(required_min_rx=interval)
878         self.test_session.send_packet()
879         extra_time = time.time() - time_mark
880         p = wait_for_bfd_packet(self)
881         # first packet is allowed to be late by time we spent doing the update
882         # calculated in extra_time
883         self.assert_in_range(p.time - reference_packet.time,
884                              .95 * 0.75 * interval / USEC_IN_SEC,
885                              1.05 * interval / USEC_IN_SEC + extra_time,
886                              "time between BFD packets")
887         reference_packet = p
888         for dummy in range(3):
889             p = wait_for_bfd_packet(self)
890             diff = p.time - reference_packet.time
891             self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
892                                  1.05 * interval / USEC_IN_SEC,
893                                  "time between BFD packets")
894             reference_packet = p
895
896     def test_modify_req_min_rx_double(self):
897         """ modify session - double required min rx """
898         bfd_session_up(self)
899         p = wait_for_bfd_packet(self)
900         self.test_session.update(desired_min_tx=10000,
901                                  required_min_rx=10000)
902         self.test_session.send_packet()
903         # double required min rx
904         self.vpp_session.modify_parameters(
905             required_min_rx=2 * self.vpp_session.required_min_rx)
906         p = wait_for_bfd_packet(
907             self, pcap_time_min=time.time() - self.vpp_clock_offset)
908         # poll bit needs to be set
909         self.assertIn("P", p.sprintf("%BFD.flags%"),
910                       "Poll bit not set in BFD packet")
911         # finish poll sequence with final packet
912         final = self.test_session.create_packet()
913         final[BFD].flags = "F"
914         timeout = self.test_session.detect_mult * \
915             max(self.test_session.desired_min_tx,
916                 self.vpp_session.required_min_rx) / USEC_IN_SEC
917         self.test_session.send_packet(final)
918         time_mark = time.time()
919         e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
920         verify_event(self, e, expected_state=BFDState.down)
921         time_to_event = time.time() - time_mark
922         self.assert_in_range(time_to_event, .9 * timeout,
923                              1.1 * timeout, "session timeout")
924
925     def test_modify_req_min_rx_halve(self):
926         """ modify session - halve required min rx """
927         self.vpp_session.modify_parameters(
928             required_min_rx=2 * self.vpp_session.required_min_rx)
929         bfd_session_up(self)
930         p = wait_for_bfd_packet(self)
931         self.test_session.update(desired_min_tx=10000,
932                                  required_min_rx=10000)
933         self.test_session.send_packet()
934         p = wait_for_bfd_packet(
935             self, pcap_time_min=time.time() - self.vpp_clock_offset)
936         # halve required min rx
937         old_required_min_rx = self.vpp_session.required_min_rx
938         self.vpp_session.modify_parameters(
939             required_min_rx=self.vpp_session.required_min_rx // 2)
940         # now we wait 0.8*3*old-req-min-rx and the session should still be up
941         self.sleep(0.8 * self.vpp_session.detect_mult *
942                    old_required_min_rx / USEC_IN_SEC,
943                    "wait before finishing poll sequence")
944         self.assert_equal(len(self.vapi.collect_events()), 0,
945                           "number of bfd events")
946         p = wait_for_bfd_packet(self)
947         # poll bit needs to be set
948         self.assertIn("P", p.sprintf("%BFD.flags%"),
949                       "Poll bit not set in BFD packet")
950         # finish poll sequence with final packet
951         final = self.test_session.create_packet()
952         final[BFD].flags = "F"
953         self.test_session.send_packet(final)
954         # now the session should time out under new conditions
955         detection_time = self.test_session.detect_mult *\
956             self.vpp_session.required_min_rx / USEC_IN_SEC
957         before = time.time()
958         e = self.vapi.wait_for_event(
959             2 * detection_time, "bfd_udp_session_details")
960         after = time.time()
961         self.assert_in_range(after - before,
962                              0.9 * detection_time,
963                              1.1 * detection_time,
964                              "time before bfd session goes down")
965         verify_event(self, e, expected_state=BFDState.down)
966
967     def test_modify_detect_mult(self):
968         """ modify detect multiplier """
969         bfd_session_up(self)
970         p = wait_for_bfd_packet(self)
971         self.vpp_session.modify_parameters(detect_mult=1)
972         p = wait_for_bfd_packet(
973             self, pcap_time_min=time.time() - self.vpp_clock_offset)
974         self.assert_equal(self.vpp_session.detect_mult,
975                           p[BFD].detect_mult,
976                           "detect mult")
977         # poll bit must not be set
978         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
979                          "Poll bit not set in BFD packet")
980         self.vpp_session.modify_parameters(detect_mult=10)
981         p = wait_for_bfd_packet(
982             self, pcap_time_min=time.time() - self.vpp_clock_offset)
983         self.assert_equal(self.vpp_session.detect_mult,
984                           p[BFD].detect_mult,
985                           "detect mult")
986         # poll bit must not be set
987         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
988                          "Poll bit not set in BFD packet")
989
990     def test_queued_poll(self):
991         """ test poll sequence queueing """
992         bfd_session_up(self)
993         p = wait_for_bfd_packet(self)
994         self.vpp_session.modify_parameters(
995             required_min_rx=2 * self.vpp_session.required_min_rx)
996         p = wait_for_bfd_packet(self)
997         poll_sequence_start = time.time()
998         poll_sequence_length_min = 0.5
999         send_final_after = time.time() + poll_sequence_length_min
1000         # poll bit needs to be set
1001         self.assertIn("P", p.sprintf("%BFD.flags%"),
1002                       "Poll bit not set in BFD packet")
1003         self.assert_equal(p[BFD].required_min_rx_interval,
1004                           self.vpp_session.required_min_rx,
1005                           "BFD required min rx interval")
1006         self.vpp_session.modify_parameters(
1007             required_min_rx=2 * self.vpp_session.required_min_rx)
1008         # 2nd poll sequence should be queued now
1009         # don't send the reply back yet, wait for some time to emulate
1010         # longer round-trip time
1011         packet_count = 0
1012         while time.time() < send_final_after:
1013             self.test_session.send_packet()
1014             p = wait_for_bfd_packet(self)
1015             self.assert_equal(len(self.vapi.collect_events()), 0,
1016                               "number of bfd events")
1017             self.assert_equal(p[BFD].required_min_rx_interval,
1018                               self.vpp_session.required_min_rx,
1019                               "BFD required min rx interval")
1020             packet_count += 1
1021             # poll bit must be set
1022             self.assertIn("P", p.sprintf("%BFD.flags%"),
1023                           "Poll bit not set in BFD packet")
1024         final = self.test_session.create_packet()
1025         final[BFD].flags = "F"
1026         self.test_session.send_packet(final)
1027         # finish 1st with final
1028         poll_sequence_length = time.time() - poll_sequence_start
1029         # vpp must wait for some time before starting new poll sequence
1030         poll_no_2_started = False
1031         for dummy in range(2 * packet_count):
1032             p = wait_for_bfd_packet(self)
1033             self.assert_equal(len(self.vapi.collect_events()), 0,
1034                               "number of bfd events")
1035             if "P" in p.sprintf("%BFD.flags%"):
1036                 poll_no_2_started = True
1037                 if time.time() < poll_sequence_start + poll_sequence_length:
1038                     raise Exception("VPP started 2nd poll sequence too soon")
1039                 final = self.test_session.create_packet()
1040                 final[BFD].flags = "F"
1041                 self.test_session.send_packet(final)
1042                 break
1043             else:
1044                 self.test_session.send_packet()
1045         self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
1046         # finish 2nd with final
1047         final = self.test_session.create_packet()
1048         final[BFD].flags = "F"
1049         self.test_session.send_packet(final)
1050         p = wait_for_bfd_packet(self)
1051         # poll bit must not be set
1052         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
1053                          "Poll bit set in BFD packet")
1054
1055     # returning inconsistent results requiring retries in per-patch tests
1056     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1057     def test_poll_response(self):
1058         """ test correct response to control frame with poll bit set """
1059         bfd_session_up(self)
1060         poll = self.test_session.create_packet()
1061         poll[BFD].flags = "P"
1062         self.test_session.send_packet(poll)
1063         final = wait_for_bfd_packet(
1064             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1065         self.assertIn("F", final.sprintf("%BFD.flags%"))
1066
1067     def test_no_periodic_if_remote_demand(self):
1068         """ no periodic frames outside poll sequence if remote demand set """
1069         bfd_session_up(self)
1070         demand = self.test_session.create_packet()
1071         demand[BFD].flags = "D"
1072         self.test_session.send_packet(demand)
1073         transmit_time = 0.9 \
1074             * max(self.vpp_session.required_min_rx,
1075                   self.test_session.desired_min_tx) \
1076             / USEC_IN_SEC
1077         count = 0
1078         for dummy in range(self.test_session.detect_mult * 2):
1079             self.sleep(transmit_time)
1080             self.test_session.send_packet(demand)
1081             try:
1082                 p = wait_for_bfd_packet(self, timeout=0)
1083                 self.logger.error(ppp("Received unexpected packet:", p))
1084                 count += 1
1085             except CaptureTimeoutError:
1086                 pass
1087         events = self.vapi.collect_events()
1088         for e in events:
1089             self.logger.error("Received unexpected event: %s", e)
1090         self.assert_equal(count, 0, "number of packets received")
1091         self.assert_equal(len(events), 0, "number of events received")
1092
1093     def test_echo_looped_back(self):
1094         """ echo packets looped back """
1095         # don't need a session in this case..
1096         self.vpp_session.remove_vpp_config()
1097         self.pg0.enable_capture()
1098         echo_packet_count = 10
1099         # random source port low enough to increment a few times..
1100         udp_sport_tx = randint(1, 50000)
1101         udp_sport_rx = udp_sport_tx
1102         echo_packet = (Ether(src=self.pg0.remote_mac,
1103                              dst=self.pg0.local_mac) /
1104                        IP(src=self.pg0.remote_ip4,
1105                           dst=self.pg0.remote_ip4) /
1106                        UDP(dport=BFD.udp_dport_echo) /
1107                        Raw("this should be looped back"))
1108         for dummy in range(echo_packet_count):
1109             self.sleep(.01, "delay between echo packets")
1110             echo_packet[UDP].sport = udp_sport_tx
1111             udp_sport_tx += 1
1112             self.logger.debug(ppp("Sending packet:", echo_packet))
1113             self.pg0.add_stream(echo_packet)
1114             self.pg_start()
1115         for dummy in range(echo_packet_count):
1116             p = self.pg0.wait_for_packet(1)
1117             self.logger.debug(ppp("Got packet:", p))
1118             ether = p[Ether]
1119             self.assert_equal(self.pg0.remote_mac,
1120                               ether.dst, "Destination MAC")
1121             self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1122             ip = p[IP]
1123             self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1124             self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1125             udp = p[UDP]
1126             self.assert_equal(udp.dport, BFD.udp_dport_echo,
1127                               "UDP destination port")
1128             self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1129             udp_sport_rx += 1
1130             # need to compare the hex payload here, otherwise BFD_vpp_echo
1131             # gets in way
1132             self.assertEqual(scapy.compat.raw(p[UDP].payload),
1133                              scapy.compat.raw(echo_packet[UDP].payload),
1134                              "Received packet is not the echo packet sent")
1135         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1136                           "ECHO packet identifier for test purposes)")
1137
1138     def test_echo(self):
1139         """ echo function """
1140         bfd_session_up(self)
1141         self.test_session.update(required_min_echo_rx=150000)
1142         self.test_session.send_packet()
1143         detection_time = self.test_session.detect_mult *\
1144             self.vpp_session.required_min_rx / USEC_IN_SEC
1145         # echo shouldn't work without echo source set
1146         for dummy in range(10):
1147             sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1148             self.sleep(sleep, "delay before sending bfd packet")
1149             self.test_session.send_packet()
1150         p = wait_for_bfd_packet(
1151             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1152         self.assert_equal(p[BFD].required_min_rx_interval,
1153                           self.vpp_session.required_min_rx,
1154                           "BFD required min rx interval")
1155         self.test_session.send_packet()
1156         self.vapi.bfd_udp_set_echo_source(
1157             sw_if_index=self.loopback0.sw_if_index)
1158         echo_seen = False
1159         # should be turned on - loopback echo packets
1160         for dummy in range(3):
1161             loop_until = time.time() + 0.75 * detection_time
1162             while time.time() < loop_until:
1163                 p = self.pg0.wait_for_packet(1)
1164                 self.logger.debug(ppp("Got packet:", p))
1165                 if p[UDP].dport == BFD.udp_dport_echo:
1166                     self.assert_equal(
1167                         p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
1168                     self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
1169                                         "BFD ECHO src IP equal to loopback IP")
1170                     self.logger.debug(ppp("Looping back packet:", p))
1171                     self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1172                                       "ECHO packet destination MAC address")
1173                     p[Ether].dst = self.pg0.local_mac
1174                     self.pg0.add_stream(p)
1175                     self.pg_start()
1176                     echo_seen = True
1177                 elif p.haslayer(BFD):
1178                     if echo_seen:
1179                         self.assertGreaterEqual(
1180                             p[BFD].required_min_rx_interval,
1181                             1000000)
1182                     if "P" in p.sprintf("%BFD.flags%"):
1183                         final = self.test_session.create_packet()
1184                         final[BFD].flags = "F"
1185                         self.test_session.send_packet(final)
1186                 else:
1187                     raise Exception(ppp("Received unknown packet:", p))
1188
1189                 self.assert_equal(len(self.vapi.collect_events()), 0,
1190                                   "number of bfd events")
1191             self.test_session.send_packet()
1192         self.assertTrue(echo_seen, "No echo packets received")
1193
1194     def test_echo_fail(self):
1195         """ session goes down if echo function fails """
1196         bfd_session_up(self)
1197         self.test_session.update(required_min_echo_rx=150000)
1198         self.test_session.send_packet()
1199         detection_time = self.test_session.detect_mult *\
1200             self.vpp_session.required_min_rx / USEC_IN_SEC
1201         self.vapi.bfd_udp_set_echo_source(
1202             sw_if_index=self.loopback0.sw_if_index)
1203         # echo function should be used now, but we will drop the echo packets
1204         verified_diag = False
1205         for dummy in range(3):
1206             loop_until = time.time() + 0.75 * detection_time
1207             while time.time() < loop_until:
1208                 p = self.pg0.wait_for_packet(1)
1209                 self.logger.debug(ppp("Got packet:", p))
1210                 if p[UDP].dport == BFD.udp_dport_echo:
1211                     # dropped
1212                     pass
1213                 elif p.haslayer(BFD):
1214                     if "P" in p.sprintf("%BFD.flags%"):
1215                         self.assertGreaterEqual(
1216                             p[BFD].required_min_rx_interval,
1217                             1000000)
1218                         final = self.test_session.create_packet()
1219                         final[BFD].flags = "F"
1220                         self.test_session.send_packet(final)
1221                     if p[BFD].state == BFDState.down:
1222                         self.assert_equal(p[BFD].diag,
1223                                           BFDDiagCode.echo_function_failed,
1224                                           BFDDiagCode)
1225                         verified_diag = True
1226                 else:
1227                     raise Exception(ppp("Received unknown packet:", p))
1228             self.test_session.send_packet()
1229         events = self.vapi.collect_events()
1230         self.assert_equal(len(events), 1, "number of bfd events")
1231         self.assert_equal(events[0].state, BFDState.down, BFDState)
1232         self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1233
1234     def test_echo_stop(self):
1235         """ echo function stops if peer sets required min echo rx zero """
1236         bfd_session_up(self)
1237         self.test_session.update(required_min_echo_rx=150000)
1238         self.test_session.send_packet()
1239         self.vapi.bfd_udp_set_echo_source(
1240             sw_if_index=self.loopback0.sw_if_index)
1241         # wait for first echo packet
1242         while True:
1243             p = self.pg0.wait_for_packet(1)
1244             self.logger.debug(ppp("Got packet:", p))
1245             if p[UDP].dport == BFD.udp_dport_echo:
1246                 self.logger.debug(ppp("Looping back packet:", p))
1247                 p[Ether].dst = self.pg0.local_mac
1248                 self.pg0.add_stream(p)
1249                 self.pg_start()
1250                 break
1251             elif p.haslayer(BFD):
1252                 # ignore BFD
1253                 pass
1254             else:
1255                 raise Exception(ppp("Received unknown packet:", p))
1256         self.test_session.update(required_min_echo_rx=0)
1257         self.test_session.send_packet()
1258         # echo packets shouldn't arrive anymore
1259         for dummy in range(5):
1260             wait_for_bfd_packet(
1261                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1262             self.test_session.send_packet()
1263             events = self.vapi.collect_events()
1264             self.assert_equal(len(events), 0, "number of bfd events")
1265
1266     def test_echo_source_removed(self):
1267         """ echo function stops if echo source is removed """
1268         bfd_session_up(self)
1269         self.test_session.update(required_min_echo_rx=150000)
1270         self.test_session.send_packet()
1271         self.vapi.bfd_udp_set_echo_source(
1272             sw_if_index=self.loopback0.sw_if_index)
1273         # wait for first echo packet
1274         while True:
1275             p = self.pg0.wait_for_packet(1)
1276             self.logger.debug(ppp("Got packet:", p))
1277             if p[UDP].dport == BFD.udp_dport_echo:
1278                 self.logger.debug(ppp("Looping back packet:", p))
1279                 p[Ether].dst = self.pg0.local_mac
1280                 self.pg0.add_stream(p)
1281                 self.pg_start()
1282                 break
1283             elif p.haslayer(BFD):
1284                 # ignore BFD
1285                 pass
1286             else:
1287                 raise Exception(ppp("Received unknown packet:", p))
1288         self.vapi.bfd_udp_del_echo_source()
1289         self.test_session.send_packet()
1290         # echo packets shouldn't arrive anymore
1291         for dummy in range(5):
1292             wait_for_bfd_packet(
1293                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1294             self.test_session.send_packet()
1295             events = self.vapi.collect_events()
1296             self.assert_equal(len(events), 0, "number of bfd events")
1297
1298     def test_stale_echo(self):
1299         """ stale echo packets don't keep a session up """
1300         bfd_session_up(self)
1301         self.test_session.update(required_min_echo_rx=150000)
1302         self.vapi.bfd_udp_set_echo_source(
1303             sw_if_index=self.loopback0.sw_if_index)
1304         self.test_session.send_packet()
1305         # should be turned on - loopback echo packets
1306         echo_packet = None
1307         timeout_at = None
1308         timeout_ok = False
1309         for dummy in range(10 * self.vpp_session.detect_mult):
1310             p = self.pg0.wait_for_packet(1)
1311             if p[UDP].dport == BFD.udp_dport_echo:
1312                 if echo_packet is None:
1313                     self.logger.debug(ppp("Got first echo packet:", p))
1314                     echo_packet = p
1315                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1316                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1317                 else:
1318                     self.logger.debug(ppp("Got followup echo packet:", p))
1319                 self.logger.debug(ppp("Looping back first echo packet:", p))
1320                 echo_packet[Ether].dst = self.pg0.local_mac
1321                 self.pg0.add_stream(echo_packet)
1322                 self.pg_start()
1323             elif p.haslayer(BFD):
1324                 self.logger.debug(ppp("Got packet:", p))
1325                 if "P" in p.sprintf("%BFD.flags%"):
1326                     final = self.test_session.create_packet()
1327                     final[BFD].flags = "F"
1328                     self.test_session.send_packet(final)
1329                 if p[BFD].state == BFDState.down:
1330                     self.assertIsNotNone(
1331                         timeout_at,
1332                         "Session went down before first echo packet received")
1333                     now = time.time()
1334                     self.assertGreaterEqual(
1335                         now, timeout_at,
1336                         "Session timeout at %s, but is expected at %s" %
1337                         (now, timeout_at))
1338                     self.assert_equal(p[BFD].diag,
1339                                       BFDDiagCode.echo_function_failed,
1340                                       BFDDiagCode)
1341                     events = self.vapi.collect_events()
1342                     self.assert_equal(len(events), 1, "number of bfd events")
1343                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1344                     timeout_ok = True
1345                     break
1346             else:
1347                 raise Exception(ppp("Received unknown packet:", p))
1348             self.test_session.send_packet()
1349         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1350
1351     def test_invalid_echo_checksum(self):
1352         """ echo packets with invalid checksum don't keep a session up """
1353         bfd_session_up(self)
1354         self.test_session.update(required_min_echo_rx=150000)
1355         self.vapi.bfd_udp_set_echo_source(
1356             sw_if_index=self.loopback0.sw_if_index)
1357         self.test_session.send_packet()
1358         # should be turned on - loopback echo packets
1359         timeout_at = None
1360         timeout_ok = False
1361         for dummy in range(10 * self.vpp_session.detect_mult):
1362             p = self.pg0.wait_for_packet(1)
1363             if p[UDP].dport == BFD.udp_dport_echo:
1364                 self.logger.debug(ppp("Got echo packet:", p))
1365                 if timeout_at is None:
1366                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1367                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1368                 p[BFD_vpp_echo].checksum = getrandbits(64)
1369                 p[Ether].dst = self.pg0.local_mac
1370                 self.logger.debug(ppp("Looping back modified echo packet:", p))
1371                 self.pg0.add_stream(p)
1372                 self.pg_start()
1373             elif p.haslayer(BFD):
1374                 self.logger.debug(ppp("Got packet:", p))
1375                 if "P" in p.sprintf("%BFD.flags%"):
1376                     final = self.test_session.create_packet()
1377                     final[BFD].flags = "F"
1378                     self.test_session.send_packet(final)
1379                 if p[BFD].state == BFDState.down:
1380                     self.assertIsNotNone(
1381                         timeout_at,
1382                         "Session went down before first echo packet received")
1383                     now = time.time()
1384                     self.assertGreaterEqual(
1385                         now, timeout_at,
1386                         "Session timeout at %s, but is expected at %s" %
1387                         (now, timeout_at))
1388                     self.assert_equal(p[BFD].diag,
1389                                       BFDDiagCode.echo_function_failed,
1390                                       BFDDiagCode)
1391                     events = self.vapi.collect_events()
1392                     self.assert_equal(len(events), 1, "number of bfd events")
1393                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1394                     timeout_ok = True
1395                     break
1396             else:
1397                 raise Exception(ppp("Received unknown packet:", p))
1398             self.test_session.send_packet()
1399         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1400
1401     def test_admin_up_down(self):
1402         """ put session admin-up and admin-down """
1403         bfd_session_up(self)
1404         self.vpp_session.admin_down()
1405         self.pg0.enable_capture()
1406         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1407         verify_event(self, e, expected_state=BFDState.admin_down)
1408         for dummy in range(2):
1409             p = wait_for_bfd_packet(self)
1410             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1411         # try to bring session up - shouldn't be possible
1412         self.test_session.update(state=BFDState.init)
1413         self.test_session.send_packet()
1414         for dummy in range(2):
1415             p = wait_for_bfd_packet(self)
1416             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1417         self.vpp_session.admin_up()
1418         self.test_session.update(state=BFDState.down)
1419         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1420         verify_event(self, e, expected_state=BFDState.down)
1421         p = wait_for_bfd_packet(
1422             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1423         self.assert_equal(p[BFD].state, BFDState.down, BFDState)
1424         self.test_session.send_packet()
1425         p = wait_for_bfd_packet(
1426             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1427         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1428         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1429         verify_event(self, e, expected_state=BFDState.init)
1430         self.test_session.update(state=BFDState.up)
1431         self.test_session.send_packet()
1432         p = wait_for_bfd_packet(
1433             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1434         self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1435         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1436         verify_event(self, e, expected_state=BFDState.up)
1437
1438     def test_config_change_remote_demand(self):
1439         """ configuration change while peer in demand mode """
1440         bfd_session_up(self)
1441         demand = self.test_session.create_packet()
1442         demand[BFD].flags = "D"
1443         self.test_session.send_packet(demand)
1444         self.vpp_session.modify_parameters(
1445             required_min_rx=2 * self.vpp_session.required_min_rx)
1446         p = wait_for_bfd_packet(
1447             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1448         # poll bit must be set
1449         self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1450         # terminate poll sequence
1451         final = self.test_session.create_packet()
1452         final[BFD].flags = "D+F"
1453         self.test_session.send_packet(final)
1454         # vpp should be quiet now again
1455         transmit_time = 0.9 \
1456             * max(self.vpp_session.required_min_rx,
1457                   self.test_session.desired_min_tx) \
1458             / USEC_IN_SEC
1459         count = 0
1460         for dummy in range(self.test_session.detect_mult * 2):
1461             self.sleep(transmit_time)
1462             self.test_session.send_packet(demand)
1463             try:
1464                 p = wait_for_bfd_packet(self, timeout=0)
1465                 self.logger.error(ppp("Received unexpected packet:", p))
1466                 count += 1
1467             except CaptureTimeoutError:
1468                 pass
1469         events = self.vapi.collect_events()
1470         for e in events:
1471             self.logger.error("Received unexpected event: %s", e)
1472         self.assert_equal(count, 0, "number of packets received")
1473         self.assert_equal(len(events), 0, "number of events received")
1474
1475     def test_intf_deleted(self):
1476         """ interface with bfd session deleted """
1477         intf = VppLoInterface(self)
1478         intf.config_ip4()
1479         intf.admin_up()
1480         sw_if_index = intf.sw_if_index
1481         vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip4)
1482         vpp_session.add_vpp_config()
1483         vpp_session.admin_up()
1484         intf.remove_vpp_config()
1485         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1486         self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1487         self.assertFalse(vpp_session.query_vpp_config())
1488
1489
1490 @tag_run_solo
1491 class BFD6TestCase(VppTestCase):
1492     """Bidirectional Forwarding Detection (BFD) (IPv6) """
1493
1494     pg0 = None
1495     vpp_clock_offset = None
1496     vpp_session = None
1497     test_session = None
1498
1499     @classmethod
1500     def setUpClass(cls):
1501         super(BFD6TestCase, cls).setUpClass()
1502         cls.vapi.cli("set log class bfd level debug")
1503         try:
1504             cls.create_pg_interfaces([0])
1505             cls.pg0.config_ip6()
1506             cls.pg0.configure_ipv6_neighbors()
1507             cls.pg0.admin_up()
1508             cls.pg0.resolve_ndp()
1509             cls.create_loopback_interfaces(1)
1510             cls.loopback0 = cls.lo_interfaces[0]
1511             cls.loopback0.config_ip6()
1512             cls.loopback0.admin_up()
1513
1514         except Exception:
1515             super(BFD6TestCase, cls).tearDownClass()
1516             raise
1517
1518     @classmethod
1519     def tearDownClass(cls):
1520         super(BFD6TestCase, cls).tearDownClass()
1521
1522     def setUp(self):
1523         super(BFD6TestCase, self).setUp()
1524         self.factory = AuthKeyFactory()
1525         self.vapi.want_bfd_events()
1526         self.pg0.enable_capture()
1527         try:
1528             self.vpp_session = VppBFDUDPSession(self, self.pg0,
1529                                                 self.pg0.remote_ip6,
1530                                                 af=AF_INET6)
1531             self.vpp_session.add_vpp_config()
1532             self.vpp_session.admin_up()
1533             self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1534             self.logger.debug(self.vapi.cli("show adj nbr"))
1535         except BaseException:
1536             self.vapi.want_bfd_events(enable_disable=0)
1537             raise
1538
1539     def tearDown(self):
1540         if not self.vpp_dead:
1541             self.vapi.want_bfd_events(enable_disable=0)
1542         self.vapi.collect_events()  # clear the event queue
1543         super(BFD6TestCase, self).tearDown()
1544
1545     def test_session_up(self):
1546         """ bring BFD session up """
1547         bfd_session_up(self)
1548
1549     def test_session_up_by_ip(self):
1550         """ bring BFD session up - first frame looked up by address pair """
1551         self.logger.info("BFD: Sending Slow control frame")
1552         self.test_session.update(my_discriminator=randint(0, 40000000))
1553         self.test_session.send_packet()
1554         self.pg0.enable_capture()
1555         p = self.pg0.wait_for_packet(1)
1556         self.assert_equal(p[BFD].your_discriminator,
1557                           self.test_session.my_discriminator,
1558                           "BFD - your discriminator")
1559         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1560         self.test_session.update(your_discriminator=p[BFD].my_discriminator,
1561                                  state=BFDState.up)
1562         self.logger.info("BFD: Waiting for event")
1563         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1564         verify_event(self, e, expected_state=BFDState.init)
1565         self.logger.info("BFD: Sending Up")
1566         self.test_session.send_packet()
1567         self.logger.info("BFD: Waiting for event")
1568         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1569         verify_event(self, e, expected_state=BFDState.up)
1570         self.logger.info("BFD: Session is Up")
1571         self.test_session.update(state=BFDState.up)
1572         self.test_session.send_packet()
1573         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1574
1575     def test_hold_up(self):
1576         """ hold BFD session up """
1577         bfd_session_up(self)
1578         for dummy in range(self.test_session.detect_mult * 2):
1579             wait_for_bfd_packet(self)
1580             self.test_session.send_packet()
1581         self.assert_equal(len(self.vapi.collect_events()), 0,
1582                           "number of bfd events")
1583         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1584
1585     def test_echo_looped_back(self):
1586         """ echo packets looped back """
1587         # don't need a session in this case..
1588         self.vpp_session.remove_vpp_config()
1589         self.pg0.enable_capture()
1590         echo_packet_count = 10
1591         # random source port low enough to increment a few times..
1592         udp_sport_tx = randint(1, 50000)
1593         udp_sport_rx = udp_sport_tx
1594         echo_packet = (Ether(src=self.pg0.remote_mac,
1595                              dst=self.pg0.local_mac) /
1596                        IPv6(src=self.pg0.remote_ip6,
1597                             dst=self.pg0.remote_ip6) /
1598                        UDP(dport=BFD.udp_dport_echo) /
1599                        Raw("this should be looped back"))
1600         for dummy in range(echo_packet_count):
1601             self.sleep(.01, "delay between echo packets")
1602             echo_packet[UDP].sport = udp_sport_tx
1603             udp_sport_tx += 1
1604             self.logger.debug(ppp("Sending packet:", echo_packet))
1605             self.pg0.add_stream(echo_packet)
1606             self.pg_start()
1607         for dummy in range(echo_packet_count):
1608             p = self.pg0.wait_for_packet(1)
1609             self.logger.debug(ppp("Got packet:", p))
1610             ether = p[Ether]
1611             self.assert_equal(self.pg0.remote_mac,
1612                               ether.dst, "Destination MAC")
1613             self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1614             ip = p[IPv6]
1615             self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
1616             self.assert_equal(self.pg0.remote_ip6, ip.src, "Destination IP")
1617             udp = p[UDP]
1618             self.assert_equal(udp.dport, BFD.udp_dport_echo,
1619                               "UDP destination port")
1620             self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1621             udp_sport_rx += 1
1622             # need to compare the hex payload here, otherwise BFD_vpp_echo
1623             # gets in way
1624             self.assertEqual(scapy.compat.raw(p[UDP].payload),
1625                              scapy.compat.raw(echo_packet[UDP].payload),
1626                              "Received packet is not the echo packet sent")
1627         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1628                           "ECHO packet identifier for test purposes)")
1629         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1630                           "ECHO packet identifier for test purposes)")
1631
1632     def test_echo(self):
1633         """ echo function """
1634         bfd_session_up(self)
1635         self.test_session.update(required_min_echo_rx=150000)
1636         self.test_session.send_packet()
1637         detection_time = self.test_session.detect_mult *\
1638             self.vpp_session.required_min_rx / USEC_IN_SEC
1639         # echo shouldn't work without echo source set
1640         for dummy in range(10):
1641             sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1642             self.sleep(sleep, "delay before sending bfd packet")
1643             self.test_session.send_packet()
1644         p = wait_for_bfd_packet(
1645             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1646         self.assert_equal(p[BFD].required_min_rx_interval,
1647                           self.vpp_session.required_min_rx,
1648                           "BFD required min rx interval")
1649         self.test_session.send_packet()
1650         self.vapi.bfd_udp_set_echo_source(
1651             sw_if_index=self.loopback0.sw_if_index)
1652         echo_seen = False
1653         # should be turned on - loopback echo packets
1654         for dummy in range(3):
1655             loop_until = time.time() + 0.75 * detection_time
1656             while time.time() < loop_until:
1657                 p = self.pg0.wait_for_packet(1)
1658                 self.logger.debug(ppp("Got packet:", p))
1659                 if p[UDP].dport == BFD.udp_dport_echo:
1660                     self.assert_equal(
1661                         p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
1662                     self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
1663                                         "BFD ECHO src IP equal to loopback IP")
1664                     self.logger.debug(ppp("Looping back packet:", p))
1665                     self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1666                                       "ECHO packet destination MAC address")
1667                     p[Ether].dst = self.pg0.local_mac
1668                     self.pg0.add_stream(p)
1669                     self.pg_start()
1670                     echo_seen = True
1671                 elif p.haslayer(BFD):
1672                     if echo_seen:
1673                         self.assertGreaterEqual(
1674                             p[BFD].required_min_rx_interval,
1675                             1000000)
1676                     if "P" in p.sprintf("%BFD.flags%"):
1677                         final = self.test_session.create_packet()
1678                         final[BFD].flags = "F"
1679                         self.test_session.send_packet(final)
1680                 else:
1681                     raise Exception(ppp("Received unknown packet:", p))
1682
1683                 self.assert_equal(len(self.vapi.collect_events()), 0,
1684                                   "number of bfd events")
1685             self.test_session.send_packet()
1686         self.assertTrue(echo_seen, "No echo packets received")
1687
1688     def test_intf_deleted(self):
1689         """ interface with bfd session deleted """
1690         intf = VppLoInterface(self)
1691         intf.config_ip6()
1692         intf.admin_up()
1693         sw_if_index = intf.sw_if_index
1694         vpp_session = VppBFDUDPSession(
1695             self, intf, intf.remote_ip6, af=AF_INET6)
1696         vpp_session.add_vpp_config()
1697         vpp_session.admin_up()
1698         intf.remove_vpp_config()
1699         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1700         self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1701         self.assertFalse(vpp_session.query_vpp_config())
1702
1703
1704 @tag_run_solo
1705 class BFDFIBTestCase(VppTestCase):
1706     """ BFD-FIB interactions (IPv6) """
1707
1708     vpp_session = None
1709     test_session = None
1710
1711     @classmethod
1712     def setUpClass(cls):
1713         super(BFDFIBTestCase, cls).setUpClass()
1714
1715     @classmethod
1716     def tearDownClass(cls):
1717         super(BFDFIBTestCase, cls).tearDownClass()
1718
1719     def setUp(self):
1720         super(BFDFIBTestCase, self).setUp()
1721         self.create_pg_interfaces(range(1))
1722
1723         self.vapi.want_bfd_events()
1724         self.pg0.enable_capture()
1725
1726         for i in self.pg_interfaces:
1727             i.admin_up()
1728             i.config_ip6()
1729             i.configure_ipv6_neighbors()
1730
1731     def tearDown(self):
1732         if not self.vpp_dead:
1733             self.vapi.want_bfd_events(enable_disable=False)
1734
1735         super(BFDFIBTestCase, self).tearDown()
1736
1737     @staticmethod
1738     def pkt_is_not_data_traffic(p):
1739         """ not data traffic implies BFD or the usual IPv6 ND/RA"""
1740         if p.haslayer(BFD) or is_ipv6_misc(p):
1741             return True
1742         return False
1743
1744     def test_session_with_fib(self):
1745         """ BFD-FIB interactions """
1746
1747         # packets to match against both of the routes
1748         p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1749               IPv6(src="3001::1", dst="2001::1") /
1750               UDP(sport=1234, dport=1234) /
1751               Raw(b'\xa5' * 100)),
1752              (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1753               IPv6(src="3001::1", dst="2002::1") /
1754               UDP(sport=1234, dport=1234) /
1755               Raw(b'\xa5' * 100))]
1756
1757         # A recursive and a non-recursive route via a next-hop that
1758         # will have a BFD session
1759         ip_2001_s_64 = VppIpRoute(self, "2001::", 64,
1760                                   [VppRoutePath(self.pg0.remote_ip6,
1761                                                 self.pg0.sw_if_index)])
1762         ip_2002_s_64 = VppIpRoute(self, "2002::", 64,
1763                                   [VppRoutePath(self.pg0.remote_ip6,
1764                                                 0xffffffff)])
1765         ip_2001_s_64.add_vpp_config()
1766         ip_2002_s_64.add_vpp_config()
1767
1768         # bring the session up now the routes are present
1769         self.vpp_session = VppBFDUDPSession(self,
1770                                             self.pg0,
1771                                             self.pg0.remote_ip6,
1772                                             af=AF_INET6)
1773         self.vpp_session.add_vpp_config()
1774         self.vpp_session.admin_up()
1775         self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1776
1777         # session is up - traffic passes
1778         bfd_session_up(self)
1779
1780         self.pg0.add_stream(p)
1781         self.pg_start()
1782         for packet in p:
1783             captured = self.pg0.wait_for_packet(
1784                 1,
1785                 filter_out_fn=self.pkt_is_not_data_traffic)
1786             self.assertEqual(captured[IPv6].dst,
1787                              packet[IPv6].dst)
1788
1789         # session is up - traffic is dropped
1790         bfd_session_down(self)
1791
1792         self.pg0.add_stream(p)
1793         self.pg_start()
1794         with self.assertRaises(CaptureTimeoutError):
1795             self.pg0.wait_for_packet(1, self.pkt_is_not_data_traffic)
1796
1797         # session is up - traffic passes
1798         bfd_session_up(self)
1799
1800         self.pg0.add_stream(p)
1801         self.pg_start()
1802         for packet in p:
1803             captured = self.pg0.wait_for_packet(
1804                 1,
1805                 filter_out_fn=self.pkt_is_not_data_traffic)
1806             self.assertEqual(captured[IPv6].dst,
1807                              packet[IPv6].dst)
1808
1809
1810 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1811 class BFDTunTestCase(VppTestCase):
1812     """ BFD over GRE tunnel """
1813
1814     vpp_session = None
1815     test_session = None
1816
1817     @classmethod
1818     def setUpClass(cls):
1819         super(BFDTunTestCase, cls).setUpClass()
1820
1821     @classmethod
1822     def tearDownClass(cls):
1823         super(BFDTunTestCase, cls).tearDownClass()
1824
1825     def setUp(self):
1826         super(BFDTunTestCase, self).setUp()
1827         self.create_pg_interfaces(range(1))
1828
1829         self.vapi.want_bfd_events()
1830         self.pg0.enable_capture()
1831
1832         for i in self.pg_interfaces:
1833             i.admin_up()
1834             i.config_ip4()
1835             i.resolve_arp()
1836
1837     def tearDown(self):
1838         if not self.vpp_dead:
1839             self.vapi.want_bfd_events(enable_disable=0)
1840
1841         super(BFDTunTestCase, self).tearDown()
1842
1843     @staticmethod
1844     def pkt_is_not_data_traffic(p):
1845         """ not data traffic implies BFD or the usual IPv6 ND/RA"""
1846         if p.haslayer(BFD) or is_ipv6_misc(p):
1847             return True
1848         return False
1849
1850     def test_bfd_o_gre(self):
1851         """ BFD-o-GRE  """
1852
1853         # A GRE interface over which to run a BFD session
1854         gre_if = VppGreInterface(self,
1855                                  self.pg0.local_ip4,
1856                                  self.pg0.remote_ip4)
1857         gre_if.add_vpp_config()
1858         gre_if.admin_up()
1859         gre_if.config_ip4()
1860
1861         # bring the session up now the routes are present
1862         self.vpp_session = VppBFDUDPSession(self,
1863                                             gre_if,
1864                                             gre_if.remote_ip4,
1865                                             is_tunnel=True)
1866         self.vpp_session.add_vpp_config()
1867         self.vpp_session.admin_up()
1868
1869         self.test_session = BFDTestSession(
1870             self, gre_if, AF_INET,
1871             tunnel_header=(IP(src=self.pg0.remote_ip4,
1872                               dst=self.pg0.local_ip4) /
1873                            GRE()),
1874             phy_interface=self.pg0)
1875
1876         # packets to match against both of the routes
1877         p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1878               IP(src=self.pg0.remote_ip4, dst=gre_if.remote_ip4) /
1879               UDP(sport=1234, dport=1234) /
1880               Raw(b'\xa5' * 100))]
1881
1882         # session is up - traffic passes
1883         bfd_session_up(self)
1884
1885         self.send_and_expect(self.pg0, p, self.pg0)
1886
1887         # bring session down
1888         bfd_session_down(self)
1889
1890
1891 @tag_run_solo
1892 class BFDSHA1TestCase(VppTestCase):
1893     """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1894
1895     pg0 = None
1896     vpp_clock_offset = None
1897     vpp_session = None
1898     test_session = None
1899
1900     @classmethod
1901     def setUpClass(cls):
1902         super(BFDSHA1TestCase, cls).setUpClass()
1903         cls.vapi.cli("set log class bfd level debug")
1904         try:
1905             cls.create_pg_interfaces([0])
1906             cls.pg0.config_ip4()
1907             cls.pg0.admin_up()
1908             cls.pg0.resolve_arp()
1909
1910         except Exception:
1911             super(BFDSHA1TestCase, cls).tearDownClass()
1912             raise
1913
1914     @classmethod
1915     def tearDownClass(cls):
1916         super(BFDSHA1TestCase, cls).tearDownClass()
1917
1918     def setUp(self):
1919         super(BFDSHA1TestCase, self).setUp()
1920         self.factory = AuthKeyFactory()
1921         self.vapi.want_bfd_events()
1922         self.pg0.enable_capture()
1923
1924     def tearDown(self):
1925         if not self.vpp_dead:
1926             self.vapi.want_bfd_events(enable_disable=False)
1927         self.vapi.collect_events()  # clear the event queue
1928         super(BFDSHA1TestCase, self).tearDown()
1929
1930     def test_session_up(self):
1931         """ bring BFD session up """
1932         key = self.factory.create_random_key(self)
1933         key.add_vpp_config()
1934         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1935                                             self.pg0.remote_ip4,
1936                                             sha1_key=key)
1937         self.vpp_session.add_vpp_config()
1938         self.vpp_session.admin_up()
1939         self.test_session = BFDTestSession(
1940             self, self.pg0, AF_INET, sha1_key=key,
1941             bfd_key_id=self.vpp_session.bfd_key_id)
1942         bfd_session_up(self)
1943
1944     def test_hold_up(self):
1945         """ hold BFD session up """
1946         key = self.factory.create_random_key(self)
1947         key.add_vpp_config()
1948         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1949                                             self.pg0.remote_ip4,
1950                                             sha1_key=key)
1951         self.vpp_session.add_vpp_config()
1952         self.vpp_session.admin_up()
1953         self.test_session = BFDTestSession(
1954             self, self.pg0, AF_INET, sha1_key=key,
1955             bfd_key_id=self.vpp_session.bfd_key_id)
1956         bfd_session_up(self)
1957         for dummy in range(self.test_session.detect_mult * 2):
1958             wait_for_bfd_packet(self)
1959             self.test_session.send_packet()
1960         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1961
1962     def test_hold_up_meticulous(self):
1963         """ hold BFD session up - meticulous auth """
1964         key = self.factory.create_random_key(
1965             self, BFDAuthType.meticulous_keyed_sha1)
1966         key.add_vpp_config()
1967         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1968                                             self.pg0.remote_ip4, sha1_key=key)
1969         self.vpp_session.add_vpp_config()
1970         self.vpp_session.admin_up()
1971         # specify sequence number so that it wraps
1972         self.test_session = BFDTestSession(
1973             self, self.pg0, AF_INET, sha1_key=key,
1974             bfd_key_id=self.vpp_session.bfd_key_id,
1975             our_seq_number=0xFFFFFFFF - 4)
1976         bfd_session_up(self)
1977         for dummy in range(30):
1978             wait_for_bfd_packet(self)
1979             self.test_session.inc_seq_num()
1980             self.test_session.send_packet()
1981         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1982
1983     def test_send_bad_seq_number(self):
1984         """ session is not kept alive by msgs with bad sequence numbers"""
1985         key = self.factory.create_random_key(
1986             self, BFDAuthType.meticulous_keyed_sha1)
1987         key.add_vpp_config()
1988         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1989                                             self.pg0.remote_ip4, sha1_key=key)
1990         self.vpp_session.add_vpp_config()
1991         self.test_session = BFDTestSession(
1992             self, self.pg0, AF_INET, sha1_key=key,
1993             bfd_key_id=self.vpp_session.bfd_key_id)
1994         bfd_session_up(self)
1995         detection_time = self.test_session.detect_mult *\
1996             self.vpp_session.required_min_rx / USEC_IN_SEC
1997         send_until = time.time() + 2 * detection_time
1998         while time.time() < send_until:
1999             self.test_session.send_packet()
2000             self.sleep(0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
2001                        "time between bfd packets")
2002         e = self.vapi.collect_events()
2003         # session should be down now, because the sequence numbers weren't
2004         # updated
2005         self.assert_equal(len(e), 1, "number of bfd events")
2006         verify_event(self, e[0], expected_state=BFDState.down)
2007
2008     def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
2009                                        legitimate_test_session,
2010                                        rogue_test_session,
2011                                        rogue_bfd_values=None):
2012         """ execute a rogue session interaction scenario
2013
2014         1. create vpp session, add config
2015         2. bring the legitimate session up
2016         3. copy the bfd values from legitimate session to rogue session
2017         4. apply rogue_bfd_values to rogue session
2018         5. set rogue session state to down
2019         6. send message to take the session down from the rogue session
2020         7. assert that the legitimate session is unaffected
2021         """
2022
2023         self.vpp_session = vpp_bfd_udp_session
2024         self.vpp_session.add_vpp_config()
2025         self.test_session = legitimate_test_session
2026         # bring vpp session up
2027         bfd_session_up(self)
2028         # send packet from rogue session
2029         rogue_test_session.update(
2030             my_discriminator=self.test_session.my_discriminator,
2031             your_discriminator=self.test_session.your_discriminator,
2032             desired_min_tx=self.test_session.desired_min_tx,
2033             required_min_rx=self.test_session.required_min_rx,
2034             detect_mult=self.test_session.detect_mult,
2035             diag=self.test_session.diag,
2036             state=self.test_session.state,
2037             auth_type=self.test_session.auth_type)
2038         if rogue_bfd_values:
2039             rogue_test_session.update(**rogue_bfd_values)
2040         rogue_test_session.update(state=BFDState.down)
2041         rogue_test_session.send_packet()
2042         wait_for_bfd_packet(self)
2043         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2044
2045     def test_mismatch_auth(self):
2046         """ session is not brought down by unauthenticated msg """
2047         key = self.factory.create_random_key(self)
2048         key.add_vpp_config()
2049         vpp_session = VppBFDUDPSession(
2050             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2051         legitimate_test_session = BFDTestSession(
2052             self, self.pg0, AF_INET, sha1_key=key,
2053             bfd_key_id=vpp_session.bfd_key_id)
2054         rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
2055         self.execute_rogue_session_scenario(vpp_session,
2056                                             legitimate_test_session,
2057                                             rogue_test_session)
2058
2059     def test_mismatch_bfd_key_id(self):
2060         """ session is not brought down by msg with non-existent key-id """
2061         key = self.factory.create_random_key(self)
2062         key.add_vpp_config()
2063         vpp_session = VppBFDUDPSession(
2064             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2065         # pick a different random bfd key id
2066         x = randint(0, 255)
2067         while x == vpp_session.bfd_key_id:
2068             x = randint(0, 255)
2069         legitimate_test_session = BFDTestSession(
2070             self, self.pg0, AF_INET, sha1_key=key,
2071             bfd_key_id=vpp_session.bfd_key_id)
2072         rogue_test_session = BFDTestSession(
2073             self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
2074         self.execute_rogue_session_scenario(vpp_session,
2075                                             legitimate_test_session,
2076                                             rogue_test_session)
2077
2078     def test_mismatched_auth_type(self):
2079         """ session is not brought down by msg with wrong auth type """
2080         key = self.factory.create_random_key(self)
2081         key.add_vpp_config()
2082         vpp_session = VppBFDUDPSession(
2083             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2084         legitimate_test_session = BFDTestSession(
2085             self, self.pg0, AF_INET, sha1_key=key,
2086             bfd_key_id=vpp_session.bfd_key_id)
2087         rogue_test_session = BFDTestSession(
2088             self, self.pg0, AF_INET, sha1_key=key,
2089             bfd_key_id=vpp_session.bfd_key_id)
2090         self.execute_rogue_session_scenario(
2091             vpp_session, legitimate_test_session, rogue_test_session,
2092             {'auth_type': BFDAuthType.keyed_md5})
2093
2094     def test_restart(self):
2095         """ simulate remote peer restart and resynchronization """
2096         key = self.factory.create_random_key(
2097             self, BFDAuthType.meticulous_keyed_sha1)
2098         key.add_vpp_config()
2099         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2100                                             self.pg0.remote_ip4, sha1_key=key)
2101         self.vpp_session.add_vpp_config()
2102         self.test_session = BFDTestSession(
2103             self, self.pg0, AF_INET, sha1_key=key,
2104             bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
2105         bfd_session_up(self)
2106         # don't send any packets for 2*detection_time
2107         detection_time = self.test_session.detect_mult *\
2108             self.vpp_session.required_min_rx / USEC_IN_SEC
2109         self.sleep(2 * detection_time, "simulating peer restart")
2110         events = self.vapi.collect_events()
2111         self.assert_equal(len(events), 1, "number of bfd events")
2112         verify_event(self, events[0], expected_state=BFDState.down)
2113         self.test_session.update(state=BFDState.down)
2114         # reset sequence number
2115         self.test_session.our_seq_number = 0
2116         self.test_session.vpp_seq_number = None
2117         # now throw away any pending packets
2118         self.pg0.enable_capture()
2119         self.test_session.my_discriminator = 0
2120         bfd_session_up(self)
2121
2122
2123 @tag_run_solo
2124 class BFDAuthOnOffTestCase(VppTestCase):
2125     """Bidirectional Forwarding Detection (BFD) (changing auth) """
2126
2127     pg0 = None
2128     vpp_session = None
2129     test_session = None
2130
2131     @classmethod
2132     def setUpClass(cls):
2133         super(BFDAuthOnOffTestCase, cls).setUpClass()
2134         cls.vapi.cli("set log class bfd level debug")
2135         try:
2136             cls.create_pg_interfaces([0])
2137             cls.pg0.config_ip4()
2138             cls.pg0.admin_up()
2139             cls.pg0.resolve_arp()
2140
2141         except Exception:
2142             super(BFDAuthOnOffTestCase, cls).tearDownClass()
2143             raise
2144
2145     @classmethod
2146     def tearDownClass(cls):
2147         super(BFDAuthOnOffTestCase, cls).tearDownClass()
2148
2149     def setUp(self):
2150         super(BFDAuthOnOffTestCase, self).setUp()
2151         self.factory = AuthKeyFactory()
2152         self.vapi.want_bfd_events()
2153         self.pg0.enable_capture()
2154
2155     def tearDown(self):
2156         if not self.vpp_dead:
2157             self.vapi.want_bfd_events(enable_disable=False)
2158         self.vapi.collect_events()  # clear the event queue
2159         super(BFDAuthOnOffTestCase, self).tearDown()
2160
2161     def test_auth_on_immediate(self):
2162         """ turn auth on without disturbing session state (immediate) """
2163         key = self.factory.create_random_key(self)
2164         key.add_vpp_config()
2165         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2166                                             self.pg0.remote_ip4)
2167         self.vpp_session.add_vpp_config()
2168         self.test_session = BFDTestSession(self, self.pg0, AF_INET)
2169         bfd_session_up(self)
2170         for dummy in range(self.test_session.detect_mult * 2):
2171             p = wait_for_bfd_packet(self)
2172             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2173             self.test_session.send_packet()
2174         self.vpp_session.activate_auth(key)
2175         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2176         self.test_session.sha1_key = key
2177         for dummy in range(self.test_session.detect_mult * 2):
2178             p = wait_for_bfd_packet(self)
2179             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2180             self.test_session.send_packet()
2181         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2182         self.assert_equal(len(self.vapi.collect_events()), 0,
2183                           "number of bfd events")
2184
2185     def test_auth_off_immediate(self):
2186         """ turn auth off without disturbing session state (immediate) """
2187         key = self.factory.create_random_key(self)
2188         key.add_vpp_config()
2189         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2190                                             self.pg0.remote_ip4, sha1_key=key)
2191         self.vpp_session.add_vpp_config()
2192         self.test_session = BFDTestSession(
2193             self, self.pg0, AF_INET, sha1_key=key,
2194             bfd_key_id=self.vpp_session.bfd_key_id)
2195         bfd_session_up(self)
2196         # self.vapi.want_bfd_events(enable_disable=0)
2197         for dummy in range(self.test_session.detect_mult * 2):
2198             p = wait_for_bfd_packet(self)
2199             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2200             self.test_session.inc_seq_num()
2201             self.test_session.send_packet()
2202         self.vpp_session.deactivate_auth()
2203         self.test_session.bfd_key_id = None
2204         self.test_session.sha1_key = None
2205         for dummy in range(self.test_session.detect_mult * 2):
2206             p = wait_for_bfd_packet(self)
2207             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2208             self.test_session.inc_seq_num()
2209             self.test_session.send_packet()
2210         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2211         self.assert_equal(len(self.vapi.collect_events()), 0,
2212                           "number of bfd events")
2213
2214     def test_auth_change_key_immediate(self):
2215         """ change auth key without disturbing session state (immediate) """
2216         key1 = self.factory.create_random_key(self)
2217         key1.add_vpp_config()
2218         key2 = self.factory.create_random_key(self)
2219         key2.add_vpp_config()
2220         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2221                                             self.pg0.remote_ip4, sha1_key=key1)
2222         self.vpp_session.add_vpp_config()
2223         self.test_session = BFDTestSession(
2224             self, self.pg0, AF_INET, sha1_key=key1,
2225             bfd_key_id=self.vpp_session.bfd_key_id)
2226         bfd_session_up(self)
2227         for dummy in range(self.test_session.detect_mult * 2):
2228             p = wait_for_bfd_packet(self)
2229             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2230             self.test_session.send_packet()
2231         self.vpp_session.activate_auth(key2)
2232         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2233         self.test_session.sha1_key = key2
2234         for dummy in range(self.test_session.detect_mult * 2):
2235             p = wait_for_bfd_packet(self)
2236             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2237             self.test_session.send_packet()
2238         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2239         self.assert_equal(len(self.vapi.collect_events()), 0,
2240                           "number of bfd events")
2241
2242     def test_auth_on_delayed(self):
2243         """ turn auth on without disturbing session state (delayed) """
2244         key = self.factory.create_random_key(self)
2245         key.add_vpp_config()
2246         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2247                                             self.pg0.remote_ip4)
2248         self.vpp_session.add_vpp_config()
2249         self.test_session = BFDTestSession(self, self.pg0, AF_INET)
2250         bfd_session_up(self)
2251         for dummy in range(self.test_session.detect_mult * 2):
2252             wait_for_bfd_packet(self)
2253             self.test_session.send_packet()
2254         self.vpp_session.activate_auth(key, delayed=True)
2255         for dummy in range(self.test_session.detect_mult * 2):
2256             p = wait_for_bfd_packet(self)
2257             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2258             self.test_session.send_packet()
2259         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2260         self.test_session.sha1_key = key
2261         self.test_session.send_packet()
2262         for dummy in range(self.test_session.detect_mult * 2):
2263             p = wait_for_bfd_packet(self)
2264             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2265             self.test_session.send_packet()
2266         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2267         self.assert_equal(len(self.vapi.collect_events()), 0,
2268                           "number of bfd events")
2269
2270     def test_auth_off_delayed(self):
2271         """ turn auth off without disturbing session state (delayed) """
2272         key = self.factory.create_random_key(self)
2273         key.add_vpp_config()
2274         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2275                                             self.pg0.remote_ip4, sha1_key=key)
2276         self.vpp_session.add_vpp_config()
2277         self.test_session = BFDTestSession(
2278             self, self.pg0, AF_INET, sha1_key=key,
2279             bfd_key_id=self.vpp_session.bfd_key_id)
2280         bfd_session_up(self)
2281         for dummy in range(self.test_session.detect_mult * 2):
2282             p = wait_for_bfd_packet(self)
2283             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2284             self.test_session.send_packet()
2285         self.vpp_session.deactivate_auth(delayed=True)
2286         for dummy in range(self.test_session.detect_mult * 2):
2287             p = wait_for_bfd_packet(self)
2288             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2289             self.test_session.send_packet()
2290         self.test_session.bfd_key_id = None
2291         self.test_session.sha1_key = None
2292         self.test_session.send_packet()
2293         for dummy in range(self.test_session.detect_mult * 2):
2294             p = wait_for_bfd_packet(self)
2295             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2296             self.test_session.send_packet()
2297         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2298         self.assert_equal(len(self.vapi.collect_events()), 0,
2299                           "number of bfd events")
2300
2301     def test_auth_change_key_delayed(self):
2302         """ change auth key without disturbing session state (delayed) """
2303         key1 = self.factory.create_random_key(self)
2304         key1.add_vpp_config()
2305         key2 = self.factory.create_random_key(self)
2306         key2.add_vpp_config()
2307         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2308                                             self.pg0.remote_ip4, sha1_key=key1)
2309         self.vpp_session.add_vpp_config()
2310         self.vpp_session.admin_up()
2311         self.test_session = BFDTestSession(
2312             self, self.pg0, AF_INET, sha1_key=key1,
2313             bfd_key_id=self.vpp_session.bfd_key_id)
2314         bfd_session_up(self)
2315         for dummy in range(self.test_session.detect_mult * 2):
2316             p = wait_for_bfd_packet(self)
2317             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2318             self.test_session.send_packet()
2319         self.vpp_session.activate_auth(key2, delayed=True)
2320         for dummy in range(self.test_session.detect_mult * 2):
2321             p = wait_for_bfd_packet(self)
2322             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2323             self.test_session.send_packet()
2324         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2325         self.test_session.sha1_key = key2
2326         self.test_session.send_packet()
2327         for dummy in range(self.test_session.detect_mult * 2):
2328             p = wait_for_bfd_packet(self)
2329             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2330             self.test_session.send_packet()
2331         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2332         self.assert_equal(len(self.vapi.collect_events()), 0,
2333                           "number of bfd events")
2334
2335
2336 @tag_run_solo
2337 class BFDCLITestCase(VppTestCase):
2338     """Bidirectional Forwarding Detection (BFD) (CLI) """
2339     pg0 = None
2340
2341     @classmethod
2342     def setUpClass(cls):
2343         super(BFDCLITestCase, cls).setUpClass()
2344         cls.vapi.cli("set log class bfd level debug")
2345         try:
2346             cls.create_pg_interfaces((0,))
2347             cls.pg0.config_ip4()
2348             cls.pg0.config_ip6()
2349             cls.pg0.resolve_arp()
2350             cls.pg0.resolve_ndp()
2351
2352         except Exception:
2353             super(BFDCLITestCase, cls).tearDownClass()
2354             raise
2355
2356     @classmethod
2357     def tearDownClass(cls):
2358         super(BFDCLITestCase, cls).tearDownClass()
2359
2360     def setUp(self):
2361         super(BFDCLITestCase, self).setUp()
2362         self.factory = AuthKeyFactory()
2363         self.pg0.enable_capture()
2364
2365     def tearDown(self):
2366         try:
2367             self.vapi.want_bfd_events(enable_disable=False)
2368         except UnexpectedApiReturnValueError:
2369             # some tests aren't subscribed, so this is not an issue
2370             pass
2371         self.vapi.collect_events()  # clear the event queue
2372         super(BFDCLITestCase, self).tearDown()
2373
2374     def cli_verify_no_response(self, cli):
2375         """ execute a CLI, asserting that the response is empty """
2376         self.assert_equal(self.vapi.cli(cli),
2377                           "",
2378                           "CLI command response")
2379
2380     def cli_verify_response(self, cli, expected):
2381         """ execute a CLI, asserting that the response matches expectation """
2382         try:
2383             reply = self.vapi.cli(cli)
2384         except CliFailedCommandError as cli_error:
2385             reply = str(cli_error)
2386         self.assert_equal(reply.strip(),
2387                           expected,
2388                           "CLI command response")
2389
2390     def test_show(self):
2391         """ show commands """
2392         k1 = self.factory.create_random_key(self)
2393         k1.add_vpp_config()
2394         k2 = self.factory.create_random_key(
2395             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2396         k2.add_vpp_config()
2397         s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2398         s1.add_vpp_config()
2399         s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2400                               sha1_key=k2)
2401         s2.add_vpp_config()
2402         self.logger.info(self.vapi.ppcli("show bfd keys"))
2403         self.logger.info(self.vapi.ppcli("show bfd sessions"))
2404         self.logger.info(self.vapi.ppcli("show bfd"))
2405
2406     def test_set_del_sha1_key(self):
2407         """ set/delete SHA1 auth key """
2408         k = self.factory.create_random_key(self)
2409         self.registry.register(k, self.logger)
2410         self.cli_verify_no_response(
2411             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2412             (k.conf_key_id,
2413                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2414         self.assertTrue(k.query_vpp_config())
2415         self.vpp_session = VppBFDUDPSession(
2416             self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
2417         self.vpp_session.add_vpp_config()
2418         self.test_session = \
2419             BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
2420                            bfd_key_id=self.vpp_session.bfd_key_id)
2421         self.vapi.want_bfd_events()
2422         bfd_session_up(self)
2423         bfd_session_down(self)
2424         # try to replace the secret for the key - should fail because the key
2425         # is in-use
2426         k2 = self.factory.create_random_key(self)
2427         self.cli_verify_response(
2428             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2429             (k.conf_key_id,
2430                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2431             "bfd key set: `bfd_auth_set_key' API call failed, "
2432             "rv=-103:BFD object in use")
2433         # manipulating the session using old secret should still work
2434         bfd_session_up(self)
2435         bfd_session_down(self)
2436         self.vpp_session.remove_vpp_config()
2437         self.cli_verify_no_response(
2438             "bfd key del conf-key-id %s" % k.conf_key_id)
2439         self.assertFalse(k.query_vpp_config())
2440
2441     def test_set_del_meticulous_sha1_key(self):
2442         """ set/delete meticulous SHA1 auth key """
2443         k = self.factory.create_random_key(
2444             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2445         self.registry.register(k, self.logger)
2446         self.cli_verify_no_response(
2447             "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
2448             (k.conf_key_id,
2449                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2450         self.assertTrue(k.query_vpp_config())
2451         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2452                                             self.pg0.remote_ip6, af=AF_INET6,
2453                                             sha1_key=k)
2454         self.vpp_session.add_vpp_config()
2455         self.vpp_session.admin_up()
2456         self.test_session = \
2457             BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
2458                            bfd_key_id=self.vpp_session.bfd_key_id)
2459         self.vapi.want_bfd_events()
2460         bfd_session_up(self)
2461         bfd_session_down(self)
2462         # try to replace the secret for the key - should fail because the key
2463         # is in-use
2464         k2 = self.factory.create_random_key(self)
2465         self.cli_verify_response(
2466             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2467             (k.conf_key_id,
2468                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2469             "bfd key set: `bfd_auth_set_key' API call failed, "
2470             "rv=-103:BFD object in use")
2471         # manipulating the session using old secret should still work
2472         bfd_session_up(self)
2473         bfd_session_down(self)
2474         self.vpp_session.remove_vpp_config()
2475         self.cli_verify_no_response(
2476             "bfd key del conf-key-id %s" % k.conf_key_id)
2477         self.assertFalse(k.query_vpp_config())
2478
2479     def test_add_mod_del_bfd_udp(self):
2480         """ create/modify/delete IPv4 BFD UDP session """
2481         vpp_session = VppBFDUDPSession(
2482             self, self.pg0, self.pg0.remote_ip4)
2483         self.registry.register(vpp_session, self.logger)
2484         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2485             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2486             "detect-mult %s" % (self.pg0.name, self.pg0.local_ip4,
2487                                 self.pg0.remote_ip4,
2488                                 vpp_session.desired_min_tx,
2489                                 vpp_session.required_min_rx,
2490                                 vpp_session.detect_mult)
2491         self.cli_verify_no_response(cli_add_cmd)
2492         # 2nd add should fail
2493         self.cli_verify_response(
2494             cli_add_cmd,
2495             "bfd udp session add: `bfd_add_add_session' API call"
2496             " failed, rv=-101:Duplicate BFD object")
2497         verify_bfd_session_config(self, vpp_session)
2498         mod_session = VppBFDUDPSession(
2499             self, self.pg0, self.pg0.remote_ip4,
2500             required_min_rx=2 * vpp_session.required_min_rx,
2501             desired_min_tx=3 * vpp_session.desired_min_tx,
2502             detect_mult=4 * vpp_session.detect_mult)
2503         self.cli_verify_no_response(
2504             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2505             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2506             (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2507              mod_session.desired_min_tx, mod_session.required_min_rx,
2508              mod_session.detect_mult))
2509         verify_bfd_session_config(self, mod_session)
2510         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2511             "peer-addr %s" % (self.pg0.name,
2512                               self.pg0.local_ip4, self.pg0.remote_ip4)
2513         self.cli_verify_no_response(cli_del_cmd)
2514         # 2nd del is expected to fail
2515         self.cli_verify_response(
2516             cli_del_cmd, "bfd udp session del: `bfd_udp_del_session' API call"
2517             " failed, rv=-102:No such BFD object")
2518         self.assertFalse(vpp_session.query_vpp_config())
2519
2520     def test_add_mod_del_bfd_udp6(self):
2521         """ create/modify/delete IPv6 BFD UDP session """
2522         vpp_session = VppBFDUDPSession(
2523             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
2524         self.registry.register(vpp_session, self.logger)
2525         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2526             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2527             "detect-mult %s" % (self.pg0.name, self.pg0.local_ip6,
2528                                 self.pg0.remote_ip6,
2529                                 vpp_session.desired_min_tx,
2530                                 vpp_session.required_min_rx,
2531                                 vpp_session.detect_mult)
2532         self.cli_verify_no_response(cli_add_cmd)
2533         # 2nd add should fail
2534         self.cli_verify_response(
2535             cli_add_cmd,
2536             "bfd udp session add: `bfd_add_add_session' API call"
2537             " failed, rv=-101:Duplicate BFD object")
2538         verify_bfd_session_config(self, vpp_session)
2539         mod_session = VppBFDUDPSession(
2540             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2541             required_min_rx=2 * vpp_session.required_min_rx,
2542             desired_min_tx=3 * vpp_session.desired_min_tx,
2543             detect_mult=4 * vpp_session.detect_mult)
2544         self.cli_verify_no_response(
2545             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2546             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2547             (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2548              mod_session.desired_min_tx,
2549              mod_session.required_min_rx, mod_session.detect_mult))
2550         verify_bfd_session_config(self, mod_session)
2551         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2552             "peer-addr %s" % (self.pg0.name,
2553                               self.pg0.local_ip6, self.pg0.remote_ip6)
2554         self.cli_verify_no_response(cli_del_cmd)
2555         # 2nd del is expected to fail
2556         self.cli_verify_response(
2557             cli_del_cmd,
2558             "bfd udp session del: `bfd_udp_del_session' API call"
2559             " failed, rv=-102:No such BFD object")
2560         self.assertFalse(vpp_session.query_vpp_config())
2561
2562     def test_add_mod_del_bfd_udp_auth(self):
2563         """ create/modify/delete IPv4 BFD UDP session (authenticated) """
2564         key = self.factory.create_random_key(self)
2565         key.add_vpp_config()
2566         vpp_session = VppBFDUDPSession(
2567             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2568         self.registry.register(vpp_session, self.logger)
2569         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2570             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2571             "detect-mult %s conf-key-id %s bfd-key-id %s"\
2572             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2573                vpp_session.desired_min_tx, vpp_session.required_min_rx,
2574                vpp_session.detect_mult, key.conf_key_id,
2575                vpp_session.bfd_key_id)
2576         self.cli_verify_no_response(cli_add_cmd)
2577         # 2nd add should fail
2578         self.cli_verify_response(
2579             cli_add_cmd,
2580             "bfd udp session add: `bfd_add_add_session' API call"
2581             " failed, rv=-101:Duplicate BFD object")
2582         verify_bfd_session_config(self, vpp_session)
2583         mod_session = VppBFDUDPSession(
2584             self, self.pg0, self.pg0.remote_ip4, sha1_key=key,
2585             bfd_key_id=vpp_session.bfd_key_id,
2586             required_min_rx=2 * vpp_session.required_min_rx,
2587             desired_min_tx=3 * vpp_session.desired_min_tx,
2588             detect_mult=4 * vpp_session.detect_mult)
2589         self.cli_verify_no_response(
2590             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2591             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2592             (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2593              mod_session.desired_min_tx,
2594              mod_session.required_min_rx, mod_session.detect_mult))
2595         verify_bfd_session_config(self, mod_session)
2596         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2597             "peer-addr %s" % (self.pg0.name,
2598                               self.pg0.local_ip4, self.pg0.remote_ip4)
2599         self.cli_verify_no_response(cli_del_cmd)
2600         # 2nd del is expected to fail
2601         self.cli_verify_response(
2602             cli_del_cmd,
2603             "bfd udp session del: `bfd_udp_del_session' API call"
2604             " failed, rv=-102:No such BFD object")
2605         self.assertFalse(vpp_session.query_vpp_config())
2606
2607     def test_add_mod_del_bfd_udp6_auth(self):
2608         """ create/modify/delete IPv6 BFD UDP session (authenticated) """
2609         key = self.factory.create_random_key(
2610             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2611         key.add_vpp_config()
2612         vpp_session = VppBFDUDPSession(
2613             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key)
2614         self.registry.register(vpp_session, self.logger)
2615         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2616             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2617             "detect-mult %s conf-key-id %s bfd-key-id %s" \
2618             % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2619                vpp_session.desired_min_tx, vpp_session.required_min_rx,
2620                vpp_session.detect_mult, key.conf_key_id,
2621                vpp_session.bfd_key_id)
2622         self.cli_verify_no_response(cli_add_cmd)
2623         # 2nd add should fail
2624         self.cli_verify_response(
2625             cli_add_cmd,
2626             "bfd udp session add: `bfd_add_add_session' API call"
2627             " failed, rv=-101:Duplicate BFD object")
2628         verify_bfd_session_config(self, vpp_session)
2629         mod_session = VppBFDUDPSession(
2630             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key,
2631             bfd_key_id=vpp_session.bfd_key_id,
2632             required_min_rx=2 * vpp_session.required_min_rx,
2633             desired_min_tx=3 * vpp_session.desired_min_tx,
2634             detect_mult=4 * vpp_session.detect_mult)
2635         self.cli_verify_no_response(
2636             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2637             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2638             (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2639              mod_session.desired_min_tx,
2640              mod_session.required_min_rx, mod_session.detect_mult))
2641         verify_bfd_session_config(self, mod_session)
2642         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2643             "peer-addr %s" % (self.pg0.name,
2644                               self.pg0.local_ip6, self.pg0.remote_ip6)
2645         self.cli_verify_no_response(cli_del_cmd)
2646         # 2nd del is expected to fail
2647         self.cli_verify_response(
2648             cli_del_cmd,
2649             "bfd udp session del: `bfd_udp_del_session' API call"
2650             " failed, rv=-102:No such BFD object")
2651         self.assertFalse(vpp_session.query_vpp_config())
2652
2653     def test_auth_on_off(self):
2654         """ turn authentication on and off """
2655         key = self.factory.create_random_key(
2656             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2657         key.add_vpp_config()
2658         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2659         auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2660                                         sha1_key=key)
2661         session.add_vpp_config()
2662         cli_activate = \
2663             "bfd udp session auth activate interface %s local-addr %s "\
2664             "peer-addr %s conf-key-id %s bfd-key-id %s"\
2665             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2666                key.conf_key_id, auth_session.bfd_key_id)
2667         self.cli_verify_no_response(cli_activate)
2668         verify_bfd_session_config(self, auth_session)
2669         self.cli_verify_no_response(cli_activate)
2670         verify_bfd_session_config(self, auth_session)
2671         cli_deactivate = \
2672             "bfd udp session auth deactivate interface %s local-addr %s "\
2673             "peer-addr %s "\
2674             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2675         self.cli_verify_no_response(cli_deactivate)
2676         verify_bfd_session_config(self, session)
2677         self.cli_verify_no_response(cli_deactivate)
2678         verify_bfd_session_config(self, session)
2679
2680     def test_auth_on_off_delayed(self):
2681         """ turn authentication on and off (delayed) """
2682         key = self.factory.create_random_key(
2683             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2684         key.add_vpp_config()
2685         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2686         auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2687                                         sha1_key=key)
2688         session.add_vpp_config()
2689         cli_activate = \
2690             "bfd udp session auth activate interface %s local-addr %s "\
2691             "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"\
2692             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2693                key.conf_key_id, auth_session.bfd_key_id)
2694         self.cli_verify_no_response(cli_activate)
2695         verify_bfd_session_config(self, auth_session)
2696         self.cli_verify_no_response(cli_activate)
2697         verify_bfd_session_config(self, auth_session)
2698         cli_deactivate = \
2699             "bfd udp session auth deactivate interface %s local-addr %s "\
2700             "peer-addr %s delayed yes"\
2701             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2702         self.cli_verify_no_response(cli_deactivate)
2703         verify_bfd_session_config(self, session)
2704         self.cli_verify_no_response(cli_deactivate)
2705         verify_bfd_session_config(self, session)
2706
2707     def test_admin_up_down(self):
2708         """ put session admin-up and admin-down """
2709         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2710         session.add_vpp_config()
2711         cli_down = \
2712             "bfd udp session set-flags admin down interface %s local-addr %s "\
2713             "peer-addr %s "\
2714             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2715         cli_up = \
2716             "bfd udp session set-flags admin up interface %s local-addr %s "\
2717             "peer-addr %s "\
2718             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2719         self.cli_verify_no_response(cli_down)
2720         verify_bfd_session_config(self, session, state=BFDState.admin_down)
2721         self.cli_verify_no_response(cli_up)
2722         verify_bfd_session_config(self, session, state=BFDState.down)
2723
2724     def test_set_del_udp_echo_source(self):
2725         """ set/del udp echo source """
2726         self.create_loopback_interfaces(1)
2727         self.loopback0 = self.lo_interfaces[0]
2728         self.loopback0.admin_up()
2729         self.cli_verify_response("show bfd echo-source",
2730                                  "UDP echo source is not set.")
2731         cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
2732         self.cli_verify_no_response(cli_set)
2733         self.cli_verify_response("show bfd echo-source",
2734                                  "UDP echo source is: %s\n"
2735                                  "IPv4 address usable as echo source: none\n"
2736                                  "IPv6 address usable as echo source: none" %
2737                                  self.loopback0.name)
2738         self.loopback0.config_ip4()
2739         echo_ip4 = str(ipaddress.IPv4Address(int(ipaddress.IPv4Address(
2740             self.loopback0.local_ip4)) ^ 1))
2741         self.cli_verify_response("show bfd echo-source",
2742                                  "UDP echo source is: %s\n"
2743                                  "IPv4 address usable as echo source: %s\n"
2744                                  "IPv6 address usable as echo source: none" %
2745                                  (self.loopback0.name, echo_ip4))
2746         echo_ip6 = str(ipaddress.IPv6Address(int(ipaddress.IPv6Address(
2747             self.loopback0.local_ip6)) ^ 1))
2748         self.loopback0.config_ip6()
2749         self.cli_verify_response("show bfd echo-source",
2750                                  "UDP echo source is: %s\n"
2751                                  "IPv4 address usable as echo source: %s\n"
2752                                  "IPv6 address usable as echo source: %s" %
2753                                  (self.loopback0.name, echo_ip4, echo_ip6))
2754         cli_del = "bfd udp echo-source del"
2755         self.cli_verify_no_response(cli_del)
2756         self.cli_verify_response("show bfd echo-source",
2757                                  "UDP echo source is not set.")
2758
2759
2760 if __name__ == '__main__':
2761     unittest.main(testRunner=VppTestRunner)