ethernet: add sanity checks to p2p_ethernet_add/del
[vpp.git] / test / test_bfd.py
1 #!/usr/bin/env python3
2 """ BFD tests """
3
4 from __future__ import division
5
6 import binascii
7 import hashlib
8 import ipaddress
9 import time
10 import unittest
11 from random import randint, shuffle, getrandbits
12 from socket import AF_INET, AF_INET6, inet_ntop
13 from struct import pack, unpack
14
15 from six import moves
16 import scapy.compat
17 from scapy.layers.inet import UDP, IP
18 from scapy.layers.inet6 import IPv6
19 from scapy.layers.l2 import Ether, GRE
20 from scapy.packet import Raw
21
22 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
23     BFDDiagCode, BFDState, BFD_vpp_echo
24 from framework import VppTestCase, VppTestRunner, running_extended_tests
25 from util import ppp
26 from vpp_ip import DpoProto
27 from vpp_ip_route import VppIpRoute, VppRoutePath
28 from vpp_lo_interface import VppLoInterface
29 from vpp_papi_provider import UnexpectedApiReturnValueError, \
30     CliFailedCommandError
31 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
32 from vpp_gre_interface import VppGreInterface
33 from vpp_papi import VppEnum
34
35 USEC_IN_SEC = 1000000
36
37
38 class AuthKeyFactory(object):
39     """Factory class for creating auth keys with unique conf key ID"""
40
41     def __init__(self):
42         self._conf_key_ids = {}
43
44     def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
45         """ create a random key with unique conf key id """
46         conf_key_id = randint(0, 0xFFFFFFFF)
47         while conf_key_id in self._conf_key_ids:
48             conf_key_id = randint(0, 0xFFFFFFFF)
49         self._conf_key_ids[conf_key_id] = 1
50         key = scapy.compat.raw(
51             bytearray([randint(0, 255) for _ in range(randint(1, 20))]))
52         return VppBFDAuthKey(test=test, auth_type=auth_type,
53                              conf_key_id=conf_key_id, key=key)
54
55
56 class BFDAPITestCase(VppTestCase):
57     """Bidirectional Forwarding Detection (BFD) - API"""
58
59     pg0 = None
60     pg1 = None
61
62     @classmethod
63     def setUpClass(cls):
64         super(BFDAPITestCase, cls).setUpClass()
65         cls.vapi.cli("set log class bfd level debug")
66         try:
67             cls.create_pg_interfaces(range(2))
68             for i in cls.pg_interfaces:
69                 i.config_ip4()
70                 i.config_ip6()
71                 i.resolve_arp()
72
73         except Exception:
74             super(BFDAPITestCase, cls).tearDownClass()
75             raise
76
77     @classmethod
78     def tearDownClass(cls):
79         super(BFDAPITestCase, cls).tearDownClass()
80
81     def setUp(self):
82         super(BFDAPITestCase, self).setUp()
83         self.factory = AuthKeyFactory()
84
85     def test_add_bfd(self):
86         """ create a BFD session """
87         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
88         session.add_vpp_config()
89         self.logger.debug("Session state is %s", session.state)
90         session.remove_vpp_config()
91         session.add_vpp_config()
92         self.logger.debug("Session state is %s", session.state)
93         session.remove_vpp_config()
94
95     def test_double_add(self):
96         """ create the same BFD session twice (negative case) """
97         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
98         session.add_vpp_config()
99
100         with self.vapi.assert_negative_api_retval():
101             session.add_vpp_config()
102
103         session.remove_vpp_config()
104
105     def test_add_bfd6(self):
106         """ create IPv6 BFD session """
107         session = VppBFDUDPSession(
108             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
109         session.add_vpp_config()
110         self.logger.debug("Session state is %s", session.state)
111         session.remove_vpp_config()
112         session.add_vpp_config()
113         self.logger.debug("Session state is %s", session.state)
114         session.remove_vpp_config()
115
116     def test_mod_bfd(self):
117         """ modify BFD session parameters """
118         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
119                                    desired_min_tx=50000,
120                                    required_min_rx=10000,
121                                    detect_mult=1)
122         session.add_vpp_config()
123         s = session.get_bfd_udp_session_dump_entry()
124         self.assert_equal(session.desired_min_tx,
125                           s.desired_min_tx,
126                           "desired min transmit interval")
127         self.assert_equal(session.required_min_rx,
128                           s.required_min_rx,
129                           "required min receive interval")
130         self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
131         session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
132                                   required_min_rx=session.required_min_rx * 2,
133                                   detect_mult=session.detect_mult * 2)
134         s = session.get_bfd_udp_session_dump_entry()
135         self.assert_equal(session.desired_min_tx,
136                           s.desired_min_tx,
137                           "desired min transmit interval")
138         self.assert_equal(session.required_min_rx,
139                           s.required_min_rx,
140                           "required min receive interval")
141         self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
142
143     def test_add_sha1_keys(self):
144         """ add SHA1 keys """
145         key_count = 10
146         keys = [self.factory.create_random_key(
147             self) for i in range(0, key_count)]
148         for key in keys:
149             self.assertFalse(key.query_vpp_config())
150         for key in keys:
151             key.add_vpp_config()
152         for key in keys:
153             self.assertTrue(key.query_vpp_config())
154         # remove randomly
155         indexes = list(range(key_count))
156         shuffle(indexes)
157         removed = []
158         for i in indexes:
159             key = keys[i]
160             key.remove_vpp_config()
161             removed.append(i)
162             for j in range(key_count):
163                 key = keys[j]
164                 if j in removed:
165                     self.assertFalse(key.query_vpp_config())
166                 else:
167                     self.assertTrue(key.query_vpp_config())
168         # should be removed now
169         for key in keys:
170             self.assertFalse(key.query_vpp_config())
171         # add back and remove again
172         for key in keys:
173             key.add_vpp_config()
174         for key in keys:
175             self.assertTrue(key.query_vpp_config())
176         for key in keys:
177             key.remove_vpp_config()
178         for key in keys:
179             self.assertFalse(key.query_vpp_config())
180
181     def test_add_bfd_sha1(self):
182         """ create a BFD session (SHA1) """
183         key = self.factory.create_random_key(self)
184         key.add_vpp_config()
185         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
186                                    sha1_key=key)
187         session.add_vpp_config()
188         self.logger.debug("Session state is %s", session.state)
189         session.remove_vpp_config()
190         session.add_vpp_config()
191         self.logger.debug("Session state is %s", session.state)
192         session.remove_vpp_config()
193
194     def test_double_add_sha1(self):
195         """ create the same BFD session twice (negative case) (SHA1) """
196         key = self.factory.create_random_key(self)
197         key.add_vpp_config()
198         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
199                                    sha1_key=key)
200         session.add_vpp_config()
201         with self.assertRaises(Exception):
202             session.add_vpp_config()
203
204     def test_add_auth_nonexistent_key(self):
205         """ create BFD session using non-existent SHA1 (negative case) """
206         session = VppBFDUDPSession(
207             self, self.pg0, self.pg0.remote_ip4,
208             sha1_key=self.factory.create_random_key(self))
209         with self.assertRaises(Exception):
210             session.add_vpp_config()
211
212     def test_shared_sha1_key(self):
213         """ share single SHA1 key between multiple BFD sessions """
214         key = self.factory.create_random_key(self)
215         key.add_vpp_config()
216         sessions = [
217             VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
218                              sha1_key=key),
219             VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
220                              sha1_key=key, af=AF_INET6),
221             VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
222                              sha1_key=key),
223             VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
224                              sha1_key=key, af=AF_INET6)]
225         for s in sessions:
226             s.add_vpp_config()
227         removed = 0
228         for s in sessions:
229             e = key.get_bfd_auth_keys_dump_entry()
230             self.assert_equal(e.use_count, len(sessions) - removed,
231                               "Use count for shared key")
232             s.remove_vpp_config()
233             removed += 1
234         e = key.get_bfd_auth_keys_dump_entry()
235         self.assert_equal(e.use_count, len(sessions) - removed,
236                           "Use count for shared key")
237
238     def test_activate_auth(self):
239         """ activate SHA1 authentication """
240         key = self.factory.create_random_key(self)
241         key.add_vpp_config()
242         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
243         session.add_vpp_config()
244         session.activate_auth(key)
245
246     def test_deactivate_auth(self):
247         """ deactivate SHA1 authentication """
248         key = self.factory.create_random_key(self)
249         key.add_vpp_config()
250         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
251         session.add_vpp_config()
252         session.activate_auth(key)
253         session.deactivate_auth()
254
255     def test_change_key(self):
256         """ change SHA1 key """
257         key1 = self.factory.create_random_key(self)
258         key2 = self.factory.create_random_key(self)
259         while key2.conf_key_id == key1.conf_key_id:
260             key2 = self.factory.create_random_key(self)
261         key1.add_vpp_config()
262         key2.add_vpp_config()
263         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
264                                    sha1_key=key1)
265         session.add_vpp_config()
266         session.activate_auth(key2)
267
268     def test_set_del_udp_echo_source(self):
269         """ set/del udp echo source """
270         self.create_loopback_interfaces(1)
271         self.loopback0 = self.lo_interfaces[0]
272         self.loopback0.admin_up()
273         echo_source = self.vapi.bfd_udp_get_echo_source()
274         self.assertFalse(echo_source.is_set)
275         self.assertFalse(echo_source.have_usable_ip4)
276         self.assertFalse(echo_source.have_usable_ip6)
277
278         self.vapi.bfd_udp_set_echo_source(
279             sw_if_index=self.loopback0.sw_if_index)
280         echo_source = self.vapi.bfd_udp_get_echo_source()
281         self.assertTrue(echo_source.is_set)
282         self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
283         self.assertFalse(echo_source.have_usable_ip4)
284         self.assertFalse(echo_source.have_usable_ip6)
285
286         self.loopback0.config_ip4()
287         echo_ip4 = ipaddress.IPv4Address(int(ipaddress.IPv4Address(
288             self.loopback0.local_ip4)) ^ 1).packed
289         echo_source = self.vapi.bfd_udp_get_echo_source()
290         self.assertTrue(echo_source.is_set)
291         self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
292         self.assertTrue(echo_source.have_usable_ip4)
293         self.assertEqual(echo_source.ip4_addr.packed, echo_ip4)
294         self.assertFalse(echo_source.have_usable_ip6)
295
296         self.loopback0.config_ip6()
297         echo_ip6 = ipaddress.IPv6Address(int(ipaddress.IPv6Address(
298             self.loopback0.local_ip6)) ^ 1).packed
299
300         echo_source = self.vapi.bfd_udp_get_echo_source()
301         self.assertTrue(echo_source.is_set)
302         self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
303         self.assertTrue(echo_source.have_usable_ip4)
304         self.assertEqual(echo_source.ip4_addr.packed, echo_ip4)
305         self.assertTrue(echo_source.have_usable_ip6)
306         self.assertEqual(echo_source.ip6_addr.packed, echo_ip6)
307
308         self.vapi.bfd_udp_del_echo_source()
309         echo_source = self.vapi.bfd_udp_get_echo_source()
310         self.assertFalse(echo_source.is_set)
311         self.assertFalse(echo_source.have_usable_ip4)
312         self.assertFalse(echo_source.have_usable_ip6)
313
314
315 class BFDTestSession(object):
316     """ BFD session as seen from test framework side """
317
318     def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
319                  bfd_key_id=None, our_seq_number=None,
320                  tunnel_header=None, phy_interface=None):
321         self.test = test
322         self.af = af
323         self.sha1_key = sha1_key
324         self.bfd_key_id = bfd_key_id
325         self.interface = interface
326         if phy_interface:
327             self.phy_interface = phy_interface
328         else:
329             self.phy_interface = self.interface
330         self.udp_sport = randint(49152, 65535)
331         if our_seq_number is None:
332             self.our_seq_number = randint(0, 40000000)
333         else:
334             self.our_seq_number = our_seq_number
335         self.vpp_seq_number = None
336         self.my_discriminator = 0
337         self.desired_min_tx = 300000
338         self.required_min_rx = 300000
339         self.required_min_echo_rx = None
340         self.detect_mult = detect_mult
341         self.diag = BFDDiagCode.no_diagnostic
342         self.your_discriminator = None
343         self.state = BFDState.down
344         self.auth_type = BFDAuthType.no_auth
345         self.tunnel_header = tunnel_header
346
347     def inc_seq_num(self):
348         """ increment sequence number, wrapping if needed """
349         if self.our_seq_number == 0xFFFFFFFF:
350             self.our_seq_number = 0
351         else:
352             self.our_seq_number += 1
353
354     def update(self, my_discriminator=None, your_discriminator=None,
355                desired_min_tx=None, required_min_rx=None,
356                required_min_echo_rx=None, detect_mult=None,
357                diag=None, state=None, auth_type=None):
358         """ update BFD parameters associated with session """
359         if my_discriminator is not None:
360             self.my_discriminator = my_discriminator
361         if your_discriminator is not None:
362             self.your_discriminator = your_discriminator
363         if required_min_rx is not None:
364             self.required_min_rx = required_min_rx
365         if required_min_echo_rx is not None:
366             self.required_min_echo_rx = required_min_echo_rx
367         if desired_min_tx is not None:
368             self.desired_min_tx = desired_min_tx
369         if detect_mult is not None:
370             self.detect_mult = detect_mult
371         if diag is not None:
372             self.diag = diag
373         if state is not None:
374             self.state = state
375         if auth_type is not None:
376             self.auth_type = auth_type
377
378     def fill_packet_fields(self, packet):
379         """ set packet fields with known values in packet """
380         bfd = packet[BFD]
381         if self.my_discriminator:
382             self.test.logger.debug("BFD: setting packet.my_discriminator=%s",
383                                    self.my_discriminator)
384             bfd.my_discriminator = self.my_discriminator
385         if self.your_discriminator:
386             self.test.logger.debug("BFD: setting packet.your_discriminator=%s",
387                                    self.your_discriminator)
388             bfd.your_discriminator = self.your_discriminator
389         if self.required_min_rx:
390             self.test.logger.debug(
391                 "BFD: setting packet.required_min_rx_interval=%s",
392                 self.required_min_rx)
393             bfd.required_min_rx_interval = self.required_min_rx
394         if self.required_min_echo_rx:
395             self.test.logger.debug(
396                 "BFD: setting packet.required_min_echo_rx=%s",
397                 self.required_min_echo_rx)
398             bfd.required_min_echo_rx_interval = self.required_min_echo_rx
399         if self.desired_min_tx:
400             self.test.logger.debug(
401                 "BFD: setting packet.desired_min_tx_interval=%s",
402                 self.desired_min_tx)
403             bfd.desired_min_tx_interval = self.desired_min_tx
404         if self.detect_mult:
405             self.test.logger.debug(
406                 "BFD: setting packet.detect_mult=%s", self.detect_mult)
407             bfd.detect_mult = self.detect_mult
408         if self.diag:
409             self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
410             bfd.diag = self.diag
411         if self.state:
412             self.test.logger.debug("BFD: setting packet.state=%s", self.state)
413             bfd.state = self.state
414         if self.auth_type:
415             # this is used by a negative test-case
416             self.test.logger.debug("BFD: setting packet.auth_type=%s",
417                                    self.auth_type)
418             bfd.auth_type = self.auth_type
419
420     def create_packet(self):
421         """ create a BFD packet, reflecting the current state of session """
422         if self.sha1_key:
423             bfd = BFD(flags="A")
424             bfd.auth_type = self.sha1_key.auth_type
425             bfd.auth_len = BFD.sha1_auth_len
426             bfd.auth_key_id = self.bfd_key_id
427             bfd.auth_seq_num = self.our_seq_number
428             bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
429         else:
430             bfd = BFD()
431         packet = Ether(src=self.phy_interface.remote_mac,
432                        dst=self.phy_interface.local_mac)
433         if self.tunnel_header:
434             packet = packet / self.tunnel_header
435         if self.af == AF_INET6:
436             packet = (packet /
437                       IPv6(src=self.interface.remote_ip6,
438                            dst=self.interface.local_ip6,
439                            hlim=255) /
440                       UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
441                       bfd)
442         else:
443             packet = (packet /
444                       IP(src=self.interface.remote_ip4,
445                          dst=self.interface.local_ip4,
446                          ttl=255) /
447                       UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
448                       bfd)
449         self.test.logger.debug("BFD: Creating packet")
450         self.fill_packet_fields(packet)
451         if self.sha1_key:
452             hash_material = scapy.compat.raw(
453                 packet[BFD])[:32] + self.sha1_key.key + \
454                 b"\0" * (20 - len(self.sha1_key.key))
455             self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
456                                    hashlib.sha1(hash_material).hexdigest())
457             packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
458         return packet
459
460     def send_packet(self, packet=None, interface=None):
461         """ send packet on interface, creating the packet if needed """
462         if packet is None:
463             packet = self.create_packet()
464         if interface is None:
465             interface = self.phy_interface
466         self.test.logger.debug(ppp("Sending packet:", packet))
467         interface.add_stream(packet)
468         self.test.pg_start()
469
470     def verify_sha1_auth(self, packet):
471         """ Verify correctness of authentication in BFD layer. """
472         bfd = packet[BFD]
473         self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
474         self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
475                                BFDAuthType)
476         self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
477         self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
478         if self.vpp_seq_number is None:
479             self.vpp_seq_number = bfd.auth_seq_num
480             self.test.logger.debug("Received initial sequence number: %s" %
481                                    self.vpp_seq_number)
482         else:
483             recvd_seq_num = bfd.auth_seq_num
484             self.test.logger.debug("Received followup sequence number: %s" %
485                                    recvd_seq_num)
486             if self.vpp_seq_number < 0xffffffff:
487                 if self.sha1_key.auth_type == \
488                         BFDAuthType.meticulous_keyed_sha1:
489                     self.test.assert_equal(recvd_seq_num,
490                                            self.vpp_seq_number + 1,
491                                            "BFD sequence number")
492                 else:
493                     self.test.assert_in_range(recvd_seq_num,
494                                               self.vpp_seq_number,
495                                               self.vpp_seq_number + 1,
496                                               "BFD sequence number")
497             else:
498                 if self.sha1_key.auth_type == \
499                         BFDAuthType.meticulous_keyed_sha1:
500                     self.test.assert_equal(recvd_seq_num, 0,
501                                            "BFD sequence number")
502                 else:
503                     self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
504                                        "BFD sequence number not one of "
505                                        "(%s, 0)" % self.vpp_seq_number)
506             self.vpp_seq_number = recvd_seq_num
507         # last 20 bytes represent the hash - so replace them with the key,
508         # pad the result with zeros and hash the result
509         hash_material = bfd.original[:-20] + self.sha1_key.key + \
510             b"\0" * (20 - len(self.sha1_key.key))
511         expected_hash = hashlib.sha1(hash_material).hexdigest()
512         self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
513                                expected_hash.encode(), "Auth key hash")
514
515     def verify_bfd(self, packet):
516         """ Verify correctness of BFD layer. """
517         bfd = packet[BFD]
518         self.test.assert_equal(bfd.version, 1, "BFD version")
519         self.test.assert_equal(bfd.your_discriminator,
520                                self.my_discriminator,
521                                "BFD - your discriminator")
522         if self.sha1_key:
523             self.verify_sha1_auth(packet)
524
525
526 def bfd_session_up(test):
527     """ Bring BFD session up """
528     test.logger.info("BFD: Waiting for slow hello")
529     p = wait_for_bfd_packet(test, 2, is_tunnel=test.vpp_session.is_tunnel)
530     old_offset = None
531     if hasattr(test, 'vpp_clock_offset'):
532         old_offset = test.vpp_clock_offset
533     test.vpp_clock_offset = time.time() - float(p.time)
534     test.logger.debug("BFD: Calculated vpp clock offset: %s",
535                       test.vpp_clock_offset)
536     if old_offset:
537         test.assertAlmostEqual(
538             old_offset, test.vpp_clock_offset, delta=0.5,
539             msg="vpp clock offset not stable (new: %s, old: %s)" %
540             (test.vpp_clock_offset, old_offset))
541     test.logger.info("BFD: Sending Init")
542     test.test_session.update(my_discriminator=randint(0, 40000000),
543                              your_discriminator=p[BFD].my_discriminator,
544                              state=BFDState.init)
545     if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
546             BFDAuthType.meticulous_keyed_sha1:
547         test.test_session.inc_seq_num()
548     test.test_session.send_packet()
549     test.logger.info("BFD: Waiting for event")
550     e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
551     verify_event(test, e, expected_state=BFDState.up)
552     test.logger.info("BFD: Session is Up")
553     test.test_session.update(state=BFDState.up)
554     if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
555             BFDAuthType.meticulous_keyed_sha1:
556         test.test_session.inc_seq_num()
557     test.test_session.send_packet()
558     test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
559
560
561 def bfd_session_down(test):
562     """ Bring BFD session down """
563     test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
564     test.test_session.update(state=BFDState.down)
565     if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
566             BFDAuthType.meticulous_keyed_sha1:
567         test.test_session.inc_seq_num()
568     test.test_session.send_packet()
569     test.logger.info("BFD: Waiting for event")
570     e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
571     verify_event(test, e, expected_state=BFDState.down)
572     test.logger.info("BFD: Session is Down")
573     test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
574
575
576 def verify_bfd_session_config(test, session, state=None):
577     dump = session.get_bfd_udp_session_dump_entry()
578     test.assertIsNotNone(dump)
579     # since dump is not none, we have verified that sw_if_index and addresses
580     # are valid (in get_bfd_udp_session_dump_entry)
581     if state:
582         test.assert_equal(dump.state, state, "session state")
583     test.assert_equal(dump.required_min_rx, session.required_min_rx,
584                       "required min rx interval")
585     test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
586                       "desired min tx interval")
587     test.assert_equal(dump.detect_mult, session.detect_mult,
588                       "detect multiplier")
589     if session.sha1_key is None:
590         test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
591     else:
592         test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
593         test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
594                           "bfd key id")
595         test.assert_equal(dump.conf_key_id,
596                           session.sha1_key.conf_key_id,
597                           "config key id")
598
599
600 def verify_ip(test, packet):
601     """ Verify correctness of IP layer. """
602     if test.vpp_session.af == AF_INET6:
603         ip = packet[IPv6]
604         local_ip = test.vpp_session.interface.local_ip6
605         remote_ip = test.vpp_session.interface.remote_ip6
606         test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
607     else:
608         ip = packet[IP]
609         local_ip = test.vpp_session.interface.local_ip4
610         remote_ip = test.vpp_session.interface.remote_ip4
611         test.assert_equal(ip.ttl, 255, "IPv4 TTL")
612     test.assert_equal(ip.src, local_ip, "IP source address")
613     test.assert_equal(ip.dst, remote_ip, "IP destination address")
614
615
616 def verify_udp(test, packet):
617     """ Verify correctness of UDP layer. """
618     udp = packet[UDP]
619     test.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
620     test.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
621                          "UDP source port")
622
623
624 def verify_event(test, event, expected_state):
625     """ Verify correctness of event values. """
626     e = event
627     test.logger.debug("BFD: Event: %s" % moves.reprlib.repr(e))
628     test.assert_equal(e.sw_if_index,
629                       test.vpp_session.interface.sw_if_index,
630                       "BFD interface index")
631
632     test.assert_equal(str(e.local_addr), test.vpp_session.local_addr,
633                       "Local IPv6 address")
634     test.assert_equal(str(e.peer_addr), test.vpp_session.peer_addr,
635                       "Peer IPv6 address")
636     test.assert_equal(e.state, expected_state, BFDState)
637
638
639 def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None, is_tunnel=False):
640     """ wait for BFD packet and verify its correctness
641
642     :param timeout: how long to wait
643     :param pcap_time_min: ignore packets with pcap timestamp lower than this
644
645     :returns: tuple (packet, time spent waiting for packet)
646     """
647     test.logger.info("BFD: Waiting for BFD packet")
648     deadline = time.time() + timeout
649     counter = 0
650     while True:
651         counter += 1
652         # sanity check
653         test.assert_in_range(counter, 0, 100, "number of packets ignored")
654         time_left = deadline - time.time()
655         if time_left < 0:
656             raise CaptureTimeoutError("Packet did not arrive within timeout")
657         p = test.pg0.wait_for_packet(timeout=time_left)
658         test.logger.debug(ppp("BFD: Got packet:", p))
659         if pcap_time_min is not None and p.time < pcap_time_min:
660             test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
661                                   "pcap time min %s):" %
662                                   (p.time, pcap_time_min), p))
663         else:
664             break
665     if is_tunnel:
666         # strip an IP layer and move to the next
667         p = p[IP].payload
668
669     bfd = p[BFD]
670     if bfd is None:
671         raise Exception(ppp("Unexpected or invalid BFD packet:", p))
672     if bfd.payload:
673         raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
674     verify_ip(test, p)
675     verify_udp(test, p)
676     test.test_session.verify_bfd(p)
677     return p
678
679
680 class BFD4TestCase(VppTestCase):
681     """Bidirectional Forwarding Detection (BFD)"""
682
683     pg0 = None
684     vpp_clock_offset = None
685     vpp_session = None
686     test_session = None
687
688     @classmethod
689     def setUpClass(cls):
690         super(BFD4TestCase, cls).setUpClass()
691         cls.vapi.cli("set log class bfd level debug")
692         try:
693             cls.create_pg_interfaces([0])
694             cls.create_loopback_interfaces(1)
695             cls.loopback0 = cls.lo_interfaces[0]
696             cls.loopback0.config_ip4()
697             cls.loopback0.admin_up()
698             cls.pg0.config_ip4()
699             cls.pg0.configure_ipv4_neighbors()
700             cls.pg0.admin_up()
701             cls.pg0.resolve_arp()
702
703         except Exception:
704             super(BFD4TestCase, cls).tearDownClass()
705             raise
706
707     @classmethod
708     def tearDownClass(cls):
709         super(BFD4TestCase, cls).tearDownClass()
710
711     def setUp(self):
712         super(BFD4TestCase, self).setUp()
713         self.factory = AuthKeyFactory()
714         self.vapi.want_bfd_events()
715         self.pg0.enable_capture()
716         try:
717             self.vpp_session = VppBFDUDPSession(self, self.pg0,
718                                                 self.pg0.remote_ip4)
719             self.vpp_session.add_vpp_config()
720             self.vpp_session.admin_up()
721             self.test_session = BFDTestSession(self, self.pg0, AF_INET)
722         except BaseException:
723             self.vapi.want_bfd_events(enable_disable=0)
724             raise
725
726     def tearDown(self):
727         if not self.vpp_dead:
728             self.vapi.want_bfd_events(enable_disable=0)
729         self.vapi.collect_events()  # clear the event queue
730         super(BFD4TestCase, self).tearDown()
731
732     def test_session_up(self):
733         """ bring BFD session up """
734         bfd_session_up(self)
735
736     def test_session_up_by_ip(self):
737         """ bring BFD session up - first frame looked up by address pair """
738         self.logger.info("BFD: Sending Slow control frame")
739         self.test_session.update(my_discriminator=randint(0, 40000000))
740         self.test_session.send_packet()
741         self.pg0.enable_capture()
742         p = self.pg0.wait_for_packet(1)
743         self.assert_equal(p[BFD].your_discriminator,
744                           self.test_session.my_discriminator,
745                           "BFD - your discriminator")
746         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
747         self.test_session.update(your_discriminator=p[BFD].my_discriminator,
748                                  state=BFDState.up)
749         self.logger.info("BFD: Waiting for event")
750         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
751         verify_event(self, e, expected_state=BFDState.init)
752         self.logger.info("BFD: Sending Up")
753         self.test_session.send_packet()
754         self.logger.info("BFD: Waiting for event")
755         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
756         verify_event(self, e, expected_state=BFDState.up)
757         self.logger.info("BFD: Session is Up")
758         self.test_session.update(state=BFDState.up)
759         self.test_session.send_packet()
760         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
761
762     def test_session_down(self):
763         """ bring BFD session down """
764         bfd_session_up(self)
765         bfd_session_down(self)
766
767     def test_hold_up(self):
768         """ hold BFD session up """
769         bfd_session_up(self)
770         for dummy in range(self.test_session.detect_mult * 2):
771             wait_for_bfd_packet(self)
772             self.test_session.send_packet()
773         self.assert_equal(len(self.vapi.collect_events()), 0,
774                           "number of bfd events")
775
776     def test_slow_timer(self):
777         """ verify slow periodic control frames while session down """
778         packet_count = 3
779         self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
780         prev_packet = wait_for_bfd_packet(self, 2)
781         for dummy in range(packet_count):
782             next_packet = wait_for_bfd_packet(self, 2)
783             time_diff = next_packet.time - prev_packet.time
784             # spec says the range should be <0.75, 1>, allow extra 0.05 margin
785             # to work around timing issues
786             self.assert_in_range(
787                 time_diff, 0.70, 1.05, "time between slow packets")
788             prev_packet = next_packet
789
790     def test_zero_remote_min_rx(self):
791         """ no packets when zero remote required min rx interval """
792         bfd_session_up(self)
793         self.test_session.update(required_min_rx=0)
794         self.test_session.send_packet()
795         for dummy in range(self.test_session.detect_mult):
796             self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
797                        "sleep before transmitting bfd packet")
798             self.test_session.send_packet()
799             try:
800                 p = wait_for_bfd_packet(self, timeout=0)
801                 self.logger.error(ppp("Received unexpected packet:", p))
802             except CaptureTimeoutError:
803                 pass
804         self.assert_equal(
805             len(self.vapi.collect_events()), 0, "number of bfd events")
806         self.test_session.update(required_min_rx=300000)
807         for dummy in range(3):
808             self.test_session.send_packet()
809             wait_for_bfd_packet(
810                 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
811         self.assert_equal(
812             len(self.vapi.collect_events()), 0, "number of bfd events")
813
814     def test_conn_down(self):
815         """ verify session goes down after inactivity """
816         bfd_session_up(self)
817         detection_time = self.test_session.detect_mult *\
818             self.vpp_session.required_min_rx / USEC_IN_SEC
819         self.sleep(detection_time, "waiting for BFD session time-out")
820         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
821         verify_event(self, e, expected_state=BFDState.down)
822
823     def test_peer_discr_reset_sess_down(self):
824         """ peer discriminator reset after session goes down """
825         bfd_session_up(self)
826         detection_time = self.test_session.detect_mult *\
827             self.vpp_session.required_min_rx / USEC_IN_SEC
828         self.sleep(detection_time, "waiting for BFD session time-out")
829         self.test_session.my_discriminator = 0
830         wait_for_bfd_packet(self,
831                             pcap_time_min=time.time() - self.vpp_clock_offset)
832
833     def test_large_required_min_rx(self):
834         """ large remote required min rx interval """
835         bfd_session_up(self)
836         p = wait_for_bfd_packet(self)
837         interval = 3000000
838         self.test_session.update(required_min_rx=interval)
839         self.test_session.send_packet()
840         time_mark = time.time()
841         count = 0
842         # busy wait here, trying to collect a packet or event, vpp is not
843         # allowed to send packets and the session will timeout first - so the
844         # Up->Down event must arrive before any packets do
845         while time.time() < time_mark + interval / USEC_IN_SEC:
846             try:
847                 p = wait_for_bfd_packet(self, timeout=0)
848                 # if vpp managed to send a packet before we did the session
849                 # session update, then that's fine, ignore it
850                 if p.time < time_mark - self.vpp_clock_offset:
851                     continue
852                 self.logger.error(ppp("Received unexpected packet:", p))
853                 count += 1
854             except CaptureTimeoutError:
855                 pass
856             events = self.vapi.collect_events()
857             if len(events) > 0:
858                 verify_event(self, events[0], BFDState.down)
859                 break
860         self.assert_equal(count, 0, "number of packets received")
861
862     def test_immediate_remote_min_rx_reduction(self):
863         """ immediately honor remote required min rx reduction """
864         self.vpp_session.remove_vpp_config()
865         self.vpp_session = VppBFDUDPSession(
866             self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
867         self.pg0.enable_capture()
868         self.vpp_session.add_vpp_config()
869         self.test_session.update(desired_min_tx=1000000,
870                                  required_min_rx=1000000)
871         bfd_session_up(self)
872         reference_packet = wait_for_bfd_packet(self)
873         time_mark = time.time()
874         interval = 300000
875         self.test_session.update(required_min_rx=interval)
876         self.test_session.send_packet()
877         extra_time = time.time() - time_mark
878         p = wait_for_bfd_packet(self)
879         # first packet is allowed to be late by time we spent doing the update
880         # calculated in extra_time
881         self.assert_in_range(p.time - reference_packet.time,
882                              .95 * 0.75 * interval / USEC_IN_SEC,
883                              1.05 * interval / USEC_IN_SEC + extra_time,
884                              "time between BFD packets")
885         reference_packet = p
886         for dummy in range(3):
887             p = wait_for_bfd_packet(self)
888             diff = p.time - reference_packet.time
889             self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
890                                  1.05 * interval / USEC_IN_SEC,
891                                  "time between BFD packets")
892             reference_packet = p
893
894     def test_modify_req_min_rx_double(self):
895         """ modify session - double required min rx """
896         bfd_session_up(self)
897         p = wait_for_bfd_packet(self)
898         self.test_session.update(desired_min_tx=10000,
899                                  required_min_rx=10000)
900         self.test_session.send_packet()
901         # double required min rx
902         self.vpp_session.modify_parameters(
903             required_min_rx=2 * self.vpp_session.required_min_rx)
904         p = wait_for_bfd_packet(
905             self, pcap_time_min=time.time() - self.vpp_clock_offset)
906         # poll bit needs to be set
907         self.assertIn("P", p.sprintf("%BFD.flags%"),
908                       "Poll bit not set in BFD packet")
909         # finish poll sequence with final packet
910         final = self.test_session.create_packet()
911         final[BFD].flags = "F"
912         timeout = self.test_session.detect_mult * \
913             max(self.test_session.desired_min_tx,
914                 self.vpp_session.required_min_rx) / USEC_IN_SEC
915         self.test_session.send_packet(final)
916         time_mark = time.time()
917         e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
918         verify_event(self, e, expected_state=BFDState.down)
919         time_to_event = time.time() - time_mark
920         self.assert_in_range(time_to_event, .9 * timeout,
921                              1.1 * timeout, "session timeout")
922
923     def test_modify_req_min_rx_halve(self):
924         """ modify session - halve required min rx """
925         self.vpp_session.modify_parameters(
926             required_min_rx=2 * self.vpp_session.required_min_rx)
927         bfd_session_up(self)
928         p = wait_for_bfd_packet(self)
929         self.test_session.update(desired_min_tx=10000,
930                                  required_min_rx=10000)
931         self.test_session.send_packet()
932         p = wait_for_bfd_packet(
933             self, pcap_time_min=time.time() - self.vpp_clock_offset)
934         # halve required min rx
935         old_required_min_rx = self.vpp_session.required_min_rx
936         self.vpp_session.modify_parameters(
937             required_min_rx=self.vpp_session.required_min_rx // 2)
938         # now we wait 0.8*3*old-req-min-rx and the session should still be up
939         self.sleep(0.8 * self.vpp_session.detect_mult *
940                    old_required_min_rx / USEC_IN_SEC,
941                    "wait before finishing poll sequence")
942         self.assert_equal(len(self.vapi.collect_events()), 0,
943                           "number of bfd events")
944         p = wait_for_bfd_packet(self)
945         # poll bit needs to be set
946         self.assertIn("P", p.sprintf("%BFD.flags%"),
947                       "Poll bit not set in BFD packet")
948         # finish poll sequence with final packet
949         final = self.test_session.create_packet()
950         final[BFD].flags = "F"
951         self.test_session.send_packet(final)
952         # now the session should time out under new conditions
953         detection_time = self.test_session.detect_mult *\
954             self.vpp_session.required_min_rx / USEC_IN_SEC
955         before = time.time()
956         e = self.vapi.wait_for_event(
957             2 * detection_time, "bfd_udp_session_details")
958         after = time.time()
959         self.assert_in_range(after - before,
960                              0.9 * detection_time,
961                              1.1 * detection_time,
962                              "time before bfd session goes down")
963         verify_event(self, e, expected_state=BFDState.down)
964
965     def test_modify_detect_mult(self):
966         """ modify detect multiplier """
967         bfd_session_up(self)
968         p = wait_for_bfd_packet(self)
969         self.vpp_session.modify_parameters(detect_mult=1)
970         p = wait_for_bfd_packet(
971             self, pcap_time_min=time.time() - self.vpp_clock_offset)
972         self.assert_equal(self.vpp_session.detect_mult,
973                           p[BFD].detect_mult,
974                           "detect mult")
975         # poll bit must not be set
976         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
977                          "Poll bit not set in BFD packet")
978         self.vpp_session.modify_parameters(detect_mult=10)
979         p = wait_for_bfd_packet(
980             self, pcap_time_min=time.time() - self.vpp_clock_offset)
981         self.assert_equal(self.vpp_session.detect_mult,
982                           p[BFD].detect_mult,
983                           "detect mult")
984         # poll bit must not be set
985         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
986                          "Poll bit not set in BFD packet")
987
988     def test_queued_poll(self):
989         """ test poll sequence queueing """
990         bfd_session_up(self)
991         p = wait_for_bfd_packet(self)
992         self.vpp_session.modify_parameters(
993             required_min_rx=2 * self.vpp_session.required_min_rx)
994         p = wait_for_bfd_packet(self)
995         poll_sequence_start = time.time()
996         poll_sequence_length_min = 0.5
997         send_final_after = time.time() + poll_sequence_length_min
998         # poll bit needs to be set
999         self.assertIn("P", p.sprintf("%BFD.flags%"),
1000                       "Poll bit not set in BFD packet")
1001         self.assert_equal(p[BFD].required_min_rx_interval,
1002                           self.vpp_session.required_min_rx,
1003                           "BFD required min rx interval")
1004         self.vpp_session.modify_parameters(
1005             required_min_rx=2 * self.vpp_session.required_min_rx)
1006         # 2nd poll sequence should be queued now
1007         # don't send the reply back yet, wait for some time to emulate
1008         # longer round-trip time
1009         packet_count = 0
1010         while time.time() < send_final_after:
1011             self.test_session.send_packet()
1012             p = wait_for_bfd_packet(self)
1013             self.assert_equal(len(self.vapi.collect_events()), 0,
1014                               "number of bfd events")
1015             self.assert_equal(p[BFD].required_min_rx_interval,
1016                               self.vpp_session.required_min_rx,
1017                               "BFD required min rx interval")
1018             packet_count += 1
1019             # poll bit must be set
1020             self.assertIn("P", p.sprintf("%BFD.flags%"),
1021                           "Poll bit not set in BFD packet")
1022         final = self.test_session.create_packet()
1023         final[BFD].flags = "F"
1024         self.test_session.send_packet(final)
1025         # finish 1st with final
1026         poll_sequence_length = time.time() - poll_sequence_start
1027         # vpp must wait for some time before starting new poll sequence
1028         poll_no_2_started = False
1029         for dummy in range(2 * packet_count):
1030             p = wait_for_bfd_packet(self)
1031             self.assert_equal(len(self.vapi.collect_events()), 0,
1032                               "number of bfd events")
1033             if "P" in p.sprintf("%BFD.flags%"):
1034                 poll_no_2_started = True
1035                 if time.time() < poll_sequence_start + poll_sequence_length:
1036                     raise Exception("VPP started 2nd poll sequence too soon")
1037                 final = self.test_session.create_packet()
1038                 final[BFD].flags = "F"
1039                 self.test_session.send_packet(final)
1040                 break
1041             else:
1042                 self.test_session.send_packet()
1043         self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
1044         # finish 2nd with final
1045         final = self.test_session.create_packet()
1046         final[BFD].flags = "F"
1047         self.test_session.send_packet(final)
1048         p = wait_for_bfd_packet(self)
1049         # poll bit must not be set
1050         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
1051                          "Poll bit set in BFD packet")
1052
1053     # returning inconsistent results requiring retries in per-patch tests
1054     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1055     def test_poll_response(self):
1056         """ test correct response to control frame with poll bit set """
1057         bfd_session_up(self)
1058         poll = self.test_session.create_packet()
1059         poll[BFD].flags = "P"
1060         self.test_session.send_packet(poll)
1061         final = wait_for_bfd_packet(
1062             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1063         self.assertIn("F", final.sprintf("%BFD.flags%"))
1064
1065     def test_no_periodic_if_remote_demand(self):
1066         """ no periodic frames outside poll sequence if remote demand set """
1067         bfd_session_up(self)
1068         demand = self.test_session.create_packet()
1069         demand[BFD].flags = "D"
1070         self.test_session.send_packet(demand)
1071         transmit_time = 0.9 \
1072             * max(self.vpp_session.required_min_rx,
1073                   self.test_session.desired_min_tx) \
1074             / USEC_IN_SEC
1075         count = 0
1076         for dummy in range(self.test_session.detect_mult * 2):
1077             self.sleep(transmit_time)
1078             self.test_session.send_packet(demand)
1079             try:
1080                 p = wait_for_bfd_packet(self, timeout=0)
1081                 self.logger.error(ppp("Received unexpected packet:", p))
1082                 count += 1
1083             except CaptureTimeoutError:
1084                 pass
1085         events = self.vapi.collect_events()
1086         for e in events:
1087             self.logger.error("Received unexpected event: %s", e)
1088         self.assert_equal(count, 0, "number of packets received")
1089         self.assert_equal(len(events), 0, "number of events received")
1090
1091     def test_echo_looped_back(self):
1092         """ echo packets looped back """
1093         # don't need a session in this case..
1094         self.vpp_session.remove_vpp_config()
1095         self.pg0.enable_capture()
1096         echo_packet_count = 10
1097         # random source port low enough to increment a few times..
1098         udp_sport_tx = randint(1, 50000)
1099         udp_sport_rx = udp_sport_tx
1100         echo_packet = (Ether(src=self.pg0.remote_mac,
1101                              dst=self.pg0.local_mac) /
1102                        IP(src=self.pg0.remote_ip4,
1103                           dst=self.pg0.remote_ip4) /
1104                        UDP(dport=BFD.udp_dport_echo) /
1105                        Raw("this should be looped back"))
1106         for dummy in range(echo_packet_count):
1107             self.sleep(.01, "delay between echo packets")
1108             echo_packet[UDP].sport = udp_sport_tx
1109             udp_sport_tx += 1
1110             self.logger.debug(ppp("Sending packet:", echo_packet))
1111             self.pg0.add_stream(echo_packet)
1112             self.pg_start()
1113         for dummy in range(echo_packet_count):
1114             p = self.pg0.wait_for_packet(1)
1115             self.logger.debug(ppp("Got packet:", p))
1116             ether = p[Ether]
1117             self.assert_equal(self.pg0.remote_mac,
1118                               ether.dst, "Destination MAC")
1119             self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1120             ip = p[IP]
1121             self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1122             self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1123             udp = p[UDP]
1124             self.assert_equal(udp.dport, BFD.udp_dport_echo,
1125                               "UDP destination port")
1126             self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1127             udp_sport_rx += 1
1128             # need to compare the hex payload here, otherwise BFD_vpp_echo
1129             # gets in way
1130             self.assertEqual(scapy.compat.raw(p[UDP].payload),
1131                              scapy.compat.raw(echo_packet[UDP].payload),
1132                              "Received packet is not the echo packet sent")
1133         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1134                           "ECHO packet identifier for test purposes)")
1135
1136     def test_echo(self):
1137         """ echo function """
1138         bfd_session_up(self)
1139         self.test_session.update(required_min_echo_rx=150000)
1140         self.test_session.send_packet()
1141         detection_time = self.test_session.detect_mult *\
1142             self.vpp_session.required_min_rx / USEC_IN_SEC
1143         # echo shouldn't work without echo source set
1144         for dummy in range(10):
1145             sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1146             self.sleep(sleep, "delay before sending bfd packet")
1147             self.test_session.send_packet()
1148         p = wait_for_bfd_packet(
1149             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1150         self.assert_equal(p[BFD].required_min_rx_interval,
1151                           self.vpp_session.required_min_rx,
1152                           "BFD required min rx interval")
1153         self.test_session.send_packet()
1154         self.vapi.bfd_udp_set_echo_source(
1155             sw_if_index=self.loopback0.sw_if_index)
1156         echo_seen = False
1157         # should be turned on - loopback echo packets
1158         for dummy in range(3):
1159             loop_until = time.time() + 0.75 * detection_time
1160             while time.time() < loop_until:
1161                 p = self.pg0.wait_for_packet(1)
1162                 self.logger.debug(ppp("Got packet:", p))
1163                 if p[UDP].dport == BFD.udp_dport_echo:
1164                     self.assert_equal(
1165                         p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
1166                     self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
1167                                         "BFD ECHO src IP equal to loopback IP")
1168                     self.logger.debug(ppp("Looping back packet:", p))
1169                     self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1170                                       "ECHO packet destination MAC address")
1171                     p[Ether].dst = self.pg0.local_mac
1172                     self.pg0.add_stream(p)
1173                     self.pg_start()
1174                     echo_seen = True
1175                 elif p.haslayer(BFD):
1176                     if echo_seen:
1177                         self.assertGreaterEqual(
1178                             p[BFD].required_min_rx_interval,
1179                             1000000)
1180                     if "P" in p.sprintf("%BFD.flags%"):
1181                         final = self.test_session.create_packet()
1182                         final[BFD].flags = "F"
1183                         self.test_session.send_packet(final)
1184                 else:
1185                     raise Exception(ppp("Received unknown packet:", p))
1186
1187                 self.assert_equal(len(self.vapi.collect_events()), 0,
1188                                   "number of bfd events")
1189             self.test_session.send_packet()
1190         self.assertTrue(echo_seen, "No echo packets received")
1191
1192     def test_echo_fail(self):
1193         """ session goes down if echo function fails """
1194         bfd_session_up(self)
1195         self.test_session.update(required_min_echo_rx=150000)
1196         self.test_session.send_packet()
1197         detection_time = self.test_session.detect_mult *\
1198             self.vpp_session.required_min_rx / USEC_IN_SEC
1199         self.vapi.bfd_udp_set_echo_source(
1200             sw_if_index=self.loopback0.sw_if_index)
1201         # echo function should be used now, but we will drop the echo packets
1202         verified_diag = False
1203         for dummy in range(3):
1204             loop_until = time.time() + 0.75 * detection_time
1205             while time.time() < loop_until:
1206                 p = self.pg0.wait_for_packet(1)
1207                 self.logger.debug(ppp("Got packet:", p))
1208                 if p[UDP].dport == BFD.udp_dport_echo:
1209                     # dropped
1210                     pass
1211                 elif p.haslayer(BFD):
1212                     if "P" in p.sprintf("%BFD.flags%"):
1213                         self.assertGreaterEqual(
1214                             p[BFD].required_min_rx_interval,
1215                             1000000)
1216                         final = self.test_session.create_packet()
1217                         final[BFD].flags = "F"
1218                         self.test_session.send_packet(final)
1219                     if p[BFD].state == BFDState.down:
1220                         self.assert_equal(p[BFD].diag,
1221                                           BFDDiagCode.echo_function_failed,
1222                                           BFDDiagCode)
1223                         verified_diag = True
1224                 else:
1225                     raise Exception(ppp("Received unknown packet:", p))
1226             self.test_session.send_packet()
1227         events = self.vapi.collect_events()
1228         self.assert_equal(len(events), 1, "number of bfd events")
1229         self.assert_equal(events[0].state, BFDState.down, BFDState)
1230         self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1231
1232     def test_echo_stop(self):
1233         """ echo function stops if peer sets required min echo rx zero """
1234         bfd_session_up(self)
1235         self.test_session.update(required_min_echo_rx=150000)
1236         self.test_session.send_packet()
1237         self.vapi.bfd_udp_set_echo_source(
1238             sw_if_index=self.loopback0.sw_if_index)
1239         # wait for first echo packet
1240         while True:
1241             p = self.pg0.wait_for_packet(1)
1242             self.logger.debug(ppp("Got packet:", p))
1243             if p[UDP].dport == BFD.udp_dport_echo:
1244                 self.logger.debug(ppp("Looping back packet:", p))
1245                 p[Ether].dst = self.pg0.local_mac
1246                 self.pg0.add_stream(p)
1247                 self.pg_start()
1248                 break
1249             elif p.haslayer(BFD):
1250                 # ignore BFD
1251                 pass
1252             else:
1253                 raise Exception(ppp("Received unknown packet:", p))
1254         self.test_session.update(required_min_echo_rx=0)
1255         self.test_session.send_packet()
1256         # echo packets shouldn't arrive anymore
1257         for dummy in range(5):
1258             wait_for_bfd_packet(
1259                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1260             self.test_session.send_packet()
1261             events = self.vapi.collect_events()
1262             self.assert_equal(len(events), 0, "number of bfd events")
1263
1264     def test_echo_source_removed(self):
1265         """ echo function stops if echo source is removed """
1266         bfd_session_up(self)
1267         self.test_session.update(required_min_echo_rx=150000)
1268         self.test_session.send_packet()
1269         self.vapi.bfd_udp_set_echo_source(
1270             sw_if_index=self.loopback0.sw_if_index)
1271         # wait for first echo packet
1272         while True:
1273             p = self.pg0.wait_for_packet(1)
1274             self.logger.debug(ppp("Got packet:", p))
1275             if p[UDP].dport == BFD.udp_dport_echo:
1276                 self.logger.debug(ppp("Looping back packet:", p))
1277                 p[Ether].dst = self.pg0.local_mac
1278                 self.pg0.add_stream(p)
1279                 self.pg_start()
1280                 break
1281             elif p.haslayer(BFD):
1282                 # ignore BFD
1283                 pass
1284             else:
1285                 raise Exception(ppp("Received unknown packet:", p))
1286         self.vapi.bfd_udp_del_echo_source()
1287         self.test_session.send_packet()
1288         # echo packets shouldn't arrive anymore
1289         for dummy in range(5):
1290             wait_for_bfd_packet(
1291                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1292             self.test_session.send_packet()
1293             events = self.vapi.collect_events()
1294             self.assert_equal(len(events), 0, "number of bfd events")
1295
1296     def test_stale_echo(self):
1297         """ stale echo packets don't keep a session up """
1298         bfd_session_up(self)
1299         self.test_session.update(required_min_echo_rx=150000)
1300         self.vapi.bfd_udp_set_echo_source(
1301             sw_if_index=self.loopback0.sw_if_index)
1302         self.test_session.send_packet()
1303         # should be turned on - loopback echo packets
1304         echo_packet = None
1305         timeout_at = None
1306         timeout_ok = False
1307         for dummy in range(10 * self.vpp_session.detect_mult):
1308             p = self.pg0.wait_for_packet(1)
1309             if p[UDP].dport == BFD.udp_dport_echo:
1310                 if echo_packet is None:
1311                     self.logger.debug(ppp("Got first echo packet:", p))
1312                     echo_packet = p
1313                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1314                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1315                 else:
1316                     self.logger.debug(ppp("Got followup echo packet:", p))
1317                 self.logger.debug(ppp("Looping back first echo packet:", p))
1318                 echo_packet[Ether].dst = self.pg0.local_mac
1319                 self.pg0.add_stream(echo_packet)
1320                 self.pg_start()
1321             elif p.haslayer(BFD):
1322                 self.logger.debug(ppp("Got packet:", p))
1323                 if "P" in p.sprintf("%BFD.flags%"):
1324                     final = self.test_session.create_packet()
1325                     final[BFD].flags = "F"
1326                     self.test_session.send_packet(final)
1327                 if p[BFD].state == BFDState.down:
1328                     self.assertIsNotNone(
1329                         timeout_at,
1330                         "Session went down before first echo packet received")
1331                     now = time.time()
1332                     self.assertGreaterEqual(
1333                         now, timeout_at,
1334                         "Session timeout at %s, but is expected at %s" %
1335                         (now, timeout_at))
1336                     self.assert_equal(p[BFD].diag,
1337                                       BFDDiagCode.echo_function_failed,
1338                                       BFDDiagCode)
1339                     events = self.vapi.collect_events()
1340                     self.assert_equal(len(events), 1, "number of bfd events")
1341                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1342                     timeout_ok = True
1343                     break
1344             else:
1345                 raise Exception(ppp("Received unknown packet:", p))
1346             self.test_session.send_packet()
1347         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1348
1349     def test_invalid_echo_checksum(self):
1350         """ echo packets with invalid checksum don't keep a session up """
1351         bfd_session_up(self)
1352         self.test_session.update(required_min_echo_rx=150000)
1353         self.vapi.bfd_udp_set_echo_source(
1354             sw_if_index=self.loopback0.sw_if_index)
1355         self.test_session.send_packet()
1356         # should be turned on - loopback echo packets
1357         timeout_at = None
1358         timeout_ok = False
1359         for dummy in range(10 * self.vpp_session.detect_mult):
1360             p = self.pg0.wait_for_packet(1)
1361             if p[UDP].dport == BFD.udp_dport_echo:
1362                 self.logger.debug(ppp("Got echo packet:", p))
1363                 if timeout_at is None:
1364                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1365                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1366                 p[BFD_vpp_echo].checksum = getrandbits(64)
1367                 p[Ether].dst = self.pg0.local_mac
1368                 self.logger.debug(ppp("Looping back modified echo packet:", p))
1369                 self.pg0.add_stream(p)
1370                 self.pg_start()
1371             elif p.haslayer(BFD):
1372                 self.logger.debug(ppp("Got packet:", p))
1373                 if "P" in p.sprintf("%BFD.flags%"):
1374                     final = self.test_session.create_packet()
1375                     final[BFD].flags = "F"
1376                     self.test_session.send_packet(final)
1377                 if p[BFD].state == BFDState.down:
1378                     self.assertIsNotNone(
1379                         timeout_at,
1380                         "Session went down before first echo packet received")
1381                     now = time.time()
1382                     self.assertGreaterEqual(
1383                         now, timeout_at,
1384                         "Session timeout at %s, but is expected at %s" %
1385                         (now, timeout_at))
1386                     self.assert_equal(p[BFD].diag,
1387                                       BFDDiagCode.echo_function_failed,
1388                                       BFDDiagCode)
1389                     events = self.vapi.collect_events()
1390                     self.assert_equal(len(events), 1, "number of bfd events")
1391                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1392                     timeout_ok = True
1393                     break
1394             else:
1395                 raise Exception(ppp("Received unknown packet:", p))
1396             self.test_session.send_packet()
1397         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1398
1399     def test_admin_up_down(self):
1400         """ put session admin-up and admin-down """
1401         bfd_session_up(self)
1402         self.vpp_session.admin_down()
1403         self.pg0.enable_capture()
1404         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1405         verify_event(self, e, expected_state=BFDState.admin_down)
1406         for dummy in range(2):
1407             p = wait_for_bfd_packet(self)
1408             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1409         # try to bring session up - shouldn't be possible
1410         self.test_session.update(state=BFDState.init)
1411         self.test_session.send_packet()
1412         for dummy in range(2):
1413             p = wait_for_bfd_packet(self)
1414             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1415         self.vpp_session.admin_up()
1416         self.test_session.update(state=BFDState.down)
1417         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1418         verify_event(self, e, expected_state=BFDState.down)
1419         p = wait_for_bfd_packet(
1420             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1421         self.assert_equal(p[BFD].state, BFDState.down, BFDState)
1422         self.test_session.send_packet()
1423         p = wait_for_bfd_packet(
1424             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1425         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1426         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1427         verify_event(self, e, expected_state=BFDState.init)
1428         self.test_session.update(state=BFDState.up)
1429         self.test_session.send_packet()
1430         p = wait_for_bfd_packet(
1431             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1432         self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1433         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1434         verify_event(self, e, expected_state=BFDState.up)
1435
1436     def test_config_change_remote_demand(self):
1437         """ configuration change while peer in demand mode """
1438         bfd_session_up(self)
1439         demand = self.test_session.create_packet()
1440         demand[BFD].flags = "D"
1441         self.test_session.send_packet(demand)
1442         self.vpp_session.modify_parameters(
1443             required_min_rx=2 * self.vpp_session.required_min_rx)
1444         p = wait_for_bfd_packet(
1445             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1446         # poll bit must be set
1447         self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1448         # terminate poll sequence
1449         final = self.test_session.create_packet()
1450         final[BFD].flags = "D+F"
1451         self.test_session.send_packet(final)
1452         # vpp should be quiet now again
1453         transmit_time = 0.9 \
1454             * max(self.vpp_session.required_min_rx,
1455                   self.test_session.desired_min_tx) \
1456             / USEC_IN_SEC
1457         count = 0
1458         for dummy in range(self.test_session.detect_mult * 2):
1459             self.sleep(transmit_time)
1460             self.test_session.send_packet(demand)
1461             try:
1462                 p = wait_for_bfd_packet(self, timeout=0)
1463                 self.logger.error(ppp("Received unexpected packet:", p))
1464                 count += 1
1465             except CaptureTimeoutError:
1466                 pass
1467         events = self.vapi.collect_events()
1468         for e in events:
1469             self.logger.error("Received unexpected event: %s", e)
1470         self.assert_equal(count, 0, "number of packets received")
1471         self.assert_equal(len(events), 0, "number of events received")
1472
1473     def test_intf_deleted(self):
1474         """ interface with bfd session deleted """
1475         intf = VppLoInterface(self)
1476         intf.config_ip4()
1477         intf.admin_up()
1478         sw_if_index = intf.sw_if_index
1479         vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip4)
1480         vpp_session.add_vpp_config()
1481         vpp_session.admin_up()
1482         intf.remove_vpp_config()
1483         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1484         self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1485         self.assertFalse(vpp_session.query_vpp_config())
1486
1487
1488 class BFD6TestCase(VppTestCase):
1489     """Bidirectional Forwarding Detection (BFD) (IPv6) """
1490
1491     pg0 = None
1492     vpp_clock_offset = None
1493     vpp_session = None
1494     test_session = None
1495
1496     @classmethod
1497     def setUpClass(cls):
1498         super(BFD6TestCase, cls).setUpClass()
1499         cls.vapi.cli("set log class bfd level debug")
1500         try:
1501             cls.create_pg_interfaces([0])
1502             cls.pg0.config_ip6()
1503             cls.pg0.configure_ipv6_neighbors()
1504             cls.pg0.admin_up()
1505             cls.pg0.resolve_ndp()
1506             cls.create_loopback_interfaces(1)
1507             cls.loopback0 = cls.lo_interfaces[0]
1508             cls.loopback0.config_ip6()
1509             cls.loopback0.admin_up()
1510
1511         except Exception:
1512             super(BFD6TestCase, cls).tearDownClass()
1513             raise
1514
1515     @classmethod
1516     def tearDownClass(cls):
1517         super(BFD6TestCase, cls).tearDownClass()
1518
1519     def setUp(self):
1520         super(BFD6TestCase, self).setUp()
1521         self.factory = AuthKeyFactory()
1522         self.vapi.want_bfd_events()
1523         self.pg0.enable_capture()
1524         try:
1525             self.vpp_session = VppBFDUDPSession(self, self.pg0,
1526                                                 self.pg0.remote_ip6,
1527                                                 af=AF_INET6)
1528             self.vpp_session.add_vpp_config()
1529             self.vpp_session.admin_up()
1530             self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1531             self.logger.debug(self.vapi.cli("show adj nbr"))
1532         except BaseException:
1533             self.vapi.want_bfd_events(enable_disable=0)
1534             raise
1535
1536     def tearDown(self):
1537         if not self.vpp_dead:
1538             self.vapi.want_bfd_events(enable_disable=0)
1539         self.vapi.collect_events()  # clear the event queue
1540         super(BFD6TestCase, self).tearDown()
1541
1542     def test_session_up(self):
1543         """ bring BFD session up """
1544         bfd_session_up(self)
1545
1546     def test_session_up_by_ip(self):
1547         """ bring BFD session up - first frame looked up by address pair """
1548         self.logger.info("BFD: Sending Slow control frame")
1549         self.test_session.update(my_discriminator=randint(0, 40000000))
1550         self.test_session.send_packet()
1551         self.pg0.enable_capture()
1552         p = self.pg0.wait_for_packet(1)
1553         self.assert_equal(p[BFD].your_discriminator,
1554                           self.test_session.my_discriminator,
1555                           "BFD - your discriminator")
1556         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1557         self.test_session.update(your_discriminator=p[BFD].my_discriminator,
1558                                  state=BFDState.up)
1559         self.logger.info("BFD: Waiting for event")
1560         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1561         verify_event(self, e, expected_state=BFDState.init)
1562         self.logger.info("BFD: Sending Up")
1563         self.test_session.send_packet()
1564         self.logger.info("BFD: Waiting for event")
1565         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1566         verify_event(self, e, expected_state=BFDState.up)
1567         self.logger.info("BFD: Session is Up")
1568         self.test_session.update(state=BFDState.up)
1569         self.test_session.send_packet()
1570         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1571
1572     def test_hold_up(self):
1573         """ hold BFD session up """
1574         bfd_session_up(self)
1575         for dummy in range(self.test_session.detect_mult * 2):
1576             wait_for_bfd_packet(self)
1577             self.test_session.send_packet()
1578         self.assert_equal(len(self.vapi.collect_events()), 0,
1579                           "number of bfd events")
1580         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1581
1582     def test_echo_looped_back(self):
1583         """ echo packets looped back """
1584         # don't need a session in this case..
1585         self.vpp_session.remove_vpp_config()
1586         self.pg0.enable_capture()
1587         echo_packet_count = 10
1588         # random source port low enough to increment a few times..
1589         udp_sport_tx = randint(1, 50000)
1590         udp_sport_rx = udp_sport_tx
1591         echo_packet = (Ether(src=self.pg0.remote_mac,
1592                              dst=self.pg0.local_mac) /
1593                        IPv6(src=self.pg0.remote_ip6,
1594                             dst=self.pg0.remote_ip6) /
1595                        UDP(dport=BFD.udp_dport_echo) /
1596                        Raw("this should be looped back"))
1597         for dummy in range(echo_packet_count):
1598             self.sleep(.01, "delay between echo packets")
1599             echo_packet[UDP].sport = udp_sport_tx
1600             udp_sport_tx += 1
1601             self.logger.debug(ppp("Sending packet:", echo_packet))
1602             self.pg0.add_stream(echo_packet)
1603             self.pg_start()
1604         for dummy in range(echo_packet_count):
1605             p = self.pg0.wait_for_packet(1)
1606             self.logger.debug(ppp("Got packet:", p))
1607             ether = p[Ether]
1608             self.assert_equal(self.pg0.remote_mac,
1609                               ether.dst, "Destination MAC")
1610             self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1611             ip = p[IPv6]
1612             self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
1613             self.assert_equal(self.pg0.remote_ip6, ip.src, "Destination IP")
1614             udp = p[UDP]
1615             self.assert_equal(udp.dport, BFD.udp_dport_echo,
1616                               "UDP destination port")
1617             self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1618             udp_sport_rx += 1
1619             # need to compare the hex payload here, otherwise BFD_vpp_echo
1620             # gets in way
1621             self.assertEqual(scapy.compat.raw(p[UDP].payload),
1622                              scapy.compat.raw(echo_packet[UDP].payload),
1623                              "Received packet is not the echo packet sent")
1624         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1625                           "ECHO packet identifier for test purposes)")
1626         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1627                           "ECHO packet identifier for test purposes)")
1628
1629     def test_echo(self):
1630         """ echo function """
1631         bfd_session_up(self)
1632         self.test_session.update(required_min_echo_rx=150000)
1633         self.test_session.send_packet()
1634         detection_time = self.test_session.detect_mult *\
1635             self.vpp_session.required_min_rx / USEC_IN_SEC
1636         # echo shouldn't work without echo source set
1637         for dummy in range(10):
1638             sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1639             self.sleep(sleep, "delay before sending bfd packet")
1640             self.test_session.send_packet()
1641         p = wait_for_bfd_packet(
1642             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1643         self.assert_equal(p[BFD].required_min_rx_interval,
1644                           self.vpp_session.required_min_rx,
1645                           "BFD required min rx interval")
1646         self.test_session.send_packet()
1647         self.vapi.bfd_udp_set_echo_source(
1648             sw_if_index=self.loopback0.sw_if_index)
1649         echo_seen = False
1650         # should be turned on - loopback echo packets
1651         for dummy in range(3):
1652             loop_until = time.time() + 0.75 * detection_time
1653             while time.time() < loop_until:
1654                 p = self.pg0.wait_for_packet(1)
1655                 self.logger.debug(ppp("Got packet:", p))
1656                 if p[UDP].dport == BFD.udp_dport_echo:
1657                     self.assert_equal(
1658                         p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
1659                     self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
1660                                         "BFD ECHO src IP equal to loopback IP")
1661                     self.logger.debug(ppp("Looping back packet:", p))
1662                     self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1663                                       "ECHO packet destination MAC address")
1664                     p[Ether].dst = self.pg0.local_mac
1665                     self.pg0.add_stream(p)
1666                     self.pg_start()
1667                     echo_seen = True
1668                 elif p.haslayer(BFD):
1669                     if echo_seen:
1670                         self.assertGreaterEqual(
1671                             p[BFD].required_min_rx_interval,
1672                             1000000)
1673                     if "P" in p.sprintf("%BFD.flags%"):
1674                         final = self.test_session.create_packet()
1675                         final[BFD].flags = "F"
1676                         self.test_session.send_packet(final)
1677                 else:
1678                     raise Exception(ppp("Received unknown packet:", p))
1679
1680                 self.assert_equal(len(self.vapi.collect_events()), 0,
1681                                   "number of bfd events")
1682             self.test_session.send_packet()
1683         self.assertTrue(echo_seen, "No echo packets received")
1684
1685     def test_intf_deleted(self):
1686         """ interface with bfd session deleted """
1687         intf = VppLoInterface(self)
1688         intf.config_ip6()
1689         intf.admin_up()
1690         sw_if_index = intf.sw_if_index
1691         vpp_session = VppBFDUDPSession(
1692             self, intf, intf.remote_ip6, af=AF_INET6)
1693         vpp_session.add_vpp_config()
1694         vpp_session.admin_up()
1695         intf.remove_vpp_config()
1696         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1697         self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1698         self.assertFalse(vpp_session.query_vpp_config())
1699
1700
1701 class BFDFIBTestCase(VppTestCase):
1702     """ BFD-FIB interactions (IPv6) """
1703
1704     vpp_session = None
1705     test_session = None
1706
1707     @classmethod
1708     def setUpClass(cls):
1709         super(BFDFIBTestCase, cls).setUpClass()
1710
1711     @classmethod
1712     def tearDownClass(cls):
1713         super(BFDFIBTestCase, cls).tearDownClass()
1714
1715     def setUp(self):
1716         super(BFDFIBTestCase, self).setUp()
1717         self.create_pg_interfaces(range(1))
1718
1719         self.vapi.want_bfd_events()
1720         self.pg0.enable_capture()
1721
1722         for i in self.pg_interfaces:
1723             i.admin_up()
1724             i.config_ip6()
1725             i.configure_ipv6_neighbors()
1726
1727     def tearDown(self):
1728         if not self.vpp_dead:
1729             self.vapi.want_bfd_events(enable_disable=False)
1730
1731         super(BFDFIBTestCase, self).tearDown()
1732
1733     @staticmethod
1734     def pkt_is_not_data_traffic(p):
1735         """ not data traffic implies BFD or the usual IPv6 ND/RA"""
1736         if p.haslayer(BFD) or is_ipv6_misc(p):
1737             return True
1738         return False
1739
1740     def test_session_with_fib(self):
1741         """ BFD-FIB interactions """
1742
1743         # packets to match against both of the routes
1744         p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1745               IPv6(src="3001::1", dst="2001::1") /
1746               UDP(sport=1234, dport=1234) /
1747               Raw(b'\xa5' * 100)),
1748              (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1749               IPv6(src="3001::1", dst="2002::1") /
1750               UDP(sport=1234, dport=1234) /
1751               Raw(b'\xa5' * 100))]
1752
1753         # A recursive and a non-recursive route via a next-hop that
1754         # will have a BFD session
1755         ip_2001_s_64 = VppIpRoute(self, "2001::", 64,
1756                                   [VppRoutePath(self.pg0.remote_ip6,
1757                                                 self.pg0.sw_if_index)])
1758         ip_2002_s_64 = VppIpRoute(self, "2002::", 64,
1759                                   [VppRoutePath(self.pg0.remote_ip6,
1760                                                 0xffffffff)])
1761         ip_2001_s_64.add_vpp_config()
1762         ip_2002_s_64.add_vpp_config()
1763
1764         # bring the session up now the routes are present
1765         self.vpp_session = VppBFDUDPSession(self,
1766                                             self.pg0,
1767                                             self.pg0.remote_ip6,
1768                                             af=AF_INET6)
1769         self.vpp_session.add_vpp_config()
1770         self.vpp_session.admin_up()
1771         self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1772
1773         # session is up - traffic passes
1774         bfd_session_up(self)
1775
1776         self.pg0.add_stream(p)
1777         self.pg_start()
1778         for packet in p:
1779             captured = self.pg0.wait_for_packet(
1780                 1,
1781                 filter_out_fn=self.pkt_is_not_data_traffic)
1782             self.assertEqual(captured[IPv6].dst,
1783                              packet[IPv6].dst)
1784
1785         # session is up - traffic is dropped
1786         bfd_session_down(self)
1787
1788         self.pg0.add_stream(p)
1789         self.pg_start()
1790         with self.assertRaises(CaptureTimeoutError):
1791             self.pg0.wait_for_packet(1, self.pkt_is_not_data_traffic)
1792
1793         # session is up - traffic passes
1794         bfd_session_up(self)
1795
1796         self.pg0.add_stream(p)
1797         self.pg_start()
1798         for packet in p:
1799             captured = self.pg0.wait_for_packet(
1800                 1,
1801                 filter_out_fn=self.pkt_is_not_data_traffic)
1802             self.assertEqual(captured[IPv6].dst,
1803                              packet[IPv6].dst)
1804
1805
1806 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1807 class BFDTunTestCase(VppTestCase):
1808     """ BFD over GRE tunnel """
1809
1810     vpp_session = None
1811     test_session = None
1812
1813     @classmethod
1814     def setUpClass(cls):
1815         super(BFDTunTestCase, cls).setUpClass()
1816
1817     @classmethod
1818     def tearDownClass(cls):
1819         super(BFDTunTestCase, cls).tearDownClass()
1820
1821     def setUp(self):
1822         super(BFDTunTestCase, self).setUp()
1823         self.create_pg_interfaces(range(1))
1824
1825         self.vapi.want_bfd_events()
1826         self.pg0.enable_capture()
1827
1828         for i in self.pg_interfaces:
1829             i.admin_up()
1830             i.config_ip4()
1831             i.resolve_arp()
1832
1833     def tearDown(self):
1834         if not self.vpp_dead:
1835             self.vapi.want_bfd_events(enable_disable=0)
1836
1837         super(BFDTunTestCase, self).tearDown()
1838
1839     @staticmethod
1840     def pkt_is_not_data_traffic(p):
1841         """ not data traffic implies BFD or the usual IPv6 ND/RA"""
1842         if p.haslayer(BFD) or is_ipv6_misc(p):
1843             return True
1844         return False
1845
1846     def test_bfd_o_gre(self):
1847         """ BFD-o-GRE  """
1848
1849         # A GRE interface over which to run a BFD session
1850         gre_if = VppGreInterface(self,
1851                                  self.pg0.local_ip4,
1852                                  self.pg0.remote_ip4)
1853         gre_if.add_vpp_config()
1854         gre_if.admin_up()
1855         gre_if.config_ip4()
1856
1857         # bring the session up now the routes are present
1858         self.vpp_session = VppBFDUDPSession(self,
1859                                             gre_if,
1860                                             gre_if.remote_ip4,
1861                                             is_tunnel=True)
1862         self.vpp_session.add_vpp_config()
1863         self.vpp_session.admin_up()
1864
1865         self.test_session = BFDTestSession(
1866             self, gre_if, AF_INET,
1867             tunnel_header=(IP(src=self.pg0.remote_ip4,
1868                               dst=self.pg0.local_ip4) /
1869                            GRE()),
1870             phy_interface=self.pg0)
1871
1872         # packets to match against both of the routes
1873         p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1874               IP(src=self.pg0.remote_ip4, dst=gre_if.remote_ip4) /
1875               UDP(sport=1234, dport=1234) /
1876               Raw(b'\xa5' * 100))]
1877
1878         # session is up - traffic passes
1879         bfd_session_up(self)
1880
1881         self.send_and_expect(self.pg0, p, self.pg0)
1882
1883         # bring session down
1884         bfd_session_down(self)
1885
1886
1887 class BFDSHA1TestCase(VppTestCase):
1888     """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1889
1890     pg0 = None
1891     vpp_clock_offset = None
1892     vpp_session = None
1893     test_session = None
1894
1895     @classmethod
1896     def setUpClass(cls):
1897         super(BFDSHA1TestCase, cls).setUpClass()
1898         cls.vapi.cli("set log class bfd level debug")
1899         try:
1900             cls.create_pg_interfaces([0])
1901             cls.pg0.config_ip4()
1902             cls.pg0.admin_up()
1903             cls.pg0.resolve_arp()
1904
1905         except Exception:
1906             super(BFDSHA1TestCase, cls).tearDownClass()
1907             raise
1908
1909     @classmethod
1910     def tearDownClass(cls):
1911         super(BFDSHA1TestCase, cls).tearDownClass()
1912
1913     def setUp(self):
1914         super(BFDSHA1TestCase, self).setUp()
1915         self.factory = AuthKeyFactory()
1916         self.vapi.want_bfd_events()
1917         self.pg0.enable_capture()
1918
1919     def tearDown(self):
1920         if not self.vpp_dead:
1921             self.vapi.want_bfd_events(enable_disable=False)
1922         self.vapi.collect_events()  # clear the event queue
1923         super(BFDSHA1TestCase, self).tearDown()
1924
1925     def test_session_up(self):
1926         """ bring BFD session up """
1927         key = self.factory.create_random_key(self)
1928         key.add_vpp_config()
1929         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1930                                             self.pg0.remote_ip4,
1931                                             sha1_key=key)
1932         self.vpp_session.add_vpp_config()
1933         self.vpp_session.admin_up()
1934         self.test_session = BFDTestSession(
1935             self, self.pg0, AF_INET, sha1_key=key,
1936             bfd_key_id=self.vpp_session.bfd_key_id)
1937         bfd_session_up(self)
1938
1939     def test_hold_up(self):
1940         """ hold BFD session up """
1941         key = self.factory.create_random_key(self)
1942         key.add_vpp_config()
1943         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1944                                             self.pg0.remote_ip4,
1945                                             sha1_key=key)
1946         self.vpp_session.add_vpp_config()
1947         self.vpp_session.admin_up()
1948         self.test_session = BFDTestSession(
1949             self, self.pg0, AF_INET, sha1_key=key,
1950             bfd_key_id=self.vpp_session.bfd_key_id)
1951         bfd_session_up(self)
1952         for dummy in range(self.test_session.detect_mult * 2):
1953             wait_for_bfd_packet(self)
1954             self.test_session.send_packet()
1955         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1956
1957     def test_hold_up_meticulous(self):
1958         """ hold BFD session up - meticulous auth """
1959         key = self.factory.create_random_key(
1960             self, BFDAuthType.meticulous_keyed_sha1)
1961         key.add_vpp_config()
1962         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1963                                             self.pg0.remote_ip4, sha1_key=key)
1964         self.vpp_session.add_vpp_config()
1965         self.vpp_session.admin_up()
1966         # specify sequence number so that it wraps
1967         self.test_session = BFDTestSession(
1968             self, self.pg0, AF_INET, sha1_key=key,
1969             bfd_key_id=self.vpp_session.bfd_key_id,
1970             our_seq_number=0xFFFFFFFF - 4)
1971         bfd_session_up(self)
1972         for dummy in range(30):
1973             wait_for_bfd_packet(self)
1974             self.test_session.inc_seq_num()
1975             self.test_session.send_packet()
1976         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1977
1978     def test_send_bad_seq_number(self):
1979         """ session is not kept alive by msgs with bad sequence numbers"""
1980         key = self.factory.create_random_key(
1981             self, BFDAuthType.meticulous_keyed_sha1)
1982         key.add_vpp_config()
1983         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1984                                             self.pg0.remote_ip4, sha1_key=key)
1985         self.vpp_session.add_vpp_config()
1986         self.test_session = BFDTestSession(
1987             self, self.pg0, AF_INET, sha1_key=key,
1988             bfd_key_id=self.vpp_session.bfd_key_id)
1989         bfd_session_up(self)
1990         detection_time = self.test_session.detect_mult *\
1991             self.vpp_session.required_min_rx / USEC_IN_SEC
1992         send_until = time.time() + 2 * detection_time
1993         while time.time() < send_until:
1994             self.test_session.send_packet()
1995             self.sleep(0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
1996                        "time between bfd packets")
1997         e = self.vapi.collect_events()
1998         # session should be down now, because the sequence numbers weren't
1999         # updated
2000         self.assert_equal(len(e), 1, "number of bfd events")
2001         verify_event(self, e[0], expected_state=BFDState.down)
2002
2003     def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
2004                                        legitimate_test_session,
2005                                        rogue_test_session,
2006                                        rogue_bfd_values=None):
2007         """ execute a rogue session interaction scenario
2008
2009         1. create vpp session, add config
2010         2. bring the legitimate session up
2011         3. copy the bfd values from legitimate session to rogue session
2012         4. apply rogue_bfd_values to rogue session
2013         5. set rogue session state to down
2014         6. send message to take the session down from the rogue session
2015         7. assert that the legitimate session is unaffected
2016         """
2017
2018         self.vpp_session = vpp_bfd_udp_session
2019         self.vpp_session.add_vpp_config()
2020         self.test_session = legitimate_test_session
2021         # bring vpp session up
2022         bfd_session_up(self)
2023         # send packet from rogue session
2024         rogue_test_session.update(
2025             my_discriminator=self.test_session.my_discriminator,
2026             your_discriminator=self.test_session.your_discriminator,
2027             desired_min_tx=self.test_session.desired_min_tx,
2028             required_min_rx=self.test_session.required_min_rx,
2029             detect_mult=self.test_session.detect_mult,
2030             diag=self.test_session.diag,
2031             state=self.test_session.state,
2032             auth_type=self.test_session.auth_type)
2033         if rogue_bfd_values:
2034             rogue_test_session.update(**rogue_bfd_values)
2035         rogue_test_session.update(state=BFDState.down)
2036         rogue_test_session.send_packet()
2037         wait_for_bfd_packet(self)
2038         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2039
2040     def test_mismatch_auth(self):
2041         """ session is not brought down by unauthenticated msg """
2042         key = self.factory.create_random_key(self)
2043         key.add_vpp_config()
2044         vpp_session = VppBFDUDPSession(
2045             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2046         legitimate_test_session = BFDTestSession(
2047             self, self.pg0, AF_INET, sha1_key=key,
2048             bfd_key_id=vpp_session.bfd_key_id)
2049         rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
2050         self.execute_rogue_session_scenario(vpp_session,
2051                                             legitimate_test_session,
2052                                             rogue_test_session)
2053
2054     def test_mismatch_bfd_key_id(self):
2055         """ session is not brought down by msg with non-existent key-id """
2056         key = self.factory.create_random_key(self)
2057         key.add_vpp_config()
2058         vpp_session = VppBFDUDPSession(
2059             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2060         # pick a different random bfd key id
2061         x = randint(0, 255)
2062         while x == vpp_session.bfd_key_id:
2063             x = randint(0, 255)
2064         legitimate_test_session = BFDTestSession(
2065             self, self.pg0, AF_INET, sha1_key=key,
2066             bfd_key_id=vpp_session.bfd_key_id)
2067         rogue_test_session = BFDTestSession(
2068             self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
2069         self.execute_rogue_session_scenario(vpp_session,
2070                                             legitimate_test_session,
2071                                             rogue_test_session)
2072
2073     def test_mismatched_auth_type(self):
2074         """ session is not brought down by msg with wrong auth type """
2075         key = self.factory.create_random_key(self)
2076         key.add_vpp_config()
2077         vpp_session = VppBFDUDPSession(
2078             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2079         legitimate_test_session = BFDTestSession(
2080             self, self.pg0, AF_INET, sha1_key=key,
2081             bfd_key_id=vpp_session.bfd_key_id)
2082         rogue_test_session = BFDTestSession(
2083             self, self.pg0, AF_INET, sha1_key=key,
2084             bfd_key_id=vpp_session.bfd_key_id)
2085         self.execute_rogue_session_scenario(
2086             vpp_session, legitimate_test_session, rogue_test_session,
2087             {'auth_type': BFDAuthType.keyed_md5})
2088
2089     def test_restart(self):
2090         """ simulate remote peer restart and resynchronization """
2091         key = self.factory.create_random_key(
2092             self, BFDAuthType.meticulous_keyed_sha1)
2093         key.add_vpp_config()
2094         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2095                                             self.pg0.remote_ip4, sha1_key=key)
2096         self.vpp_session.add_vpp_config()
2097         self.test_session = BFDTestSession(
2098             self, self.pg0, AF_INET, sha1_key=key,
2099             bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
2100         bfd_session_up(self)
2101         # don't send any packets for 2*detection_time
2102         detection_time = self.test_session.detect_mult *\
2103             self.vpp_session.required_min_rx / USEC_IN_SEC
2104         self.sleep(2 * detection_time, "simulating peer restart")
2105         events = self.vapi.collect_events()
2106         self.assert_equal(len(events), 1, "number of bfd events")
2107         verify_event(self, events[0], expected_state=BFDState.down)
2108         self.test_session.update(state=BFDState.down)
2109         # reset sequence number
2110         self.test_session.our_seq_number = 0
2111         self.test_session.vpp_seq_number = None
2112         # now throw away any pending packets
2113         self.pg0.enable_capture()
2114         self.test_session.my_discriminator = 0
2115         bfd_session_up(self)
2116
2117
2118 class BFDAuthOnOffTestCase(VppTestCase):
2119     """Bidirectional Forwarding Detection (BFD) (changing auth) """
2120
2121     pg0 = None
2122     vpp_session = None
2123     test_session = None
2124
2125     @classmethod
2126     def setUpClass(cls):
2127         super(BFDAuthOnOffTestCase, cls).setUpClass()
2128         cls.vapi.cli("set log class bfd level debug")
2129         try:
2130             cls.create_pg_interfaces([0])
2131             cls.pg0.config_ip4()
2132             cls.pg0.admin_up()
2133             cls.pg0.resolve_arp()
2134
2135         except Exception:
2136             super(BFDAuthOnOffTestCase, cls).tearDownClass()
2137             raise
2138
2139     @classmethod
2140     def tearDownClass(cls):
2141         super(BFDAuthOnOffTestCase, cls).tearDownClass()
2142
2143     def setUp(self):
2144         super(BFDAuthOnOffTestCase, self).setUp()
2145         self.factory = AuthKeyFactory()
2146         self.vapi.want_bfd_events()
2147         self.pg0.enable_capture()
2148
2149     def tearDown(self):
2150         if not self.vpp_dead:
2151             self.vapi.want_bfd_events(enable_disable=False)
2152         self.vapi.collect_events()  # clear the event queue
2153         super(BFDAuthOnOffTestCase, self).tearDown()
2154
2155     def test_auth_on_immediate(self):
2156         """ turn auth on without disturbing session state (immediate) """
2157         key = self.factory.create_random_key(self)
2158         key.add_vpp_config()
2159         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2160                                             self.pg0.remote_ip4)
2161         self.vpp_session.add_vpp_config()
2162         self.test_session = BFDTestSession(self, self.pg0, AF_INET)
2163         bfd_session_up(self)
2164         for dummy in range(self.test_session.detect_mult * 2):
2165             p = wait_for_bfd_packet(self)
2166             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2167             self.test_session.send_packet()
2168         self.vpp_session.activate_auth(key)
2169         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2170         self.test_session.sha1_key = key
2171         for dummy in range(self.test_session.detect_mult * 2):
2172             p = wait_for_bfd_packet(self)
2173             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2174             self.test_session.send_packet()
2175         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2176         self.assert_equal(len(self.vapi.collect_events()), 0,
2177                           "number of bfd events")
2178
2179     def test_auth_off_immediate(self):
2180         """ turn auth off without disturbing session state (immediate) """
2181         key = self.factory.create_random_key(self)
2182         key.add_vpp_config()
2183         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2184                                             self.pg0.remote_ip4, sha1_key=key)
2185         self.vpp_session.add_vpp_config()
2186         self.test_session = BFDTestSession(
2187             self, self.pg0, AF_INET, sha1_key=key,
2188             bfd_key_id=self.vpp_session.bfd_key_id)
2189         bfd_session_up(self)
2190         # self.vapi.want_bfd_events(enable_disable=0)
2191         for dummy in range(self.test_session.detect_mult * 2):
2192             p = wait_for_bfd_packet(self)
2193             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2194             self.test_session.inc_seq_num()
2195             self.test_session.send_packet()
2196         self.vpp_session.deactivate_auth()
2197         self.test_session.bfd_key_id = None
2198         self.test_session.sha1_key = None
2199         for dummy in range(self.test_session.detect_mult * 2):
2200             p = wait_for_bfd_packet(self)
2201             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2202             self.test_session.inc_seq_num()
2203             self.test_session.send_packet()
2204         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2205         self.assert_equal(len(self.vapi.collect_events()), 0,
2206                           "number of bfd events")
2207
2208     def test_auth_change_key_immediate(self):
2209         """ change auth key without disturbing session state (immediate) """
2210         key1 = self.factory.create_random_key(self)
2211         key1.add_vpp_config()
2212         key2 = self.factory.create_random_key(self)
2213         key2.add_vpp_config()
2214         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2215                                             self.pg0.remote_ip4, sha1_key=key1)
2216         self.vpp_session.add_vpp_config()
2217         self.test_session = BFDTestSession(
2218             self, self.pg0, AF_INET, sha1_key=key1,
2219             bfd_key_id=self.vpp_session.bfd_key_id)
2220         bfd_session_up(self)
2221         for dummy in range(self.test_session.detect_mult * 2):
2222             p = wait_for_bfd_packet(self)
2223             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2224             self.test_session.send_packet()
2225         self.vpp_session.activate_auth(key2)
2226         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2227         self.test_session.sha1_key = key2
2228         for dummy in range(self.test_session.detect_mult * 2):
2229             p = wait_for_bfd_packet(self)
2230             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2231             self.test_session.send_packet()
2232         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2233         self.assert_equal(len(self.vapi.collect_events()), 0,
2234                           "number of bfd events")
2235
2236     def test_auth_on_delayed(self):
2237         """ turn auth on without disturbing session state (delayed) """
2238         key = self.factory.create_random_key(self)
2239         key.add_vpp_config()
2240         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2241                                             self.pg0.remote_ip4)
2242         self.vpp_session.add_vpp_config()
2243         self.test_session = BFDTestSession(self, self.pg0, AF_INET)
2244         bfd_session_up(self)
2245         for dummy in range(self.test_session.detect_mult * 2):
2246             wait_for_bfd_packet(self)
2247             self.test_session.send_packet()
2248         self.vpp_session.activate_auth(key, delayed=True)
2249         for dummy in range(self.test_session.detect_mult * 2):
2250             p = wait_for_bfd_packet(self)
2251             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2252             self.test_session.send_packet()
2253         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2254         self.test_session.sha1_key = key
2255         self.test_session.send_packet()
2256         for dummy in range(self.test_session.detect_mult * 2):
2257             p = wait_for_bfd_packet(self)
2258             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2259             self.test_session.send_packet()
2260         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2261         self.assert_equal(len(self.vapi.collect_events()), 0,
2262                           "number of bfd events")
2263
2264     def test_auth_off_delayed(self):
2265         """ turn auth off without disturbing session state (delayed) """
2266         key = self.factory.create_random_key(self)
2267         key.add_vpp_config()
2268         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2269                                             self.pg0.remote_ip4, sha1_key=key)
2270         self.vpp_session.add_vpp_config()
2271         self.test_session = BFDTestSession(
2272             self, self.pg0, AF_INET, sha1_key=key,
2273             bfd_key_id=self.vpp_session.bfd_key_id)
2274         bfd_session_up(self)
2275         for dummy in range(self.test_session.detect_mult * 2):
2276             p = wait_for_bfd_packet(self)
2277             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2278             self.test_session.send_packet()
2279         self.vpp_session.deactivate_auth(delayed=True)
2280         for dummy in range(self.test_session.detect_mult * 2):
2281             p = wait_for_bfd_packet(self)
2282             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2283             self.test_session.send_packet()
2284         self.test_session.bfd_key_id = None
2285         self.test_session.sha1_key = None
2286         self.test_session.send_packet()
2287         for dummy in range(self.test_session.detect_mult * 2):
2288             p = wait_for_bfd_packet(self)
2289             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2290             self.test_session.send_packet()
2291         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2292         self.assert_equal(len(self.vapi.collect_events()), 0,
2293                           "number of bfd events")
2294
2295     def test_auth_change_key_delayed(self):
2296         """ change auth key without disturbing session state (delayed) """
2297         key1 = self.factory.create_random_key(self)
2298         key1.add_vpp_config()
2299         key2 = self.factory.create_random_key(self)
2300         key2.add_vpp_config()
2301         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2302                                             self.pg0.remote_ip4, sha1_key=key1)
2303         self.vpp_session.add_vpp_config()
2304         self.vpp_session.admin_up()
2305         self.test_session = BFDTestSession(
2306             self, self.pg0, AF_INET, sha1_key=key1,
2307             bfd_key_id=self.vpp_session.bfd_key_id)
2308         bfd_session_up(self)
2309         for dummy in range(self.test_session.detect_mult * 2):
2310             p = wait_for_bfd_packet(self)
2311             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2312             self.test_session.send_packet()
2313         self.vpp_session.activate_auth(key2, delayed=True)
2314         for dummy in range(self.test_session.detect_mult * 2):
2315             p = wait_for_bfd_packet(self)
2316             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2317             self.test_session.send_packet()
2318         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2319         self.test_session.sha1_key = key2
2320         self.test_session.send_packet()
2321         for dummy in range(self.test_session.detect_mult * 2):
2322             p = wait_for_bfd_packet(self)
2323             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2324             self.test_session.send_packet()
2325         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2326         self.assert_equal(len(self.vapi.collect_events()), 0,
2327                           "number of bfd events")
2328
2329
2330 class BFDCLITestCase(VppTestCase):
2331     """Bidirectional Forwarding Detection (BFD) (CLI) """
2332     pg0 = None
2333
2334     @classmethod
2335     def setUpClass(cls):
2336         super(BFDCLITestCase, cls).setUpClass()
2337         cls.vapi.cli("set log class bfd level debug")
2338         try:
2339             cls.create_pg_interfaces((0,))
2340             cls.pg0.config_ip4()
2341             cls.pg0.config_ip6()
2342             cls.pg0.resolve_arp()
2343             cls.pg0.resolve_ndp()
2344
2345         except Exception:
2346             super(BFDCLITestCase, cls).tearDownClass()
2347             raise
2348
2349     @classmethod
2350     def tearDownClass(cls):
2351         super(BFDCLITestCase, cls).tearDownClass()
2352
2353     def setUp(self):
2354         super(BFDCLITestCase, self).setUp()
2355         self.factory = AuthKeyFactory()
2356         self.pg0.enable_capture()
2357
2358     def tearDown(self):
2359         try:
2360             self.vapi.want_bfd_events(enable_disable=False)
2361         except UnexpectedApiReturnValueError:
2362             # some tests aren't subscribed, so this is not an issue
2363             pass
2364         self.vapi.collect_events()  # clear the event queue
2365         super(BFDCLITestCase, self).tearDown()
2366
2367     def cli_verify_no_response(self, cli):
2368         """ execute a CLI, asserting that the response is empty """
2369         self.assert_equal(self.vapi.cli(cli),
2370                           "",
2371                           "CLI command response")
2372
2373     def cli_verify_response(self, cli, expected):
2374         """ execute a CLI, asserting that the response matches expectation """
2375         try:
2376             reply = self.vapi.cli(cli)
2377         except CliFailedCommandError as cli_error:
2378             reply = str(cli_error)
2379         self.assert_equal(reply.strip(),
2380                           expected,
2381                           "CLI command response")
2382
2383     def test_show(self):
2384         """ show commands """
2385         k1 = self.factory.create_random_key(self)
2386         k1.add_vpp_config()
2387         k2 = self.factory.create_random_key(
2388             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2389         k2.add_vpp_config()
2390         s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2391         s1.add_vpp_config()
2392         s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2393                               sha1_key=k2)
2394         s2.add_vpp_config()
2395         self.logger.info(self.vapi.ppcli("show bfd keys"))
2396         self.logger.info(self.vapi.ppcli("show bfd sessions"))
2397         self.logger.info(self.vapi.ppcli("show bfd"))
2398
2399     def test_set_del_sha1_key(self):
2400         """ set/delete SHA1 auth key """
2401         k = self.factory.create_random_key(self)
2402         self.registry.register(k, self.logger)
2403         self.cli_verify_no_response(
2404             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2405             (k.conf_key_id,
2406                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2407         self.assertTrue(k.query_vpp_config())
2408         self.vpp_session = VppBFDUDPSession(
2409             self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
2410         self.vpp_session.add_vpp_config()
2411         self.test_session = \
2412             BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
2413                            bfd_key_id=self.vpp_session.bfd_key_id)
2414         self.vapi.want_bfd_events()
2415         bfd_session_up(self)
2416         bfd_session_down(self)
2417         # try to replace the secret for the key - should fail because the key
2418         # is in-use
2419         k2 = self.factory.create_random_key(self)
2420         self.cli_verify_response(
2421             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2422             (k.conf_key_id,
2423                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2424             "bfd key set: `bfd_auth_set_key' API call failed, "
2425             "rv=-103:BFD object in use")
2426         # manipulating the session using old secret should still work
2427         bfd_session_up(self)
2428         bfd_session_down(self)
2429         self.vpp_session.remove_vpp_config()
2430         self.cli_verify_no_response(
2431             "bfd key del conf-key-id %s" % k.conf_key_id)
2432         self.assertFalse(k.query_vpp_config())
2433
2434     def test_set_del_meticulous_sha1_key(self):
2435         """ set/delete meticulous SHA1 auth key """
2436         k = self.factory.create_random_key(
2437             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2438         self.registry.register(k, self.logger)
2439         self.cli_verify_no_response(
2440             "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
2441             (k.conf_key_id,
2442                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2443         self.assertTrue(k.query_vpp_config())
2444         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2445                                             self.pg0.remote_ip6, af=AF_INET6,
2446                                             sha1_key=k)
2447         self.vpp_session.add_vpp_config()
2448         self.vpp_session.admin_up()
2449         self.test_session = \
2450             BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
2451                            bfd_key_id=self.vpp_session.bfd_key_id)
2452         self.vapi.want_bfd_events()
2453         bfd_session_up(self)
2454         bfd_session_down(self)
2455         # try to replace the secret for the key - should fail because the key
2456         # is in-use
2457         k2 = self.factory.create_random_key(self)
2458         self.cli_verify_response(
2459             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2460             (k.conf_key_id,
2461                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2462             "bfd key set: `bfd_auth_set_key' API call failed, "
2463             "rv=-103:BFD object in use")
2464         # manipulating the session using old secret should still work
2465         bfd_session_up(self)
2466         bfd_session_down(self)
2467         self.vpp_session.remove_vpp_config()
2468         self.cli_verify_no_response(
2469             "bfd key del conf-key-id %s" % k.conf_key_id)
2470         self.assertFalse(k.query_vpp_config())
2471
2472     def test_add_mod_del_bfd_udp(self):
2473         """ create/modify/delete IPv4 BFD UDP session """
2474         vpp_session = VppBFDUDPSession(
2475             self, self.pg0, self.pg0.remote_ip4)
2476         self.registry.register(vpp_session, self.logger)
2477         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2478             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2479             "detect-mult %s" % (self.pg0.name, self.pg0.local_ip4,
2480                                 self.pg0.remote_ip4,
2481                                 vpp_session.desired_min_tx,
2482                                 vpp_session.required_min_rx,
2483                                 vpp_session.detect_mult)
2484         self.cli_verify_no_response(cli_add_cmd)
2485         # 2nd add should fail
2486         self.cli_verify_response(
2487             cli_add_cmd,
2488             "bfd udp session add: `bfd_add_add_session' API call"
2489             " failed, rv=-101:Duplicate BFD object")
2490         verify_bfd_session_config(self, vpp_session)
2491         mod_session = VppBFDUDPSession(
2492             self, self.pg0, self.pg0.remote_ip4,
2493             required_min_rx=2 * vpp_session.required_min_rx,
2494             desired_min_tx=3 * vpp_session.desired_min_tx,
2495             detect_mult=4 * vpp_session.detect_mult)
2496         self.cli_verify_no_response(
2497             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2498             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2499             (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2500              mod_session.desired_min_tx, mod_session.required_min_rx,
2501              mod_session.detect_mult))
2502         verify_bfd_session_config(self, mod_session)
2503         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2504             "peer-addr %s" % (self.pg0.name,
2505                               self.pg0.local_ip4, self.pg0.remote_ip4)
2506         self.cli_verify_no_response(cli_del_cmd)
2507         # 2nd del is expected to fail
2508         self.cli_verify_response(
2509             cli_del_cmd, "bfd udp session del: `bfd_udp_del_session' API call"
2510             " failed, rv=-102:No such BFD object")
2511         self.assertFalse(vpp_session.query_vpp_config())
2512
2513     def test_add_mod_del_bfd_udp6(self):
2514         """ create/modify/delete IPv6 BFD UDP session """
2515         vpp_session = VppBFDUDPSession(
2516             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
2517         self.registry.register(vpp_session, self.logger)
2518         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2519             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2520             "detect-mult %s" % (self.pg0.name, self.pg0.local_ip6,
2521                                 self.pg0.remote_ip6,
2522                                 vpp_session.desired_min_tx,
2523                                 vpp_session.required_min_rx,
2524                                 vpp_session.detect_mult)
2525         self.cli_verify_no_response(cli_add_cmd)
2526         # 2nd add should fail
2527         self.cli_verify_response(
2528             cli_add_cmd,
2529             "bfd udp session add: `bfd_add_add_session' API call"
2530             " failed, rv=-101:Duplicate BFD object")
2531         verify_bfd_session_config(self, vpp_session)
2532         mod_session = VppBFDUDPSession(
2533             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2534             required_min_rx=2 * vpp_session.required_min_rx,
2535             desired_min_tx=3 * vpp_session.desired_min_tx,
2536             detect_mult=4 * vpp_session.detect_mult)
2537         self.cli_verify_no_response(
2538             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2539             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2540             (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2541              mod_session.desired_min_tx,
2542              mod_session.required_min_rx, mod_session.detect_mult))
2543         verify_bfd_session_config(self, mod_session)
2544         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2545             "peer-addr %s" % (self.pg0.name,
2546                               self.pg0.local_ip6, self.pg0.remote_ip6)
2547         self.cli_verify_no_response(cli_del_cmd)
2548         # 2nd del is expected to fail
2549         self.cli_verify_response(
2550             cli_del_cmd,
2551             "bfd udp session del: `bfd_udp_del_session' API call"
2552             " failed, rv=-102:No such BFD object")
2553         self.assertFalse(vpp_session.query_vpp_config())
2554
2555     def test_add_mod_del_bfd_udp_auth(self):
2556         """ create/modify/delete IPv4 BFD UDP session (authenticated) """
2557         key = self.factory.create_random_key(self)
2558         key.add_vpp_config()
2559         vpp_session = VppBFDUDPSession(
2560             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2561         self.registry.register(vpp_session, self.logger)
2562         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2563             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2564             "detect-mult %s conf-key-id %s bfd-key-id %s"\
2565             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2566                vpp_session.desired_min_tx, vpp_session.required_min_rx,
2567                vpp_session.detect_mult, key.conf_key_id,
2568                vpp_session.bfd_key_id)
2569         self.cli_verify_no_response(cli_add_cmd)
2570         # 2nd add should fail
2571         self.cli_verify_response(
2572             cli_add_cmd,
2573             "bfd udp session add: `bfd_add_add_session' API call"
2574             " failed, rv=-101:Duplicate BFD object")
2575         verify_bfd_session_config(self, vpp_session)
2576         mod_session = VppBFDUDPSession(
2577             self, self.pg0, self.pg0.remote_ip4, sha1_key=key,
2578             bfd_key_id=vpp_session.bfd_key_id,
2579             required_min_rx=2 * vpp_session.required_min_rx,
2580             desired_min_tx=3 * vpp_session.desired_min_tx,
2581             detect_mult=4 * vpp_session.detect_mult)
2582         self.cli_verify_no_response(
2583             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2584             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2585             (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2586              mod_session.desired_min_tx,
2587              mod_session.required_min_rx, mod_session.detect_mult))
2588         verify_bfd_session_config(self, mod_session)
2589         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2590             "peer-addr %s" % (self.pg0.name,
2591                               self.pg0.local_ip4, self.pg0.remote_ip4)
2592         self.cli_verify_no_response(cli_del_cmd)
2593         # 2nd del is expected to fail
2594         self.cli_verify_response(
2595             cli_del_cmd,
2596             "bfd udp session del: `bfd_udp_del_session' API call"
2597             " failed, rv=-102:No such BFD object")
2598         self.assertFalse(vpp_session.query_vpp_config())
2599
2600     def test_add_mod_del_bfd_udp6_auth(self):
2601         """ create/modify/delete IPv6 BFD UDP session (authenticated) """
2602         key = self.factory.create_random_key(
2603             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2604         key.add_vpp_config()
2605         vpp_session = VppBFDUDPSession(
2606             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key)
2607         self.registry.register(vpp_session, self.logger)
2608         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2609             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2610             "detect-mult %s conf-key-id %s bfd-key-id %s" \
2611             % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2612                vpp_session.desired_min_tx, vpp_session.required_min_rx,
2613                vpp_session.detect_mult, key.conf_key_id,
2614                vpp_session.bfd_key_id)
2615         self.cli_verify_no_response(cli_add_cmd)
2616         # 2nd add should fail
2617         self.cli_verify_response(
2618             cli_add_cmd,
2619             "bfd udp session add: `bfd_add_add_session' API call"
2620             " failed, rv=-101:Duplicate BFD object")
2621         verify_bfd_session_config(self, vpp_session)
2622         mod_session = VppBFDUDPSession(
2623             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key,
2624             bfd_key_id=vpp_session.bfd_key_id,
2625             required_min_rx=2 * vpp_session.required_min_rx,
2626             desired_min_tx=3 * vpp_session.desired_min_tx,
2627             detect_mult=4 * vpp_session.detect_mult)
2628         self.cli_verify_no_response(
2629             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2630             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2631             (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2632              mod_session.desired_min_tx,
2633              mod_session.required_min_rx, mod_session.detect_mult))
2634         verify_bfd_session_config(self, mod_session)
2635         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2636             "peer-addr %s" % (self.pg0.name,
2637                               self.pg0.local_ip6, self.pg0.remote_ip6)
2638         self.cli_verify_no_response(cli_del_cmd)
2639         # 2nd del is expected to fail
2640         self.cli_verify_response(
2641             cli_del_cmd,
2642             "bfd udp session del: `bfd_udp_del_session' API call"
2643             " failed, rv=-102:No such BFD object")
2644         self.assertFalse(vpp_session.query_vpp_config())
2645
2646     def test_auth_on_off(self):
2647         """ turn authentication on and off """
2648         key = self.factory.create_random_key(
2649             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2650         key.add_vpp_config()
2651         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2652         auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2653                                         sha1_key=key)
2654         session.add_vpp_config()
2655         cli_activate = \
2656             "bfd udp session auth activate interface %s local-addr %s "\
2657             "peer-addr %s conf-key-id %s bfd-key-id %s"\
2658             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2659                key.conf_key_id, auth_session.bfd_key_id)
2660         self.cli_verify_no_response(cli_activate)
2661         verify_bfd_session_config(self, auth_session)
2662         self.cli_verify_no_response(cli_activate)
2663         verify_bfd_session_config(self, auth_session)
2664         cli_deactivate = \
2665             "bfd udp session auth deactivate interface %s local-addr %s "\
2666             "peer-addr %s "\
2667             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2668         self.cli_verify_no_response(cli_deactivate)
2669         verify_bfd_session_config(self, session)
2670         self.cli_verify_no_response(cli_deactivate)
2671         verify_bfd_session_config(self, session)
2672
2673     def test_auth_on_off_delayed(self):
2674         """ turn authentication on and off (delayed) """
2675         key = self.factory.create_random_key(
2676             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2677         key.add_vpp_config()
2678         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2679         auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2680                                         sha1_key=key)
2681         session.add_vpp_config()
2682         cli_activate = \
2683             "bfd udp session auth activate interface %s local-addr %s "\
2684             "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"\
2685             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2686                key.conf_key_id, auth_session.bfd_key_id)
2687         self.cli_verify_no_response(cli_activate)
2688         verify_bfd_session_config(self, auth_session)
2689         self.cli_verify_no_response(cli_activate)
2690         verify_bfd_session_config(self, auth_session)
2691         cli_deactivate = \
2692             "bfd udp session auth deactivate interface %s local-addr %s "\
2693             "peer-addr %s delayed yes"\
2694             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2695         self.cli_verify_no_response(cli_deactivate)
2696         verify_bfd_session_config(self, session)
2697         self.cli_verify_no_response(cli_deactivate)
2698         verify_bfd_session_config(self, session)
2699
2700     def test_admin_up_down(self):
2701         """ put session admin-up and admin-down """
2702         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2703         session.add_vpp_config()
2704         cli_down = \
2705             "bfd udp session set-flags admin down interface %s local-addr %s "\
2706             "peer-addr %s "\
2707             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2708         cli_up = \
2709             "bfd udp session set-flags admin up interface %s local-addr %s "\
2710             "peer-addr %s "\
2711             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2712         self.cli_verify_no_response(cli_down)
2713         verify_bfd_session_config(self, session, state=BFDState.admin_down)
2714         self.cli_verify_no_response(cli_up)
2715         verify_bfd_session_config(self, session, state=BFDState.down)
2716
2717     def test_set_del_udp_echo_source(self):
2718         """ set/del udp echo source """
2719         self.create_loopback_interfaces(1)
2720         self.loopback0 = self.lo_interfaces[0]
2721         self.loopback0.admin_up()
2722         self.cli_verify_response("show bfd echo-source",
2723                                  "UDP echo source is not set.")
2724         cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
2725         self.cli_verify_no_response(cli_set)
2726         self.cli_verify_response("show bfd echo-source",
2727                                  "UDP echo source is: %s\n"
2728                                  "IPv4 address usable as echo source: none\n"
2729                                  "IPv6 address usable as echo source: none" %
2730                                  self.loopback0.name)
2731         self.loopback0.config_ip4()
2732         echo_ip4 = str(ipaddress.IPv4Address(int(ipaddress.IPv4Address(
2733             self.loopback0.local_ip4)) ^ 1))
2734         self.cli_verify_response("show bfd echo-source",
2735                                  "UDP echo source is: %s\n"
2736                                  "IPv4 address usable as echo source: %s\n"
2737                                  "IPv6 address usable as echo source: none" %
2738                                  (self.loopback0.name, echo_ip4))
2739         echo_ip6 = str(ipaddress.IPv6Address(int(ipaddress.IPv6Address(
2740             self.loopback0.local_ip6)) ^ 1))
2741         self.loopback0.config_ip6()
2742         self.cli_verify_response("show bfd echo-source",
2743                                  "UDP echo source is: %s\n"
2744                                  "IPv4 address usable as echo source: %s\n"
2745                                  "IPv6 address usable as echo source: %s" %
2746                                  (self.loopback0.name, echo_ip4, echo_ip6))
2747         cli_del = "bfd udp echo-source del"
2748         self.cli_verify_no_response(cli_del)
2749         self.cli_verify_response("show bfd echo-source",
2750                                  "UDP echo source is not set.")
2751
2752
2753 if __name__ == '__main__':
2754     unittest.main(testRunner=VppTestRunner)