tests: tag the tests that do not work with multi-worker configuration
[vpp.git] / src / vnet / bfd / test / test_bfd.py
1 #!/usr/bin/env python3
2 """ BFD tests """
3
4 from __future__ import division
5
6 import binascii
7 import hashlib
8 import ipaddress
9 import reprlib
10 import time
11 import unittest
12 from random import randint, shuffle, getrandbits
13 from socket import AF_INET, AF_INET6, inet_ntop
14 from struct import pack, unpack
15
16 import scapy.compat
17 from scapy.layers.inet import UDP, IP
18 from scapy.layers.inet6 import IPv6
19 from scapy.layers.l2 import Ether, GRE
20 from scapy.packet import Raw
21
22 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
23     BFDDiagCode, BFDState, BFD_vpp_echo
24 from framework import tag_fixme_vpp_workers
25 from framework import VppTestCase, VppTestRunner, running_extended_tests
26 from framework import tag_run_solo
27 from util import ppp
28 from vpp_ip import DpoProto
29 from vpp_ip_route import VppIpRoute, VppRoutePath
30 from vpp_lo_interface import VppLoInterface
31 from vpp_papi_provider import UnexpectedApiReturnValueError, \
32     CliFailedCommandError
33 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
34 from vpp_gre_interface import VppGreInterface
35 from vpp_papi import VppEnum
36
37 USEC_IN_SEC = 1000000
38
39
class AuthKeyFactory(object):
    """Factory class for creating auth keys with unique conf key ID

    Uniqueness is tracked per factory instance: every conf key ID handed out
    by create_random_key() is remembered and never issued again by the same
    factory object.
    """

    def __init__(self):
        # conf key IDs already issued by this factory
        self._conf_key_ids = set()

    def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
        """ create a random key with unique conf key id

        :param test: test case object, passed through to VppBFDAuthKey
        :param auth_type: authentication type of the created key
        :returns: VppBFDAuthKey carrying a random key of 1 to 20 bytes
        """
        conf_key_id = randint(0, 0xFFFFFFFF)
        # re-roll until we hit an ID this factory has not issued yet
        while conf_key_id in self._conf_key_ids:
            conf_key_id = randint(0, 0xFFFFFFFF)
        self._conf_key_ids.add(conf_key_id)
        key = scapy.compat.raw(
            bytearray([randint(0, 255) for _ in range(randint(1, 20))]))
        return VppBFDAuthKey(test=test, auth_type=auth_type,
                             conf_key_id=conf_key_id, key=key)
56
57
class BFDAPITestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) - API"""

    # NOTE: the class docstring and the test-method docstrings are kept
    # verbatim on purpose - unittest reports them as test descriptions.

    pg0 = None
    pg1 = None

    @classmethod
    def setUpClass(cls):
        # enable BFD debug logging, then create and address two pg interfaces
        super().setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces(range(2))
            for intf in cls.pg_interfaces:
                intf.config_ip4()
                intf.config_ip6()
                intf.resolve_arp()
        except Exception:
            # undo the parent class setup before propagating the failure
            super().tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        super().setUp()
        # fresh factory per test, so conf key IDs are unique within a test
        self.factory = AuthKeyFactory()

    def test_add_bfd(self):
        """ create a BFD session """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        # add/remove twice: the second round re-creates a removed session
        for _ in range(2):
            session.add_vpp_config()
            self.logger.debug("Session state is %s", session.state)
            session.remove_vpp_config()

    def test_double_add(self):
        """ create the same BFD session twice (negative case) """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()

        # the duplicate add is expected to yield a negative API retval
        with self.vapi.assert_negative_api_retval():
            session.add_vpp_config()

        session.remove_vpp_config()

    def test_add_bfd6(self):
        """ create IPv6 BFD session """
        session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
        # add/remove twice: the second round re-creates a removed session
        for _ in range(2):
            session.add_vpp_config()
            self.logger.debug("Session state is %s", session.state)
            session.remove_vpp_config()

    def test_mod_bfd(self):
        """ modify BFD session parameters """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   desired_min_tx=50000,
                                   required_min_rx=10000,
                                   detect_mult=1)

        def check_against_dump():
            # compare the session object with what VPP reports in its dump
            dump = session.get_bfd_udp_session_dump_entry()
            self.assert_equal(session.desired_min_tx,
                              dump.desired_min_tx,
                              "desired min transmit interval")
            self.assert_equal(session.required_min_rx,
                              dump.required_min_rx,
                              "required min receive interval")
            self.assert_equal(session.detect_mult, dump.detect_mult,
                              "detect mult")

        session.add_vpp_config()
        check_against_dump()
        # double every parameter and verify the dump follows
        session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
                                  required_min_rx=session.required_min_rx * 2,
                                  detect_mult=session.detect_mult * 2)
        check_against_dump()

    def test_add_sha1_keys(self):
        """ add SHA1 keys """
        key_count = 10
        keys = [self.factory.create_random_key(self)
                for _ in range(key_count)]
        for key in keys:
            self.assertFalse(key.query_vpp_config())
        for key in keys:
            key.add_vpp_config()
        for key in keys:
            self.assertTrue(key.query_vpp_config())
        # remove randomly
        removal_order = list(range(key_count))
        shuffle(removal_order)
        removed = []
        for idx in removal_order:
            keys[idx].remove_vpp_config()
            removed.append(idx)
            # after each removal, every key must be present iff not removed
            for pos, key in enumerate(keys):
                if pos in removed:
                    expect = self.assertFalse
                else:
                    expect = self.assertTrue
                expect(key.query_vpp_config())
        # should be removed now
        for key in keys:
            self.assertFalse(key.query_vpp_config())
        # add back and remove again
        for key in keys:
            key.add_vpp_config()
        for key in keys:
            self.assertTrue(key.query_vpp_config())
        for key in keys:
            key.remove_vpp_config()
        for key in keys:
            self.assertFalse(key.query_vpp_config())

    def test_add_bfd_sha1(self):
        """ create a BFD session (SHA1) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=key)
        # add/remove twice: the second round re-creates a removed session
        for _ in range(2):
            session.add_vpp_config()
            self.logger.debug("Session state is %s", session.state)
            session.remove_vpp_config()

    def test_double_add_sha1(self):
        """ create the same BFD session twice (negative case) (SHA1) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=key)
        session.add_vpp_config()
        # second add of the very same session has to fail
        with self.assertRaises(Exception):
            session.add_vpp_config()

    def test_add_auth_nonexistent_key(self):
        """ create BFD session using non-existent SHA1 (negative case) """
        # the key is created locally but never added to VPP
        unconfigured_key = self.factory.create_random_key(self)
        session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4,
            sha1_key=unconfigured_key)
        with self.assertRaises(Exception):
            session.add_vpp_config()

    def test_shared_sha1_key(self):
        """ share single SHA1 key between multiple BFD sessions """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        sessions = [
            VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                             sha1_key=key),
            VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
                             sha1_key=key, af=AF_INET6),
            VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
                             sha1_key=key),
            VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
                             sha1_key=key, af=AF_INET6)]
        for session in sessions:
            session.add_vpp_config()
        # use count must drop by one with every removed session ...
        for removed, session in enumerate(sessions):
            entry = key.get_bfd_auth_keys_dump_entry()
            self.assert_equal(entry.use_count, len(sessions) - removed,
                              "Use count for shared key")
            session.remove_vpp_config()
        # ... and reach zero once all of them are gone
        entry = key.get_bfd_auth_keys_dump_entry()
        self.assert_equal(entry.use_count, 0, "Use count for shared key")

    def test_activate_auth(self):
        """ activate SHA1 authentication """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        # session starts unauthenticated, auth is switched on afterwards
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        session.activate_auth(key)

    def test_deactivate_auth(self):
        """ deactivate SHA1 authentication """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        session.activate_auth(key)
        session.deactivate_auth()

    def test_change_key(self):
        """ change SHA1 key """
        old_key = self.factory.create_random_key(self)
        new_key = self.factory.create_random_key(self)
        # make sure the two keys really differ in their conf key ID
        while new_key.conf_key_id == old_key.conf_key_id:
            new_key = self.factory.create_random_key(self)
        old_key.add_vpp_config()
        new_key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=old_key)
        session.add_vpp_config()
        session.activate_auth(new_key)

    def test_set_del_udp_echo_source(self):
        """ set/del udp echo source """
        self.create_loopback_interfaces(1)
        self.loopback0 = self.lo_interfaces[0]
        self.loopback0.admin_up()

        def flip_lowest_bit(addr_cls, addr):
            # expected echo source address: interface address XOR 1
            return addr_cls(int(addr_cls(addr)) ^ 1).packed

        def check_echo_source(is_set=True, echo_ip4=None, echo_ip6=None):
            # query VPP and compare with the expectation; an address of
            # None means "no usable address of that family expected"
            src = self.vapi.bfd_udp_get_echo_source()
            if is_set:
                self.assertTrue(src.is_set)
                self.assertEqual(src.sw_if_index,
                                 self.loopback0.sw_if_index)
            else:
                self.assertFalse(src.is_set)
            if echo_ip4 is None:
                self.assertFalse(src.have_usable_ip4)
            else:
                self.assertTrue(src.have_usable_ip4)
                self.assertEqual(src.ip4_addr.packed, echo_ip4)
            if echo_ip6 is None:
                self.assertFalse(src.have_usable_ip6)
            else:
                self.assertTrue(src.have_usable_ip6)
                self.assertEqual(src.ip6_addr.packed, echo_ip6)

        # nothing configured yet
        check_echo_source(is_set=False)

        # source interface set, but it has no addresses
        self.vapi.bfd_udp_set_echo_source(
            sw_if_index=self.loopback0.sw_if_index)
        check_echo_source()

        # IPv4 address only
        self.loopback0.config_ip4()
        echo_ip4 = flip_lowest_bit(ipaddress.IPv4Address,
                                   self.loopback0.local_ip4)
        check_echo_source(echo_ip4=echo_ip4)

        # both IPv4 and IPv6 addresses
        self.loopback0.config_ip6()
        echo_ip6 = flip_lowest_bit(ipaddress.IPv6Address,
                                   self.loopback0.local_ip6)
        check_echo_source(echo_ip4=echo_ip4, echo_ip6=echo_ip6)

        # echo source removed again
        self.vapi.bfd_udp_del_echo_source()
        check_echo_source(is_set=False)
315
316
class BFDTestSession(object):
    """ Test-framework side of a BFD session (the peer talking to VPP) """

    # (session attribute, BFD packet field, debug log format) - processed in
    # this order by fill_packet_fields()
    _PACKET_FIELDS = (
        ("my_discriminator", "my_discriminator",
         "BFD: setting packet.my_discriminator=%s"),
        ("your_discriminator", "your_discriminator",
         "BFD: setting packet.your_discriminator=%s"),
        ("required_min_rx", "required_min_rx_interval",
         "BFD: setting packet.required_min_rx_interval=%s"),
        ("required_min_echo_rx", "required_min_echo_rx_interval",
         "BFD: setting packet.required_min_echo_rx=%s"),
        ("desired_min_tx", "desired_min_tx_interval",
         "BFD: setting packet.desired_min_tx_interval=%s"),
        ("detect_mult", "detect_mult",
         "BFD: setting packet.detect_mult=%s"),
        ("diag", "diag",
         "BFD: setting packet.diag=%s"),
        ("state", "state",
         "BFD: setting packet.state=%s"),
        # auth_type is used by a negative test-case
        ("auth_type", "auth_type",
         "BFD: setting packet.auth_type=%s"),
    )

    def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
                 bfd_key_id=None, our_seq_number=None,
                 tunnel_header=None, phy_interface=None):
        self.test = test
        self.af = af
        self.sha1_key = sha1_key
        self.bfd_key_id = bfd_key_id
        self.interface = interface
        # packets are sent via phy_interface; defaults to the interface
        self.phy_interface = phy_interface if phy_interface else interface
        self.udp_sport = randint(49152, 65535)
        # random starting sequence number unless the caller supplied one
        self.our_seq_number = randint(0, 40000000) \
            if our_seq_number is None else our_seq_number
        self.vpp_seq_number = None
        self.my_discriminator = 0
        self.desired_min_tx = 300000
        self.required_min_rx = 300000
        self.required_min_echo_rx = None
        self.detect_mult = detect_mult
        self.diag = BFDDiagCode.no_diagnostic
        self.your_discriminator = None
        self.state = BFDState.down
        self.auth_type = BFDAuthType.no_auth
        self.tunnel_header = tunnel_header

    def inc_seq_num(self):
        """ advance our sequence number by one, 0xFFFFFFFF wraps to 0 """
        current = self.our_seq_number
        self.our_seq_number = 0 if current == 0xFFFFFFFF else current + 1

    def update(self, my_discriminator=None, your_discriminator=None,
               desired_min_tx=None, required_min_rx=None,
               required_min_echo_rx=None, detect_mult=None,
               diag=None, state=None, auth_type=None):
        """ store new values of BFD parameters; None leaves one untouched """
        supplied = (
            ("my_discriminator", my_discriminator),
            ("your_discriminator", your_discriminator),
            ("required_min_rx", required_min_rx),
            ("required_min_echo_rx", required_min_echo_rx),
            ("desired_min_tx", desired_min_tx),
            ("detect_mult", detect_mult),
            ("diag", diag),
            ("state", state),
            ("auth_type", auth_type),
        )
        for attr, value in supplied:
            if value is not None:
                setattr(self, attr, value)

    def fill_packet_fields(self, packet):
        """ copy session values into the BFD layer of packet

        Only truthy session values are copied; falsy ones (None, 0) leave
        the corresponding packet field as it is.
        """
        bfd = packet[BFD]
        log = self.test.logger.debug
        for attr, field, log_format in self._PACKET_FIELDS:
            value = getattr(self, attr)
            if value:
                log(log_format, value)
                setattr(bfd, field, value)

    def create_packet(self):
        """ build a BFD packet reflecting the current state of session """
        if not self.sha1_key:
            bfd = BFD()
        else:
            # authenticated packet: add the SHA1 auth section
            bfd = BFD(flags="A")
            bfd.auth_type = self.sha1_key.auth_type
            bfd.auth_len = BFD.sha1_auth_len
            bfd.auth_key_id = self.bfd_key_id
            bfd.auth_seq_num = self.our_seq_number
            bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
        packet = Ether(src=self.phy_interface.remote_mac,
                       dst=self.phy_interface.local_mac)
        if self.tunnel_header:
            packet = packet / self.tunnel_header
        if self.af == AF_INET6:
            ip_layer = IPv6(src=self.interface.remote_ip6,
                            dst=self.interface.local_ip6,
                            hlim=255)
        else:
            ip_layer = IP(src=self.interface.remote_ip4,
                          dst=self.interface.local_ip4,
                          ttl=255)
        udp_layer = UDP(sport=self.udp_sport, dport=BFD.udp_dport)
        packet = packet / ip_layer / udp_layer / bfd
        self.test.logger.debug("BFD: Creating packet")
        self.fill_packet_fields(packet)
        if self.sha1_key:
            # hash covers the first 32 bytes of the BFD layer followed by
            # the key zero-padded to 20 bytes
            secret = self.sha1_key.key
            padding = b"\0" * (20 - len(secret))
            sha1 = hashlib.sha1(
                scapy.compat.raw(packet[BFD])[:32] + secret + padding)
            self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
                                   sha1.hexdigest())
            packet[BFD].auth_key_hash = sha1.digest()
        return packet

    def send_packet(self, packet=None, interface=None):
        """ send packet on interface; both default to session's own """
        to_send = self.create_packet() if packet is None else packet
        out_intf = self.phy_interface if interface is None else interface
        self.test.logger.debug(ppp("Sending packet:", to_send))
        out_intf.add_stream(to_send)
        self.test.pg_start()

    def verify_sha1_auth(self, packet):
        """ check the SHA1 authentication section of a received packet """
        bfd = packet[BFD]
        check = self.test.assert_equal
        check(bfd.auth_len, 28, "Auth section length")
        check(bfd.auth_type, self.sha1_key.auth_type, BFDAuthType)
        check(bfd.auth_key_id, self.bfd_key_id, "Key ID")
        check(bfd.auth_reserved, 0, "Reserved")
        if self.vpp_seq_number is None:
            # first authenticated packet seen - just learn the number
            self.vpp_seq_number = bfd.auth_seq_num
            self.test.logger.debug("Received initial sequence number: %s" %
                                   self.vpp_seq_number)
        else:
            recvd_seq_num = bfd.auth_seq_num
            self.test.logger.debug("Received followup sequence number: %s" %
                                   recvd_seq_num)
            previous = self.vpp_seq_number
            # meticulous mode requires an increment with every packet,
            # otherwise the previous number may be repeated
            meticulous = self.sha1_key.auth_type == \
                BFDAuthType.meticulous_keyed_sha1
            at_maximum = not (previous < 0xffffffff)
            if meticulous:
                expected = 0 if at_maximum else previous + 1
                check(recvd_seq_num, expected, "BFD sequence number")
            elif at_maximum:
                self.test.assertIn(recvd_seq_num, (previous, 0),
                                   "BFD sequence number not one of "
                                   "(%s, 0)" % previous)
            else:
                self.test.assert_in_range(recvd_seq_num,
                                          previous,
                                          previous + 1,
                                          "BFD sequence number")
            self.vpp_seq_number = recvd_seq_num
        # the trailing 20 bytes are the hash: substitute the zero-padded key
        # for them and hash the result to get the expected value
        secret = self.sha1_key.key
        hash_material = bfd.original[:-20] + secret + \
            b"\0" * (20 - len(secret))
        expected_hash = hashlib.sha1(hash_material).hexdigest()
        check(binascii.hexlify(bfd.auth_key_hash),
              expected_hash.encode(), "Auth key hash")

    def verify_bfd(self, packet):
        """ check the BFD layer (and auth section if a key is in use) """
        bfd = packet[BFD]
        self.test.assert_equal(bfd.version, 1, "BFD version")
        self.test.assert_equal(bfd.your_discriminator,
                               self.my_discriminator,
                               "BFD - your discriminator")
        if self.sha1_key:
            self.verify_sha1_auth(packet)
526
527
def bfd_session_up(test):
    """ Bring BFD session up

    Waits for a packet from VPP, replies with Init, waits for the Up event
    and finally sends Up. The vpp clock offset is (re)calculated from the
    first packet and stored in test.vpp_clock_offset.
    """

    def send_next_packet():
        # meticulous authentication needs a new sequence number per packet
        key = test.test_session.sha1_key
        if key and key.auth_type == BFDAuthType.meticulous_keyed_sha1:
            test.test_session.inc_seq_num()
        test.test_session.send_packet()

    test.logger.info("BFD: Waiting for slow hello")
    hello = wait_for_bfd_packet(
        test, 2, is_tunnel=test.vpp_session.is_tunnel)
    previous_offset = getattr(test, 'vpp_clock_offset', None)
    test.vpp_clock_offset = time.time() - float(hello.time)
    test.logger.debug("BFD: Calculated vpp clock offset: %s",
                      test.vpp_clock_offset)
    if previous_offset:
        # an offset from an earlier run must agree with the new one
        test.assertAlmostEqual(
            previous_offset, test.vpp_clock_offset, delta=0.5,
            msg="vpp clock offset not stable (new: %s, old: %s)" %
            (test.vpp_clock_offset, previous_offset))

    test.logger.info("BFD: Sending Init")
    test.test_session.update(my_discriminator=randint(0, 40000000),
                             your_discriminator=hello[BFD].my_discriminator,
                             state=BFDState.init)
    send_next_packet()

    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, event, expected_state=BFDState.up)

    test.logger.info("BFD: Session is Up")
    test.test_session.update(state=BFDState.up)
    send_next_packet()
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
561
562
def bfd_session_down(test):
    """ Bring BFD session down

    Expects the VPP session to be Up, sends a Down packet from the test
    session and verifies that VPP reports Down as well.
    """
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
    peer = test.test_session
    peer.update(state=BFDState.down)
    # meticulous authentication needs a new sequence number per packet
    key = peer.sha1_key
    if key and key.auth_type == BFDAuthType.meticulous_keyed_sha1:
        peer.inc_seq_num()
    peer.send_packet()
    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, event, expected_state=BFDState.down)
    test.logger.info("BFD: Session is Down")
    test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
576
577
def verify_bfd_session_config(test, session, state=None):
    """ Verify that VPP's dump of a session matches the session object

    :param test: test case providing the assert helpers
    :param session: session object to compare against its dump entry
    :param state: expected session state; None (default) skips the state
        check
    """
    dump = session.get_bfd_udp_session_dump_entry()
    test.assertIsNotNone(dump)
    # since dump is not none, we have verified that sw_if_index and addresses
    # are valid (in get_bfd_udp_session_dump_entry)
    # compare against None rather than truthiness, so that a state whose
    # numeric value is 0 is still verified (presumably admin-down, which
    # RFC 5880 encodes as 0 - confirm against BFDState)
    if state is not None:
        test.assert_equal(dump.state, state, "session state")
    test.assert_equal(dump.required_min_rx, session.required_min_rx,
                      "required min rx interval")
    test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
                      "desired min tx interval")
    test.assert_equal(dump.detect_mult, session.detect_mult,
                      "detect multiplier")
    if session.sha1_key is None:
        test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
    else:
        # authenticated session: key IDs have to match as well
        test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
        test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
                          "bfd key id")
        test.assert_equal(dump.conf_key_id,
                          session.sha1_key.conf_key_id,
                          "config key id")
600
601
def verify_ip(test, packet):
    """ Verify the IP layer: TTL/hop limit of 255 and expected addresses.

    The packet is expected to go from the local to the remote address of
    the VPP session's interface.
    """
    intf = test.vpp_session.interface
    if test.vpp_session.af == AF_INET6:
        ip = packet[IPv6]
        expected_src, expected_dst = intf.local_ip6, intf.remote_ip6
        test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
    else:
        ip = packet[IP]
        expected_src, expected_dst = intf.local_ip4, intf.remote_ip4
        test.assert_equal(ip.ttl, 255, "IPv4 TTL")
    test.assert_equal(ip.src, expected_src, "IP source address")
    test.assert_equal(ip.dst, expected_dst, "IP destination address")
616
617
def verify_udp(test, packet):
    """ Verify the UDP layer: BFD destination port, source port in range. """
    udp_layer = packet[UDP]
    test.assert_equal(udp_layer.dport, BFD.udp_dport,
                      "UDP destination port")
    test.assert_in_range(udp_layer.sport,
                         BFD.udp_sport_min,
                         BFD.udp_sport_max,
                         "UDP source port")
624
625
def verify_event(test, event, expected_state):
    """ Verify correctness of event values.

    :param test: test case providing vpp_session and the assert helpers
    :param event: event to check
    :param expected_state: session state the event has to report
    """
    e = event
    test.logger.debug("BFD: Event: %s" % reprlib.repr(e))
    test.assert_equal(e.sw_if_index,
                      test.vpp_session.interface.sw_if_index,
                      "BFD interface index")

    # labels are address-family neutral: this helper is used for IPv4
    # sessions too, so "IPv6" in a failure message would be misleading
    test.assert_equal(str(e.local_addr), test.vpp_session.local_addr,
                      "Local IP address")
    test.assert_equal(str(e.peer_addr), test.vpp_session.peer_addr,
                      "Peer IP address")
    test.assert_equal(e.state, expected_state, BFDState)
639
640
def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None, is_tunnel=False):
    """ wait for BFD packet and verify its correctness

    :param test: test case providing pg0, test_session and assert helpers
    :param timeout: how long to wait (seconds)
    :param pcap_time_min: ignore packets with pcap timestamp lower than this
    :param is_tunnel: if set, the outer IP header is stripped and the
        payload is what gets verified and returned

    :returns: the received packet
    :raises CaptureTimeoutError: if the timeout expires before an acceptable
        packet is seen
    """
    test.logger.info("BFD: Waiting for BFD packet")
    deadline = time.time() + timeout
    counter = 0
    while True:
        counter += 1
        # sanity check
        test.assert_in_range(counter, 0, 100, "number of packets ignored")
        time_left = deadline - time.time()
        if time_left < 0:
            raise CaptureTimeoutError("Packet did not arrive within timeout")
        p = test.pg0.wait_for_packet(timeout=time_left)
        test.logger.debug(ppp("BFD: Got packet:", p))
        if pcap_time_min is not None and p.time < pcap_time_min:
            test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
                                  "pcap time min %s):" %
                                  (p.time, pcap_time_min), p))
        else:
            break
    if is_tunnel:
        # strip an IP layer and move to the next
        p = p[IP].payload

    # scapy signals a missing layer by raising IndexError from the lookup
    # rather than by returning None, so translate that into the intended
    # diagnostic which includes a dump of the offending packet
    try:
        bfd = p[BFD]
    except IndexError:
        bfd = None
    if bfd is None:
        raise Exception(ppp("Unexpected or invalid BFD packet:", p))
    if bfd.payload:
        raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
    verify_ip(test, p)
    verify_udp(test, p)
    test.test_session.verify_bfd(p)
    return p
680
681
682 @tag_run_solo
683 class BFD4TestCase(VppTestCase):
684     """Bidirectional Forwarding Detection (BFD)"""
685
686     pg0 = None
687     vpp_clock_offset = None
688     vpp_session = None
689     test_session = None
690
691     @classmethod
692     def setUpClass(cls):
693         super(BFD4TestCase, cls).setUpClass()
694         cls.vapi.cli("set log class bfd level debug")
695         try:
696             cls.create_pg_interfaces([0])
697             cls.create_loopback_interfaces(1)
698             cls.loopback0 = cls.lo_interfaces[0]
699             cls.loopback0.config_ip4()
700             cls.loopback0.admin_up()
701             cls.pg0.config_ip4()
702             cls.pg0.configure_ipv4_neighbors()
703             cls.pg0.admin_up()
704             cls.pg0.resolve_arp()
705
706         except Exception:
707             super(BFD4TestCase, cls).tearDownClass()
708             raise
709
    @classmethod
    def tearDownClass(cls):
        # no class-specific cleanup here; delegate to the parent class
        super(BFD4TestCase, cls).tearDownClass()
713
714     def setUp(self):
715         super(BFD4TestCase, self).setUp()
716         self.factory = AuthKeyFactory()
717         self.vapi.want_bfd_events()
718         self.pg0.enable_capture()
719         try:
720             self.vpp_session = VppBFDUDPSession(self, self.pg0,
721                                                 self.pg0.remote_ip4)
722             self.vpp_session.add_vpp_config()
723             self.vpp_session.admin_up()
724             self.test_session = BFDTestSession(self, self.pg0, AF_INET)
725         except BaseException:
726             self.vapi.want_bfd_events(enable_disable=0)
727             raise
728
729     def tearDown(self):
730         if not self.vpp_dead:
731             self.vapi.want_bfd_events(enable_disable=0)
732         self.vapi.collect_events()  # clear the event queue
733         super(BFD4TestCase, self).tearDown()
734
735     def test_session_up(self):
736         """ bring BFD session up """
737         bfd_session_up(self)
738
739     def test_session_up_by_ip(self):
740         """ bring BFD session up - first frame looked up by address pair """
741         self.logger.info("BFD: Sending Slow control frame")
742         self.test_session.update(my_discriminator=randint(0, 40000000))
743         self.test_session.send_packet()
744         self.pg0.enable_capture()
745         p = self.pg0.wait_for_packet(1)
746         self.assert_equal(p[BFD].your_discriminator,
747                           self.test_session.my_discriminator,
748                           "BFD - your discriminator")
749         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
750         self.test_session.update(your_discriminator=p[BFD].my_discriminator,
751                                  state=BFDState.up)
752         self.logger.info("BFD: Waiting for event")
753         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
754         verify_event(self, e, expected_state=BFDState.init)
755         self.logger.info("BFD: Sending Up")
756         self.test_session.send_packet()
757         self.logger.info("BFD: Waiting for event")
758         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
759         verify_event(self, e, expected_state=BFDState.up)
760         self.logger.info("BFD: Session is Up")
761         self.test_session.update(state=BFDState.up)
762         self.test_session.send_packet()
763         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
764
765     def test_session_down(self):
766         """ bring BFD session down """
767         bfd_session_up(self)
768         bfd_session_down(self)
769
770     def test_hold_up(self):
771         """ hold BFD session up """
772         bfd_session_up(self)
773         for dummy in range(self.test_session.detect_mult * 2):
774             wait_for_bfd_packet(self)
775             self.test_session.send_packet()
776         self.assert_equal(len(self.vapi.collect_events()), 0,
777                           "number of bfd events")
778
779     def test_slow_timer(self):
780         """ verify slow periodic control frames while session down """
781         packet_count = 3
782         self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
783         prev_packet = wait_for_bfd_packet(self, 2)
784         for dummy in range(packet_count):
785             next_packet = wait_for_bfd_packet(self, 2)
786             time_diff = next_packet.time - prev_packet.time
787             # spec says the range should be <0.75, 1>, allow extra 0.05 margin
788             # to work around timing issues
789             self.assert_in_range(
790                 time_diff, 0.70, 1.05, "time between slow packets")
791             prev_packet = next_packet
792
793     def test_zero_remote_min_rx(self):
794         """ no packets when zero remote required min rx interval """
795         bfd_session_up(self)
796         self.test_session.update(required_min_rx=0)
797         self.test_session.send_packet()
798         for dummy in range(self.test_session.detect_mult):
799             self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
800                        "sleep before transmitting bfd packet")
801             self.test_session.send_packet()
802             try:
803                 p = wait_for_bfd_packet(self, timeout=0)
804                 self.logger.error(ppp("Received unexpected packet:", p))
805             except CaptureTimeoutError:
806                 pass
807         self.assert_equal(
808             len(self.vapi.collect_events()), 0, "number of bfd events")
809         self.test_session.update(required_min_rx=300000)
810         for dummy in range(3):
811             self.test_session.send_packet()
812             wait_for_bfd_packet(
813                 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
814         self.assert_equal(
815             len(self.vapi.collect_events()), 0, "number of bfd events")
816
817     def test_conn_down(self):
818         """ verify session goes down after inactivity """
819         bfd_session_up(self)
820         detection_time = self.test_session.detect_mult *\
821             self.vpp_session.required_min_rx / USEC_IN_SEC
822         self.sleep(detection_time, "waiting for BFD session time-out")
823         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
824         verify_event(self, e, expected_state=BFDState.down)
825
826     def test_peer_discr_reset_sess_down(self):
827         """ peer discriminator reset after session goes down """
828         bfd_session_up(self)
829         detection_time = self.test_session.detect_mult *\
830             self.vpp_session.required_min_rx / USEC_IN_SEC
831         self.sleep(detection_time, "waiting for BFD session time-out")
832         self.test_session.my_discriminator = 0
833         wait_for_bfd_packet(self,
834                             pcap_time_min=time.time() - self.vpp_clock_offset)
835
836     def test_large_required_min_rx(self):
837         """ large remote required min rx interval """
838         bfd_session_up(self)
839         p = wait_for_bfd_packet(self)
840         interval = 3000000
841         self.test_session.update(required_min_rx=interval)
842         self.test_session.send_packet()
843         time_mark = time.time()
844         count = 0
845         # busy wait here, trying to collect a packet or event, vpp is not
846         # allowed to send packets and the session will timeout first - so the
847         # Up->Down event must arrive before any packets do
848         while time.time() < time_mark + interval / USEC_IN_SEC:
849             try:
850                 p = wait_for_bfd_packet(self, timeout=0)
851                 # if vpp managed to send a packet before we did the session
852                 # session update, then that's fine, ignore it
853                 if p.time < time_mark - self.vpp_clock_offset:
854                     continue
855                 self.logger.error(ppp("Received unexpected packet:", p))
856                 count += 1
857             except CaptureTimeoutError:
858                 pass
859             events = self.vapi.collect_events()
860             if len(events) > 0:
861                 verify_event(self, events[0], BFDState.down)
862                 break
863         self.assert_equal(count, 0, "number of packets received")
864
865     def test_immediate_remote_min_rx_reduction(self):
866         """ immediately honor remote required min rx reduction """
867         self.vpp_session.remove_vpp_config()
868         self.vpp_session = VppBFDUDPSession(
869             self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
870         self.pg0.enable_capture()
871         self.vpp_session.add_vpp_config()
872         self.test_session.update(desired_min_tx=1000000,
873                                  required_min_rx=1000000)
874         bfd_session_up(self)
875         reference_packet = wait_for_bfd_packet(self)
876         time_mark = time.time()
877         interval = 300000
878         self.test_session.update(required_min_rx=interval)
879         self.test_session.send_packet()
880         extra_time = time.time() - time_mark
881         p = wait_for_bfd_packet(self)
882         # first packet is allowed to be late by time we spent doing the update
883         # calculated in extra_time
884         self.assert_in_range(p.time - reference_packet.time,
885                              .95 * 0.75 * interval / USEC_IN_SEC,
886                              1.05 * interval / USEC_IN_SEC + extra_time,
887                              "time between BFD packets")
888         reference_packet = p
889         for dummy in range(3):
890             p = wait_for_bfd_packet(self)
891             diff = p.time - reference_packet.time
892             self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
893                                  1.05 * interval / USEC_IN_SEC,
894                                  "time between BFD packets")
895             reference_packet = p
896
897     def test_modify_req_min_rx_double(self):
898         """ modify session - double required min rx """
899         bfd_session_up(self)
900         p = wait_for_bfd_packet(self)
901         self.test_session.update(desired_min_tx=10000,
902                                  required_min_rx=10000)
903         self.test_session.send_packet()
904         # double required min rx
905         self.vpp_session.modify_parameters(
906             required_min_rx=2 * self.vpp_session.required_min_rx)
907         p = wait_for_bfd_packet(
908             self, pcap_time_min=time.time() - self.vpp_clock_offset)
909         # poll bit needs to be set
910         self.assertIn("P", p.sprintf("%BFD.flags%"),
911                       "Poll bit not set in BFD packet")
912         # finish poll sequence with final packet
913         final = self.test_session.create_packet()
914         final[BFD].flags = "F"
915         timeout = self.test_session.detect_mult * \
916             max(self.test_session.desired_min_tx,
917                 self.vpp_session.required_min_rx) / USEC_IN_SEC
918         self.test_session.send_packet(final)
919         time_mark = time.time()
920         e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
921         verify_event(self, e, expected_state=BFDState.down)
922         time_to_event = time.time() - time_mark
923         self.assert_in_range(time_to_event, .9 * timeout,
924                              1.1 * timeout, "session timeout")
925
926     def test_modify_req_min_rx_halve(self):
927         """ modify session - halve required min rx """
928         self.vpp_session.modify_parameters(
929             required_min_rx=2 * self.vpp_session.required_min_rx)
930         bfd_session_up(self)
931         p = wait_for_bfd_packet(self)
932         self.test_session.update(desired_min_tx=10000,
933                                  required_min_rx=10000)
934         self.test_session.send_packet()
935         p = wait_for_bfd_packet(
936             self, pcap_time_min=time.time() - self.vpp_clock_offset)
937         # halve required min rx
938         old_required_min_rx = self.vpp_session.required_min_rx
939         self.vpp_session.modify_parameters(
940             required_min_rx=self.vpp_session.required_min_rx // 2)
941         # now we wait 0.8*3*old-req-min-rx and the session should still be up
942         self.sleep(0.8 * self.vpp_session.detect_mult *
943                    old_required_min_rx / USEC_IN_SEC,
944                    "wait before finishing poll sequence")
945         self.assert_equal(len(self.vapi.collect_events()), 0,
946                           "number of bfd events")
947         p = wait_for_bfd_packet(self)
948         # poll bit needs to be set
949         self.assertIn("P", p.sprintf("%BFD.flags%"),
950                       "Poll bit not set in BFD packet")
951         # finish poll sequence with final packet
952         final = self.test_session.create_packet()
953         final[BFD].flags = "F"
954         self.test_session.send_packet(final)
955         # now the session should time out under new conditions
956         detection_time = self.test_session.detect_mult *\
957             self.vpp_session.required_min_rx / USEC_IN_SEC
958         before = time.time()
959         e = self.vapi.wait_for_event(
960             2 * detection_time, "bfd_udp_session_details")
961         after = time.time()
962         self.assert_in_range(after - before,
963                              0.9 * detection_time,
964                              1.1 * detection_time,
965                              "time before bfd session goes down")
966         verify_event(self, e, expected_state=BFDState.down)
967
968     def test_modify_detect_mult(self):
969         """ modify detect multiplier """
970         bfd_session_up(self)
971         p = wait_for_bfd_packet(self)
972         self.vpp_session.modify_parameters(detect_mult=1)
973         p = wait_for_bfd_packet(
974             self, pcap_time_min=time.time() - self.vpp_clock_offset)
975         self.assert_equal(self.vpp_session.detect_mult,
976                           p[BFD].detect_mult,
977                           "detect mult")
978         # poll bit must not be set
979         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
980                          "Poll bit not set in BFD packet")
981         self.vpp_session.modify_parameters(detect_mult=10)
982         p = wait_for_bfd_packet(
983             self, pcap_time_min=time.time() - self.vpp_clock_offset)
984         self.assert_equal(self.vpp_session.detect_mult,
985                           p[BFD].detect_mult,
986                           "detect mult")
987         # poll bit must not be set
988         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
989                          "Poll bit not set in BFD packet")
990
991     def test_queued_poll(self):
992         """ test poll sequence queueing """
993         bfd_session_up(self)
994         p = wait_for_bfd_packet(self)
995         self.vpp_session.modify_parameters(
996             required_min_rx=2 * self.vpp_session.required_min_rx)
997         p = wait_for_bfd_packet(self)
998         poll_sequence_start = time.time()
999         poll_sequence_length_min = 0.5
1000         send_final_after = time.time() + poll_sequence_length_min
1001         # poll bit needs to be set
1002         self.assertIn("P", p.sprintf("%BFD.flags%"),
1003                       "Poll bit not set in BFD packet")
1004         self.assert_equal(p[BFD].required_min_rx_interval,
1005                           self.vpp_session.required_min_rx,
1006                           "BFD required min rx interval")
1007         self.vpp_session.modify_parameters(
1008             required_min_rx=2 * self.vpp_session.required_min_rx)
1009         # 2nd poll sequence should be queued now
1010         # don't send the reply back yet, wait for some time to emulate
1011         # longer round-trip time
1012         packet_count = 0
1013         while time.time() < send_final_after:
1014             self.test_session.send_packet()
1015             p = wait_for_bfd_packet(self)
1016             self.assert_equal(len(self.vapi.collect_events()), 0,
1017                               "number of bfd events")
1018             self.assert_equal(p[BFD].required_min_rx_interval,
1019                               self.vpp_session.required_min_rx,
1020                               "BFD required min rx interval")
1021             packet_count += 1
1022             # poll bit must be set
1023             self.assertIn("P", p.sprintf("%BFD.flags%"),
1024                           "Poll bit not set in BFD packet")
1025         final = self.test_session.create_packet()
1026         final[BFD].flags = "F"
1027         self.test_session.send_packet(final)
1028         # finish 1st with final
1029         poll_sequence_length = time.time() - poll_sequence_start
1030         # vpp must wait for some time before starting new poll sequence
1031         poll_no_2_started = False
1032         for dummy in range(2 * packet_count):
1033             p = wait_for_bfd_packet(self)
1034             self.assert_equal(len(self.vapi.collect_events()), 0,
1035                               "number of bfd events")
1036             if "P" in p.sprintf("%BFD.flags%"):
1037                 poll_no_2_started = True
1038                 if time.time() < poll_sequence_start + poll_sequence_length:
1039                     raise Exception("VPP started 2nd poll sequence too soon")
1040                 final = self.test_session.create_packet()
1041                 final[BFD].flags = "F"
1042                 self.test_session.send_packet(final)
1043                 break
1044             else:
1045                 self.test_session.send_packet()
1046         self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
1047         # finish 2nd with final
1048         final = self.test_session.create_packet()
1049         final[BFD].flags = "F"
1050         self.test_session.send_packet(final)
1051         p = wait_for_bfd_packet(self)
1052         # poll bit must not be set
1053         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
1054                          "Poll bit set in BFD packet")
1055
1056     # returning inconsistent results requiring retries in per-patch tests
1057     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1058     def test_poll_response(self):
1059         """ test correct response to control frame with poll bit set """
1060         bfd_session_up(self)
1061         poll = self.test_session.create_packet()
1062         poll[BFD].flags = "P"
1063         self.test_session.send_packet(poll)
1064         final = wait_for_bfd_packet(
1065             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1066         self.assertIn("F", final.sprintf("%BFD.flags%"))
1067
1068     def test_no_periodic_if_remote_demand(self):
1069         """ no periodic frames outside poll sequence if remote demand set """
1070         bfd_session_up(self)
1071         demand = self.test_session.create_packet()
1072         demand[BFD].flags = "D"
1073         self.test_session.send_packet(demand)
1074         transmit_time = 0.9 \
1075             * max(self.vpp_session.required_min_rx,
1076                   self.test_session.desired_min_tx) \
1077             / USEC_IN_SEC
1078         count = 0
1079         for dummy in range(self.test_session.detect_mult * 2):
1080             self.sleep(transmit_time)
1081             self.test_session.send_packet(demand)
1082             try:
1083                 p = wait_for_bfd_packet(self, timeout=0)
1084                 self.logger.error(ppp("Received unexpected packet:", p))
1085                 count += 1
1086             except CaptureTimeoutError:
1087                 pass
1088         events = self.vapi.collect_events()
1089         for e in events:
1090             self.logger.error("Received unexpected event: %s", e)
1091         self.assert_equal(count, 0, "number of packets received")
1092         self.assert_equal(len(events), 0, "number of events received")
1093
1094     def test_echo_looped_back(self):
1095         """ echo packets looped back """
1096         # don't need a session in this case..
1097         self.vpp_session.remove_vpp_config()
1098         self.pg0.enable_capture()
1099         echo_packet_count = 10
1100         # random source port low enough to increment a few times..
1101         udp_sport_tx = randint(1, 50000)
1102         udp_sport_rx = udp_sport_tx
1103         echo_packet = (Ether(src=self.pg0.remote_mac,
1104                              dst=self.pg0.local_mac) /
1105                        IP(src=self.pg0.remote_ip4,
1106                           dst=self.pg0.remote_ip4) /
1107                        UDP(dport=BFD.udp_dport_echo) /
1108                        Raw("this should be looped back"))
1109         for dummy in range(echo_packet_count):
1110             self.sleep(.01, "delay between echo packets")
1111             echo_packet[UDP].sport = udp_sport_tx
1112             udp_sport_tx += 1
1113             self.logger.debug(ppp("Sending packet:", echo_packet))
1114             self.pg0.add_stream(echo_packet)
1115             self.pg_start()
1116         for dummy in range(echo_packet_count):
1117             p = self.pg0.wait_for_packet(1)
1118             self.logger.debug(ppp("Got packet:", p))
1119             ether = p[Ether]
1120             self.assert_equal(self.pg0.remote_mac,
1121                               ether.dst, "Destination MAC")
1122             self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1123             ip = p[IP]
1124             self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1125             self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1126             udp = p[UDP]
1127             self.assert_equal(udp.dport, BFD.udp_dport_echo,
1128                               "UDP destination port")
1129             self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1130             udp_sport_rx += 1
1131             # need to compare the hex payload here, otherwise BFD_vpp_echo
1132             # gets in way
1133             self.assertEqual(scapy.compat.raw(p[UDP].payload),
1134                              scapy.compat.raw(echo_packet[UDP].payload),
1135                              "Received packet is not the echo packet sent")
1136         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1137                           "ECHO packet identifier for test purposes)")
1138
1139     def test_echo(self):
1140         """ echo function """
1141         bfd_session_up(self)
1142         self.test_session.update(required_min_echo_rx=150000)
1143         self.test_session.send_packet()
1144         detection_time = self.test_session.detect_mult *\
1145             self.vpp_session.required_min_rx / USEC_IN_SEC
1146         # echo shouldn't work without echo source set
1147         for dummy in range(10):
1148             sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1149             self.sleep(sleep, "delay before sending bfd packet")
1150             self.test_session.send_packet()
1151         p = wait_for_bfd_packet(
1152             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1153         self.assert_equal(p[BFD].required_min_rx_interval,
1154                           self.vpp_session.required_min_rx,
1155                           "BFD required min rx interval")
1156         self.test_session.send_packet()
1157         self.vapi.bfd_udp_set_echo_source(
1158             sw_if_index=self.loopback0.sw_if_index)
1159         echo_seen = False
1160         # should be turned on - loopback echo packets
1161         for dummy in range(3):
1162             loop_until = time.time() + 0.75 * detection_time
1163             while time.time() < loop_until:
1164                 p = self.pg0.wait_for_packet(1)
1165                 self.logger.debug(ppp("Got packet:", p))
1166                 if p[UDP].dport == BFD.udp_dport_echo:
1167                     self.assert_equal(
1168                         p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
1169                     self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
1170                                         "BFD ECHO src IP equal to loopback IP")
1171                     self.logger.debug(ppp("Looping back packet:", p))
1172                     self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1173                                       "ECHO packet destination MAC address")
1174                     p[Ether].dst = self.pg0.local_mac
1175                     self.pg0.add_stream(p)
1176                     self.pg_start()
1177                     echo_seen = True
1178                 elif p.haslayer(BFD):
1179                     if echo_seen:
1180                         self.assertGreaterEqual(
1181                             p[BFD].required_min_rx_interval,
1182                             1000000)
1183                     if "P" in p.sprintf("%BFD.flags%"):
1184                         final = self.test_session.create_packet()
1185                         final[BFD].flags = "F"
1186                         self.test_session.send_packet(final)
1187                 else:
1188                     raise Exception(ppp("Received unknown packet:", p))
1189
1190                 self.assert_equal(len(self.vapi.collect_events()), 0,
1191                                   "number of bfd events")
1192             self.test_session.send_packet()
1193         self.assertTrue(echo_seen, "No echo packets received")
1194
1195     def test_echo_fail(self):
1196         """ session goes down if echo function fails """
1197         bfd_session_up(self)
1198         self.test_session.update(required_min_echo_rx=150000)
1199         self.test_session.send_packet()
1200         detection_time = self.test_session.detect_mult *\
1201             self.vpp_session.required_min_rx / USEC_IN_SEC
1202         self.vapi.bfd_udp_set_echo_source(
1203             sw_if_index=self.loopback0.sw_if_index)
1204         # echo function should be used now, but we will drop the echo packets
1205         verified_diag = False
1206         for dummy in range(3):
1207             loop_until = time.time() + 0.75 * detection_time
1208             while time.time() < loop_until:
1209                 p = self.pg0.wait_for_packet(1)
1210                 self.logger.debug(ppp("Got packet:", p))
1211                 if p[UDP].dport == BFD.udp_dport_echo:
1212                     # dropped
1213                     pass
1214                 elif p.haslayer(BFD):
1215                     if "P" in p.sprintf("%BFD.flags%"):
1216                         self.assertGreaterEqual(
1217                             p[BFD].required_min_rx_interval,
1218                             1000000)
1219                         final = self.test_session.create_packet()
1220                         final[BFD].flags = "F"
1221                         self.test_session.send_packet(final)
1222                     if p[BFD].state == BFDState.down:
1223                         self.assert_equal(p[BFD].diag,
1224                                           BFDDiagCode.echo_function_failed,
1225                                           BFDDiagCode)
1226                         verified_diag = True
1227                 else:
1228                     raise Exception(ppp("Received unknown packet:", p))
1229             self.test_session.send_packet()
1230         events = self.vapi.collect_events()
1231         self.assert_equal(len(events), 1, "number of bfd events")
1232         self.assert_equal(events[0].state, BFDState.down, BFDState)
1233         self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1234
1235     def test_echo_stop(self):
1236         """ echo function stops if peer sets required min echo rx zero """
1237         bfd_session_up(self)
1238         self.test_session.update(required_min_echo_rx=150000)
1239         self.test_session.send_packet()
1240         self.vapi.bfd_udp_set_echo_source(
1241             sw_if_index=self.loopback0.sw_if_index)
1242         # wait for first echo packet
1243         while True:
1244             p = self.pg0.wait_for_packet(1)
1245             self.logger.debug(ppp("Got packet:", p))
1246             if p[UDP].dport == BFD.udp_dport_echo:
1247                 self.logger.debug(ppp("Looping back packet:", p))
1248                 p[Ether].dst = self.pg0.local_mac
1249                 self.pg0.add_stream(p)
1250                 self.pg_start()
1251                 break
1252             elif p.haslayer(BFD):
1253                 # ignore BFD
1254                 pass
1255             else:
1256                 raise Exception(ppp("Received unknown packet:", p))
1257         self.test_session.update(required_min_echo_rx=0)
1258         self.test_session.send_packet()
1259         # echo packets shouldn't arrive anymore
1260         for dummy in range(5):
1261             wait_for_bfd_packet(
1262                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1263             self.test_session.send_packet()
1264             events = self.vapi.collect_events()
1265             self.assert_equal(len(events), 0, "number of bfd events")
1266
1267     def test_echo_source_removed(self):
1268         """ echo function stops if echo source is removed """
1269         bfd_session_up(self)
1270         self.test_session.update(required_min_echo_rx=150000)
1271         self.test_session.send_packet()
1272         self.vapi.bfd_udp_set_echo_source(
1273             sw_if_index=self.loopback0.sw_if_index)
1274         # wait for first echo packet
1275         while True:
1276             p = self.pg0.wait_for_packet(1)
1277             self.logger.debug(ppp("Got packet:", p))
1278             if p[UDP].dport == BFD.udp_dport_echo:
1279                 self.logger.debug(ppp("Looping back packet:", p))
1280                 p[Ether].dst = self.pg0.local_mac
1281                 self.pg0.add_stream(p)
1282                 self.pg_start()
1283                 break
1284             elif p.haslayer(BFD):
1285                 # ignore BFD
1286                 pass
1287             else:
1288                 raise Exception(ppp("Received unknown packet:", p))
1289         self.vapi.bfd_udp_del_echo_source()
1290         self.test_session.send_packet()
1291         # echo packets shouldn't arrive anymore
1292         for dummy in range(5):
1293             wait_for_bfd_packet(
1294                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1295             self.test_session.send_packet()
1296             events = self.vapi.collect_events()
1297             self.assert_equal(len(events), 0, "number of bfd events")
1298
1299     def test_stale_echo(self):
1300         """ stale echo packets don't keep a session up """
1301         bfd_session_up(self)
1302         self.test_session.update(required_min_echo_rx=150000)
1303         self.vapi.bfd_udp_set_echo_source(
1304             sw_if_index=self.loopback0.sw_if_index)
1305         self.test_session.send_packet()
1306         # should be turned on - loopback echo packets
1307         echo_packet = None
1308         timeout_at = None
1309         timeout_ok = False
1310         for dummy in range(10 * self.vpp_session.detect_mult):
1311             p = self.pg0.wait_for_packet(1)
1312             if p[UDP].dport == BFD.udp_dport_echo:
1313                 if echo_packet is None:
1314                     self.logger.debug(ppp("Got first echo packet:", p))
1315                     echo_packet = p
1316                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1317                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1318                 else:
1319                     self.logger.debug(ppp("Got followup echo packet:", p))
1320                 self.logger.debug(ppp("Looping back first echo packet:", p))
1321                 echo_packet[Ether].dst = self.pg0.local_mac
1322                 self.pg0.add_stream(echo_packet)
1323                 self.pg_start()
1324             elif p.haslayer(BFD):
1325                 self.logger.debug(ppp("Got packet:", p))
1326                 if "P" in p.sprintf("%BFD.flags%"):
1327                     final = self.test_session.create_packet()
1328                     final[BFD].flags = "F"
1329                     self.test_session.send_packet(final)
1330                 if p[BFD].state == BFDState.down:
1331                     self.assertIsNotNone(
1332                         timeout_at,
1333                         "Session went down before first echo packet received")
1334                     now = time.time()
1335                     self.assertGreaterEqual(
1336                         now, timeout_at,
1337                         "Session timeout at %s, but is expected at %s" %
1338                         (now, timeout_at))
1339                     self.assert_equal(p[BFD].diag,
1340                                       BFDDiagCode.echo_function_failed,
1341                                       BFDDiagCode)
1342                     events = self.vapi.collect_events()
1343                     self.assert_equal(len(events), 1, "number of bfd events")
1344                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1345                     timeout_ok = True
1346                     break
1347             else:
1348                 raise Exception(ppp("Received unknown packet:", p))
1349             self.test_session.send_packet()
1350         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1351
1352     def test_invalid_echo_checksum(self):
1353         """ echo packets with invalid checksum don't keep a session up """
1354         bfd_session_up(self)
1355         self.test_session.update(required_min_echo_rx=150000)
1356         self.vapi.bfd_udp_set_echo_source(
1357             sw_if_index=self.loopback0.sw_if_index)
1358         self.test_session.send_packet()
1359         # should be turned on - loopback echo packets
1360         timeout_at = None
1361         timeout_ok = False
1362         for dummy in range(10 * self.vpp_session.detect_mult):
1363             p = self.pg0.wait_for_packet(1)
1364             if p[UDP].dport == BFD.udp_dport_echo:
1365                 self.logger.debug(ppp("Got echo packet:", p))
1366                 if timeout_at is None:
1367                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1368                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1369                 p[BFD_vpp_echo].checksum = getrandbits(64)
1370                 p[Ether].dst = self.pg0.local_mac
1371                 self.logger.debug(ppp("Looping back modified echo packet:", p))
1372                 self.pg0.add_stream(p)
1373                 self.pg_start()
1374             elif p.haslayer(BFD):
1375                 self.logger.debug(ppp("Got packet:", p))
1376                 if "P" in p.sprintf("%BFD.flags%"):
1377                     final = self.test_session.create_packet()
1378                     final[BFD].flags = "F"
1379                     self.test_session.send_packet(final)
1380                 if p[BFD].state == BFDState.down:
1381                     self.assertIsNotNone(
1382                         timeout_at,
1383                         "Session went down before first echo packet received")
1384                     now = time.time()
1385                     self.assertGreaterEqual(
1386                         now, timeout_at,
1387                         "Session timeout at %s, but is expected at %s" %
1388                         (now, timeout_at))
1389                     self.assert_equal(p[BFD].diag,
1390                                       BFDDiagCode.echo_function_failed,
1391                                       BFDDiagCode)
1392                     events = self.vapi.collect_events()
1393                     self.assert_equal(len(events), 1, "number of bfd events")
1394                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1395                     timeout_ok = True
1396                     break
1397             else:
1398                 raise Exception(ppp("Received unknown packet:", p))
1399             self.test_session.send_packet()
1400         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1401
1402     def test_admin_up_down(self):
1403         """ put session admin-up and admin-down """
1404         bfd_session_up(self)
1405         self.vpp_session.admin_down()
1406         self.pg0.enable_capture()
1407         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1408         verify_event(self, e, expected_state=BFDState.admin_down)
1409         for dummy in range(2):
1410             p = wait_for_bfd_packet(self)
1411             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1412         # try to bring session up - shouldn't be possible
1413         self.test_session.update(state=BFDState.init)
1414         self.test_session.send_packet()
1415         for dummy in range(2):
1416             p = wait_for_bfd_packet(self)
1417             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1418         self.vpp_session.admin_up()
1419         self.test_session.update(state=BFDState.down)
1420         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1421         verify_event(self, e, expected_state=BFDState.down)
1422         p = wait_for_bfd_packet(
1423             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1424         self.assert_equal(p[BFD].state, BFDState.down, BFDState)
1425         self.test_session.send_packet()
1426         p = wait_for_bfd_packet(
1427             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1428         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1429         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1430         verify_event(self, e, expected_state=BFDState.init)
1431         self.test_session.update(state=BFDState.up)
1432         self.test_session.send_packet()
1433         p = wait_for_bfd_packet(
1434             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1435         self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1436         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1437         verify_event(self, e, expected_state=BFDState.up)
1438
1439     def test_config_change_remote_demand(self):
1440         """ configuration change while peer in demand mode """
1441         bfd_session_up(self)
1442         demand = self.test_session.create_packet()
1443         demand[BFD].flags = "D"
1444         self.test_session.send_packet(demand)
1445         self.vpp_session.modify_parameters(
1446             required_min_rx=2 * self.vpp_session.required_min_rx)
1447         p = wait_for_bfd_packet(
1448             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1449         # poll bit must be set
1450         self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1451         # terminate poll sequence
1452         final = self.test_session.create_packet()
1453         final[BFD].flags = "D+F"
1454         self.test_session.send_packet(final)
1455         # vpp should be quiet now again
1456         transmit_time = 0.9 \
1457             * max(self.vpp_session.required_min_rx,
1458                   self.test_session.desired_min_tx) \
1459             / USEC_IN_SEC
1460         count = 0
1461         for dummy in range(self.test_session.detect_mult * 2):
1462             self.sleep(transmit_time)
1463             self.test_session.send_packet(demand)
1464             try:
1465                 p = wait_for_bfd_packet(self, timeout=0)
1466                 self.logger.error(ppp("Received unexpected packet:", p))
1467                 count += 1
1468             except CaptureTimeoutError:
1469                 pass
1470         events = self.vapi.collect_events()
1471         for e in events:
1472             self.logger.error("Received unexpected event: %s", e)
1473         self.assert_equal(count, 0, "number of packets received")
1474         self.assert_equal(len(events), 0, "number of events received")
1475
1476     def test_intf_deleted(self):
1477         """ interface with bfd session deleted """
1478         intf = VppLoInterface(self)
1479         intf.config_ip4()
1480         intf.admin_up()
1481         sw_if_index = intf.sw_if_index
1482         vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip4)
1483         vpp_session.add_vpp_config()
1484         vpp_session.admin_up()
1485         intf.remove_vpp_config()
1486         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1487         self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1488         self.assertFalse(vpp_session.query_vpp_config())
1489
1490
@tag_run_solo
@tag_fixme_vpp_workers
class BFD6TestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (IPv6) """

    # class-level defaults; pg0 is created in setUpClass(), both sessions
    # are created per test in setUp()
    pg0 = None
    vpp_clock_offset = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """Create pg0 and loopback0, both configured for IPv6."""
        super(BFD6TestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip6()
            cls.pg0.configure_ipv6_neighbors()
            cls.pg0.admin_up()
            cls.pg0.resolve_ndp()
            cls.create_loopback_interfaces(1)
            cls.loopback0 = cls.lo_interfaces[0]
            cls.loopback0.config_ip6()
            cls.loopback0.admin_up()

        except Exception:
            # undo the parent's class setup before propagating the failure
            super(BFD6TestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Delegate class teardown to the parent."""
        super(BFD6TestCase, cls).tearDownClass()

    def setUp(self):
        """Register for BFD events and create the VPP and test sessions."""
        super(BFD6TestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()
        try:
            self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                                self.pg0.remote_ip6,
                                                af=AF_INET6)
            self.vpp_session.add_vpp_config()
            self.vpp_session.admin_up()
            self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
            self.logger.debug(self.vapi.cli("show adj nbr"))
        except BaseException:
            # do not leave the event registration behind on failure
            self.vapi.want_bfd_events(enable_disable=0)
            raise

    def tearDown(self):
        """Unregister from BFD events and drop any queued events."""
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFD6TestCase, self).tearDown()

    def test_session_up(self):
        """ bring BFD session up """
        bfd_session_up(self)

    def test_session_up_by_ip(self):
        """ bring BFD session up - first frame looked up by address pair """
        self.logger.info("BFD: Sending Slow control frame")
        self.test_session.update(my_discriminator=randint(0, 40000000))
        self.test_session.send_packet()
        self.pg0.enable_capture()
        p = self.pg0.wait_for_packet(1)
        self.assert_equal(p[BFD].your_discriminator,
                          self.test_session.my_discriminator,
                          "BFD - your discriminator")
        self.assert_equal(p[BFD].state, BFDState.init, BFDState)
        # from now on address the peer by the discriminator it advertised
        self.test_session.update(your_discriminator=p[BFD].my_discriminator,
                                 state=BFDState.up)
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        verify_event(self, e, expected_state=BFDState.init)
        self.logger.info("BFD: Sending Up")
        self.test_session.send_packet()
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        verify_event(self, e, expected_state=BFDState.up)
        self.logger.info("BFD: Session is Up")
        self.test_session.update(state=BFDState.up)
        self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_hold_up(self):
        """ hold BFD session up """
        bfd_session_up(self)
        # answer each packet from vpp; no event may be generated meanwhile
        for _ in range(self.test_session.detect_mult * 2):
            wait_for_bfd_packet(self)
            self.test_session.send_packet()
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_echo_looped_back(self):
        """ echo packets looped back """
        # don't need a session in this case..
        self.vpp_session.remove_vpp_config()
        self.pg0.enable_capture()
        echo_packet_count = 10
        # random source port low enough to increment a few times..
        udp_sport_tx = randint(1, 50000)
        udp_sport_rx = udp_sport_tx
        echo_packet = (Ether(src=self.pg0.remote_mac,
                             dst=self.pg0.local_mac) /
                       IPv6(src=self.pg0.remote_ip6,
                            dst=self.pg0.remote_ip6) /
                       UDP(dport=BFD.udp_dport_echo) /
                       Raw("this should be looped back"))
        # the UDP source port is incremented per packet, so it identifies
        # each echo packet when it comes back
        for _ in range(echo_packet_count):
            self.sleep(.01, "delay between echo packets")
            echo_packet[UDP].sport = udp_sport_tx
            udp_sport_tx += 1
            self.logger.debug(ppp("Sending packet:", echo_packet))
            self.pg0.add_stream(echo_packet)
            self.pg_start()
        for _ in range(echo_packet_count):
            p = self.pg0.wait_for_packet(1)
            self.logger.debug(ppp("Got packet:", p))
            ether = p[Ether]
            self.assert_equal(self.pg0.remote_mac,
                              ether.dst, "Destination MAC")
            self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
            ip = p[IPv6]
            self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
            self.assert_equal(self.pg0.remote_ip6, ip.src, "Source IP")
            udp = p[UDP]
            self.assert_equal(udp.dport, BFD.udp_dport_echo,
                              "UDP destination port")
            self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
            udp_sport_rx += 1
            # need to compare the hex payload here, otherwise BFD_vpp_echo
            # gets in way
            self.assertEqual(scapy.compat.raw(p[UDP].payload),
                             scapy.compat.raw(echo_packet[UDP].payload),
                             "Received packet is not the echo packet sent")
        # as many packets were received as were sent
        self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
                          "ECHO packet identifier for test purposes)")

    def test_echo(self):
        """ echo function """
        bfd_session_up(self)
        self.test_session.update(required_min_echo_rx=150000)
        self.test_session.send_packet()
        detection_time = self.test_session.detect_mult *\
            self.vpp_session.required_min_rx / USEC_IN_SEC
        # echo shouldn't work without echo source set
        for _ in range(10):
            sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
            self.sleep(sleep, "delay before sending bfd packet")
            self.test_session.send_packet()
        p = wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.assert_equal(p[BFD].required_min_rx_interval,
                          self.vpp_session.required_min_rx,
                          "BFD required min rx interval")
        self.test_session.send_packet()
        self.vapi.bfd_udp_set_echo_source(
            sw_if_index=self.loopback0.sw_if_index)
        echo_seen = False
        # should be turned on - loopback echo packets
        for _ in range(3):
            loop_until = time.time() + 0.75 * detection_time
            while time.time() < loop_until:
                p = self.pg0.wait_for_packet(1)
                self.logger.debug(ppp("Got packet:", p))
                if p[UDP].dport == BFD.udp_dport_echo:
                    self.assert_equal(
                        p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
                    self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
                                        "BFD ECHO src IP equal to loopback IP")
                    self.logger.debug(ppp("Looping back packet:", p))
                    self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
                                      "ECHO packet destination MAC address")
                    p[Ether].dst = self.pg0.local_mac
                    self.pg0.add_stream(p)
                    self.pg_start()
                    echo_seen = True
                elif p.haslayer(BFD):
                    if echo_seen:
                        # once echo is seen, control packets must advertise
                        # a required min rx interval of at least 1000000
                        self.assertGreaterEqual(
                            p[BFD].required_min_rx_interval,
                            1000000)
                    if "P" in p.sprintf("%BFD.flags%"):
                        # answer the poll with a final packet
                        final = self.test_session.create_packet()
                        final[BFD].flags = "F"
                        self.test_session.send_packet(final)
                else:
                    raise Exception(ppp("Received unknown packet:", p))

                self.assert_equal(len(self.vapi.collect_events()), 0,
                                  "number of bfd events")
            self.test_session.send_packet()
        self.assertTrue(echo_seen, "No echo packets received")

    def test_intf_deleted(self):
        """ interface with bfd session deleted """
        intf = VppLoInterface(self)
        intf.config_ip6()
        intf.admin_up()
        # remember the index - it is compared with the event below
        sw_if_index = intf.sw_if_index
        vpp_session = VppBFDUDPSession(
            self, intf, intf.remote_ip6, af=AF_INET6)
        vpp_session.add_vpp_config()
        vpp_session.admin_up()
        intf.remove_vpp_config()
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
        self.assertFalse(vpp_session.query_vpp_config())
1704
1705
@tag_run_solo
class BFDFIBTestCase(VppTestCase):
    """ BFD-FIB interactions (IPv6) """

    # both sessions are created inside the test itself
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """Delegate class setup to the parent."""
        super(BFDFIBTestCase, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Delegate class teardown to the parent."""
        super(BFDFIBTestCase, cls).tearDownClass()

    def setUp(self):
        """Create pg0, register for BFD events, configure IPv6."""
        super(BFDFIBTestCase, self).setUp()
        self.create_pg_interfaces(range(1))

        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.configure_ipv6_neighbors()

    def tearDown(self):
        """Unregister from BFD events unless vpp has died."""
        vpp_alive = not self.vpp_dead
        if vpp_alive:
            self.vapi.want_bfd_events(enable_disable=False)

        super(BFDFIBTestCase, self).tearDown()

    @staticmethod
    def pkt_is_not_data_traffic(p):
        """ not data traffic implies BFD or the usual IPv6 ND/RA"""
        return bool(p.haslayer(BFD) or is_ipv6_misc(p))

    def test_session_with_fib(self):
        """ BFD-FIB interactions """

        def data_packet(dst):
            # one UDP data packet entering pg0, destined to `dst`
            return (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
                    IPv6(src="3001::1", dst=dst) /
                    UDP(sport=1234, dport=1234) /
                    Raw(b'\xa5' * 100))

        def expect_forwarded():
            # inject the data packets and check each one comes back out
            self.pg0.add_stream(pkts)
            self.pg_start()
            for sent in pkts:
                received = self.pg0.wait_for_packet(
                    1,
                    filter_out_fn=self.pkt_is_not_data_traffic)
                self.assertEqual(received[IPv6].dst,
                                 sent[IPv6].dst)

        # one packet per route configured below
        pkts = [data_packet("2001::1"), data_packet("2002::1")]

        # a non-recursive (2001::/64) and a recursive (2002::/64) route,
        # both via the next-hop which gets the BFD session
        next_hop = self.pg0.remote_ip6
        route_2001 = VppIpRoute(
            self, "2001::", 64,
            [VppRoutePath(next_hop, self.pg0.sw_if_index)])
        route_2002 = VppIpRoute(
            self, "2002::", 64,
            [VppRoutePath(self.pg0.remote_ip6, 0xffffffff)])
        route_2001.add_vpp_config()
        route_2002.add_vpp_config()

        # create the session only after the routes exist
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET6)

        # session is up - traffic passes
        bfd_session_up(self)
        expect_forwarded()

        # session is down - traffic is dropped
        bfd_session_down(self)
        self.pg0.add_stream(pkts)
        self.pg_start()
        with self.assertRaises(CaptureTimeoutError):
            self.pg0.wait_for_packet(1, self.pkt_is_not_data_traffic)

        # session is up again - traffic passes
        bfd_session_up(self)
        expect_forwarded()
1810
1811
@unittest.skipUnless(running_extended_tests, "part of extended tests")
class BFDTunTestCase(VppTestCase):
    """ BFD over GRE tunnel """

    # both sessions are created inside the test itself
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """Delegate class setup to the parent."""
        super(BFDTunTestCase, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Delegate class teardown to the parent."""
        super(BFDTunTestCase, cls).tearDownClass()

    def setUp(self):
        """Create pg0, register for BFD events, configure IPv4."""
        super(BFDTunTestCase, self).setUp()
        self.create_pg_interfaces(range(1))

        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip4()
            intf.resolve_arp()

    def tearDown(self):
        """Unregister from BFD events unless vpp has died."""
        vpp_alive = not self.vpp_dead
        if vpp_alive:
            self.vapi.want_bfd_events(enable_disable=0)

        super(BFDTunTestCase, self).tearDown()

    @staticmethod
    def pkt_is_not_data_traffic(p):
        """ not data traffic implies BFD or the usual IPv6 ND/RA"""
        return bool(p.haslayer(BFD) or is_ipv6_misc(p))

    def test_bfd_o_gre(self):
        """ BFD-o-GRE  """

        # the GRE tunnel (pg0 local -> pg0 remote) carrying the BFD session
        tunnel = VppGreInterface(self,
                                 self.pg0.local_ip4,
                                 self.pg0.remote_ip4)
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()

        # vpp side of the session, bound to the tunnel interface
        self.vpp_session = VppBFDUDPSession(
            self, tunnel, tunnel.remote_ip4, is_tunnel=True)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()

        # test side: BFD packets get an outer IP/GRE header and are sent
        # through the physical interface pg0
        outer_header = IP(src=self.pg0.remote_ip4,
                          dst=self.pg0.local_ip4) / GRE()
        self.test_session = BFDTestSession(
            self, tunnel, AF_INET,
            tunnel_header=outer_header,
            phy_interface=self.pg0)

        # a single data packet addressed to the tunnel's remote address
        data_pkts = [
            (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(src=self.pg0.remote_ip4, dst=tunnel.remote_ip4) /
             UDP(sport=1234, dport=1234) /
             Raw(b'\xa5' * 100)),
        ]

        # session is up - traffic passes
        bfd_session_up(self)

        self.send_and_expect(self.pg0, data_pkts, self.pg0)

        # bring session down
        bfd_session_down(self)
1891
1892
1893 @tag_run_solo
1894 class BFDSHA1TestCase(VppTestCase):
1895     """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1896
1897     pg0 = None
1898     vpp_clock_offset = None
1899     vpp_session = None
1900     test_session = None
1901
1902     @classmethod
1903     def setUpClass(cls):
1904         super(BFDSHA1TestCase, cls).setUpClass()
1905         cls.vapi.cli("set log class bfd level debug")
1906         try:
1907             cls.create_pg_interfaces([0])
1908             cls.pg0.config_ip4()
1909             cls.pg0.admin_up()
1910             cls.pg0.resolve_arp()
1911
1912         except Exception:
1913             super(BFDSHA1TestCase, cls).tearDownClass()
1914             raise
1915
1916     @classmethod
1917     def tearDownClass(cls):
1918         super(BFDSHA1TestCase, cls).tearDownClass()
1919
1920     def setUp(self):
1921         super(BFDSHA1TestCase, self).setUp()
1922         self.factory = AuthKeyFactory()
1923         self.vapi.want_bfd_events()
1924         self.pg0.enable_capture()
1925
1926     def tearDown(self):
1927         if not self.vpp_dead:
1928             self.vapi.want_bfd_events(enable_disable=False)
1929         self.vapi.collect_events()  # clear the event queue
1930         super(BFDSHA1TestCase, self).tearDown()
1931
1932     def test_session_up(self):
1933         """ bring BFD session up """
1934         key = self.factory.create_random_key(self)
1935         key.add_vpp_config()
1936         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1937                                             self.pg0.remote_ip4,
1938                                             sha1_key=key)
1939         self.vpp_session.add_vpp_config()
1940         self.vpp_session.admin_up()
1941         self.test_session = BFDTestSession(
1942             self, self.pg0, AF_INET, sha1_key=key,
1943             bfd_key_id=self.vpp_session.bfd_key_id)
1944         bfd_session_up(self)
1945
1946     def test_hold_up(self):
1947         """ hold BFD session up """
1948         key = self.factory.create_random_key(self)
1949         key.add_vpp_config()
1950         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1951                                             self.pg0.remote_ip4,
1952                                             sha1_key=key)
1953         self.vpp_session.add_vpp_config()
1954         self.vpp_session.admin_up()
1955         self.test_session = BFDTestSession(
1956             self, self.pg0, AF_INET, sha1_key=key,
1957             bfd_key_id=self.vpp_session.bfd_key_id)
1958         bfd_session_up(self)
1959         for dummy in range(self.test_session.detect_mult * 2):
1960             wait_for_bfd_packet(self)
1961             self.test_session.send_packet()
1962         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1963
1964     def test_hold_up_meticulous(self):
1965         """ hold BFD session up - meticulous auth """
1966         key = self.factory.create_random_key(
1967             self, BFDAuthType.meticulous_keyed_sha1)
1968         key.add_vpp_config()
1969         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1970                                             self.pg0.remote_ip4, sha1_key=key)
1971         self.vpp_session.add_vpp_config()
1972         self.vpp_session.admin_up()
1973         # specify sequence number so that it wraps
1974         self.test_session = BFDTestSession(
1975             self, self.pg0, AF_INET, sha1_key=key,
1976             bfd_key_id=self.vpp_session.bfd_key_id,
1977             our_seq_number=0xFFFFFFFF - 4)
1978         bfd_session_up(self)
1979         for dummy in range(30):
1980             wait_for_bfd_packet(self)
1981             self.test_session.inc_seq_num()
1982             self.test_session.send_packet()
1983         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1984
1985     def test_send_bad_seq_number(self):
1986         """ session is not kept alive by msgs with bad sequence numbers"""
1987         key = self.factory.create_random_key(
1988             self, BFDAuthType.meticulous_keyed_sha1)
1989         key.add_vpp_config()
1990         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1991                                             self.pg0.remote_ip4, sha1_key=key)
1992         self.vpp_session.add_vpp_config()
1993         self.test_session = BFDTestSession(
1994             self, self.pg0, AF_INET, sha1_key=key,
1995             bfd_key_id=self.vpp_session.bfd_key_id)
1996         bfd_session_up(self)
1997         detection_time = self.test_session.detect_mult *\
1998             self.vpp_session.required_min_rx / USEC_IN_SEC
1999         send_until = time.time() + 2 * detection_time
2000         while time.time() < send_until:
2001             self.test_session.send_packet()
2002             self.sleep(0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
2003                        "time between bfd packets")
2004         e = self.vapi.collect_events()
2005         # session should be down now, because the sequence numbers weren't
2006         # updated
2007         self.assert_equal(len(e), 1, "number of bfd events")
2008         verify_event(self, e[0], expected_state=BFDState.down)
2009
2010     def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
2011                                        legitimate_test_session,
2012                                        rogue_test_session,
2013                                        rogue_bfd_values=None):
2014         """ execute a rogue session interaction scenario
2015
2016         1. create vpp session, add config
2017         2. bring the legitimate session up
2018         3. copy the bfd values from legitimate session to rogue session
2019         4. apply rogue_bfd_values to rogue session
2020         5. set rogue session state to down
2021         6. send message to take the session down from the rogue session
2022         7. assert that the legitimate session is unaffected
2023         """
2024
2025         self.vpp_session = vpp_bfd_udp_session
2026         self.vpp_session.add_vpp_config()
2027         self.test_session = legitimate_test_session
2028         # bring vpp session up
2029         bfd_session_up(self)
2030         # send packet from rogue session
2031         rogue_test_session.update(
2032             my_discriminator=self.test_session.my_discriminator,
2033             your_discriminator=self.test_session.your_discriminator,
2034             desired_min_tx=self.test_session.desired_min_tx,
2035             required_min_rx=self.test_session.required_min_rx,
2036             detect_mult=self.test_session.detect_mult,
2037             diag=self.test_session.diag,
2038             state=self.test_session.state,
2039             auth_type=self.test_session.auth_type)
2040         if rogue_bfd_values:
2041             rogue_test_session.update(**rogue_bfd_values)
2042         rogue_test_session.update(state=BFDState.down)
2043         rogue_test_session.send_packet()
2044         wait_for_bfd_packet(self)
2045         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2046
2047     def test_mismatch_auth(self):
2048         """ session is not brought down by unauthenticated msg """
2049         key = self.factory.create_random_key(self)
2050         key.add_vpp_config()
2051         vpp_session = VppBFDUDPSession(
2052             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2053         legitimate_test_session = BFDTestSession(
2054             self, self.pg0, AF_INET, sha1_key=key,
2055             bfd_key_id=vpp_session.bfd_key_id)
2056         rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
2057         self.execute_rogue_session_scenario(vpp_session,
2058                                             legitimate_test_session,
2059                                             rogue_test_session)
2060
2061     def test_mismatch_bfd_key_id(self):
2062         """ session is not brought down by msg with non-existent key-id """
2063         key = self.factory.create_random_key(self)
2064         key.add_vpp_config()
2065         vpp_session = VppBFDUDPSession(
2066             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2067         # pick a different random bfd key id
2068         x = randint(0, 255)
2069         while x == vpp_session.bfd_key_id:
2070             x = randint(0, 255)
2071         legitimate_test_session = BFDTestSession(
2072             self, self.pg0, AF_INET, sha1_key=key,
2073             bfd_key_id=vpp_session.bfd_key_id)
2074         rogue_test_session = BFDTestSession(
2075             self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
2076         self.execute_rogue_session_scenario(vpp_session,
2077                                             legitimate_test_session,
2078                                             rogue_test_session)
2079
2080     def test_mismatched_auth_type(self):
2081         """ session is not brought down by msg with wrong auth type """
2082         key = self.factory.create_random_key(self)
2083         key.add_vpp_config()
2084         vpp_session = VppBFDUDPSession(
2085             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2086         legitimate_test_session = BFDTestSession(
2087             self, self.pg0, AF_INET, sha1_key=key,
2088             bfd_key_id=vpp_session.bfd_key_id)
2089         rogue_test_session = BFDTestSession(
2090             self, self.pg0, AF_INET, sha1_key=key,
2091             bfd_key_id=vpp_session.bfd_key_id)
2092         self.execute_rogue_session_scenario(
2093             vpp_session, legitimate_test_session, rogue_test_session,
2094             {'auth_type': BFDAuthType.keyed_md5})
2095
2096     def test_restart(self):
2097         """ simulate remote peer restart and resynchronization """
2098         key = self.factory.create_random_key(
2099             self, BFDAuthType.meticulous_keyed_sha1)
2100         key.add_vpp_config()
2101         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2102                                             self.pg0.remote_ip4, sha1_key=key)
2103         self.vpp_session.add_vpp_config()
2104         self.test_session = BFDTestSession(
2105             self, self.pg0, AF_INET, sha1_key=key,
2106             bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
2107         bfd_session_up(self)
2108         # don't send any packets for 2*detection_time
2109         detection_time = self.test_session.detect_mult *\
2110             self.vpp_session.required_min_rx / USEC_IN_SEC
2111         self.sleep(2 * detection_time, "simulating peer restart")
2112         events = self.vapi.collect_events()
2113         self.assert_equal(len(events), 1, "number of bfd events")
2114         verify_event(self, events[0], expected_state=BFDState.down)
2115         self.test_session.update(state=BFDState.down)
2116         # reset sequence number
2117         self.test_session.our_seq_number = 0
2118         self.test_session.vpp_seq_number = None
2119         # now throw away any pending packets
2120         self.pg0.enable_capture()
2121         self.test_session.my_discriminator = 0
2122         bfd_session_up(self)
2123
2124
@tag_run_solo
class BFDAuthOnOffTestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (changing auth) """

    # All tests in this class bring a session up and then turn
    # authentication on, off or switch the key, either immediately or
    # delayed, checking that the session stays up and no event is reported.

    pg0 = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """ raise the bfd log level and prepare the pg0 interface """
        super(BFDAuthOnOffTestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip4()
            cls.pg0.admin_up()
            cls.pg0.resolve_arp()

        except Exception:
            # run the base class teardown before propagating the failure
            super(BFDAuthOnOffTestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """ delegate to the base class teardown """
        super(BFDAuthOnOffTestCase, cls).tearDownClass()

    def setUp(self):
        """ create a key factory, subscribe to events, start capturing """
        super(BFDAuthOnOffTestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

    def tearDown(self):
        """ unsubscribe from bfd events and clear the event queue """
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=False)
        self.vapi.collect_events()  # clear the event queue
        super(BFDAuthOnOffTestCase, self).tearDown()

    def _exchange_packets(self, verify_state=True, inc_seq_num=False):
        """ exchange 2 * detect_mult packets with vpp

        Each round waits for a bfd packet from vpp and then sends a packet
        from the test session.

        :param verify_state: assert that every received packet carries the
            up state
        :param inc_seq_num: call inc_seq_num() on the test session before
            each packet is sent
        """
        for _ in range(self.test_session.detect_mult * 2):
            p = wait_for_bfd_packet(self)
            if verify_state:
                self.assert_equal(p[BFD].state, BFDState.up, BFDState)
            if inc_seq_num:
                self.test_session.inc_seq_num()
            self.test_session.send_packet()

    def _assert_session_undisturbed(self):
        """ assert that the vpp session is up and no bfd event is queued """
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")

    def test_auth_on_immediate(self):
        """ turn auth on without disturbing session state (immediate) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        self._exchange_packets()
        self.vpp_session.activate_auth(key)
        # the test session has to switch to the key at the same time
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key
        self._exchange_packets()
        self._assert_session_undisturbed()

    def test_auth_off_immediate(self):
        """ turn auth off without disturbing session state (immediate) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets(inc_seq_num=True)
        self.vpp_session.deactivate_auth()
        # the test session drops the key at the same time
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self._exchange_packets(inc_seq_num=True)
        self._assert_session_undisturbed()

    def test_auth_change_key_immediate(self):
        """ change auth key without disturbing session state (immediate) """
        key1 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2 = self.factory.create_random_key(self)
        key2.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key1)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key1,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets()
        self.vpp_session.activate_auth(key2)
        # the test session switches to the second key at the same time
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key2
        self._exchange_packets()
        self._assert_session_undisturbed()

    def test_auth_on_delayed(self):
        """ turn auth on without disturbing session state (delayed) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        # this first exchange does not check the state of received packets
        self._exchange_packets(verify_state=False)
        self.vpp_session.activate_auth(key, delayed=True)
        # the test session keeps sending unauthenticated packets for now
        self._exchange_packets()
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key
        self.test_session.send_packet()
        self._exchange_packets()
        self._assert_session_undisturbed()

    def test_auth_off_delayed(self):
        """ turn auth off without disturbing session state (delayed) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets()
        self.vpp_session.deactivate_auth(delayed=True)
        # the test session keeps sending authenticated packets for now
        self._exchange_packets()
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self.test_session.send_packet()
        self._exchange_packets()
        self._assert_session_undisturbed()

    def test_auth_change_key_delayed(self):
        """ change auth key without disturbing session state (delayed) """
        key1 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2 = self.factory.create_random_key(self)
        key2.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key1)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key1,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets()
        self.vpp_session.activate_auth(key2, delayed=True)
        # the test session keeps using the first key for now
        self._exchange_packets()
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key2
        self.test_session.send_packet()
        self._exchange_packets()
        self._assert_session_undisturbed()
2336
2337
2338 @tag_run_solo
2339 class BFDCLITestCase(VppTestCase):
2340     """Bidirectional Forwarding Detection (BFD) (CLI) """
2341     pg0 = None
2342
2343     @classmethod
2344     def setUpClass(cls):
2345         super(BFDCLITestCase, cls).setUpClass()
2346         cls.vapi.cli("set log class bfd level debug")
2347         try:
2348             cls.create_pg_interfaces((0,))
2349             cls.pg0.config_ip4()
2350             cls.pg0.config_ip6()
2351             cls.pg0.resolve_arp()
2352             cls.pg0.resolve_ndp()
2353
2354         except Exception:
2355             super(BFDCLITestCase, cls).tearDownClass()
2356             raise
2357
2358     @classmethod
2359     def tearDownClass(cls):
2360         super(BFDCLITestCase, cls).tearDownClass()
2361
2362     def setUp(self):
2363         super(BFDCLITestCase, self).setUp()
2364         self.factory = AuthKeyFactory()
2365         self.pg0.enable_capture()
2366
2367     def tearDown(self):
2368         try:
2369             self.vapi.want_bfd_events(enable_disable=False)
2370         except UnexpectedApiReturnValueError:
2371             # some tests aren't subscribed, so this is not an issue
2372             pass
2373         self.vapi.collect_events()  # clear the event queue
2374         super(BFDCLITestCase, self).tearDown()
2375
2376     def cli_verify_no_response(self, cli):
2377         """ execute a CLI, asserting that the response is empty """
2378         self.assert_equal(self.vapi.cli(cli),
2379                           "",
2380                           "CLI command response")
2381
2382     def cli_verify_response(self, cli, expected):
2383         """ execute a CLI, asserting that the response matches expectation """
2384         try:
2385             reply = self.vapi.cli(cli)
2386         except CliFailedCommandError as cli_error:
2387             reply = str(cli_error)
2388         self.assert_equal(reply.strip(),
2389                           expected,
2390                           "CLI command response")
2391
2392     def test_show(self):
2393         """ show commands """
2394         k1 = self.factory.create_random_key(self)
2395         k1.add_vpp_config()
2396         k2 = self.factory.create_random_key(
2397             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2398         k2.add_vpp_config()
2399         s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2400         s1.add_vpp_config()
2401         s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2402                               sha1_key=k2)
2403         s2.add_vpp_config()
2404         self.logger.info(self.vapi.ppcli("show bfd keys"))
2405         self.logger.info(self.vapi.ppcli("show bfd sessions"))
2406         self.logger.info(self.vapi.ppcli("show bfd"))
2407
2408     def test_set_del_sha1_key(self):
2409         """ set/delete SHA1 auth key """
2410         k = self.factory.create_random_key(self)
2411         self.registry.register(k, self.logger)
2412         self.cli_verify_no_response(
2413             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2414             (k.conf_key_id,
2415                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2416         self.assertTrue(k.query_vpp_config())
2417         self.vpp_session = VppBFDUDPSession(
2418             self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
2419         self.vpp_session.add_vpp_config()
2420         self.test_session = \
2421             BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
2422                            bfd_key_id=self.vpp_session.bfd_key_id)
2423         self.vapi.want_bfd_events()
2424         bfd_session_up(self)
2425         bfd_session_down(self)
2426         # try to replace the secret for the key - should fail because the key
2427         # is in-use
2428         k2 = self.factory.create_random_key(self)
2429         self.cli_verify_response(
2430             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2431             (k.conf_key_id,
2432                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2433             "bfd key set: `bfd_auth_set_key' API call failed, "
2434             "rv=-103:BFD object in use")
2435         # manipulating the session using old secret should still work
2436         bfd_session_up(self)
2437         bfd_session_down(self)
2438         self.vpp_session.remove_vpp_config()
2439         self.cli_verify_no_response(
2440             "bfd key del conf-key-id %s" % k.conf_key_id)
2441         self.assertFalse(k.query_vpp_config())
2442
2443     def test_set_del_meticulous_sha1_key(self):
2444         """ set/delete meticulous SHA1 auth key """
2445         k = self.factory.create_random_key(
2446             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2447         self.registry.register(k, self.logger)
2448         self.cli_verify_no_response(
2449             "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
2450             (k.conf_key_id,
2451                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2452         self.assertTrue(k.query_vpp_config())
2453         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2454                                             self.pg0.remote_ip6, af=AF_INET6,
2455                                             sha1_key=k)
2456         self.vpp_session.add_vpp_config()
2457         self.vpp_session.admin_up()
2458         self.test_session = \
2459             BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
2460                            bfd_key_id=self.vpp_session.bfd_key_id)
2461         self.vapi.want_bfd_events()
2462         bfd_session_up(self)
2463         bfd_session_down(self)
2464         # try to replace the secret for the key - should fail because the key
2465         # is in-use
2466         k2 = self.factory.create_random_key(self)
2467         self.cli_verify_response(
2468             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2469             (k.conf_key_id,
2470                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2471             "bfd key set: `bfd_auth_set_key' API call failed, "
2472             "rv=-103:BFD object in use")
2473         # manipulating the session using old secret should still work
2474         bfd_session_up(self)
2475         bfd_session_down(self)
2476         self.vpp_session.remove_vpp_config()
2477         self.cli_verify_no_response(
2478             "bfd key del conf-key-id %s" % k.conf_key_id)
2479         self.assertFalse(k.query_vpp_config())
2480
2481     def test_add_mod_del_bfd_udp(self):
2482         """ create/modify/delete IPv4 BFD UDP session """
2483         vpp_session = VppBFDUDPSession(
2484             self, self.pg0, self.pg0.remote_ip4)
2485         self.registry.register(vpp_session, self.logger)
2486         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2487             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2488             "detect-mult %s" % (self.pg0.name, self.pg0.local_ip4,
2489                                 self.pg0.remote_ip4,
2490                                 vpp_session.desired_min_tx,
2491                                 vpp_session.required_min_rx,
2492                                 vpp_session.detect_mult)
2493         self.cli_verify_no_response(cli_add_cmd)
2494         # 2nd add should fail
2495         self.cli_verify_response(
2496             cli_add_cmd,
2497             "bfd udp session add: `bfd_add_add_session' API call"
2498             " failed, rv=-101:Duplicate BFD object")
2499         verify_bfd_session_config(self, vpp_session)
2500         mod_session = VppBFDUDPSession(
2501             self, self.pg0, self.pg0.remote_ip4,
2502             required_min_rx=2 * vpp_session.required_min_rx,
2503             desired_min_tx=3 * vpp_session.desired_min_tx,
2504             detect_mult=4 * vpp_session.detect_mult)
2505         self.cli_verify_no_response(
2506             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2507             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2508             (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2509              mod_session.desired_min_tx, mod_session.required_min_rx,
2510              mod_session.detect_mult))
2511         verify_bfd_session_config(self, mod_session)
2512         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2513             "peer-addr %s" % (self.pg0.name,
2514                               self.pg0.local_ip4, self.pg0.remote_ip4)
2515         self.cli_verify_no_response(cli_del_cmd)
2516         # 2nd del is expected to fail
2517         self.cli_verify_response(
2518             cli_del_cmd, "bfd udp session del: `bfd_udp_del_session' API call"
2519             " failed, rv=-102:No such BFD object")
2520         self.assertFalse(vpp_session.query_vpp_config())
2521
2522     def test_add_mod_del_bfd_udp6(self):
2523         """ create/modify/delete IPv6 BFD UDP session """
2524         vpp_session = VppBFDUDPSession(
2525             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
2526         self.registry.register(vpp_session, self.logger)
2527         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2528             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2529             "detect-mult %s" % (self.pg0.name, self.pg0.local_ip6,
2530                                 self.pg0.remote_ip6,
2531                                 vpp_session.desired_min_tx,
2532                                 vpp_session.required_min_rx,
2533                                 vpp_session.detect_mult)
2534         self.cli_verify_no_response(cli_add_cmd)
2535         # 2nd add should fail
2536         self.cli_verify_response(
2537             cli_add_cmd,
2538             "bfd udp session add: `bfd_add_add_session' API call"
2539             " failed, rv=-101:Duplicate BFD object")
2540         verify_bfd_session_config(self, vpp_session)
2541         mod_session = VppBFDUDPSession(
2542             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2543             required_min_rx=2 * vpp_session.required_min_rx,
2544             desired_min_tx=3 * vpp_session.desired_min_tx,
2545             detect_mult=4 * vpp_session.detect_mult)
2546         self.cli_verify_no_response(
2547             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2548             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2549             (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2550              mod_session.desired_min_tx,
2551              mod_session.required_min_rx, mod_session.detect_mult))
2552         verify_bfd_session_config(self, mod_session)
2553         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2554             "peer-addr %s" % (self.pg0.name,
2555                               self.pg0.local_ip6, self.pg0.remote_ip6)
2556         self.cli_verify_no_response(cli_del_cmd)
2557         # 2nd del is expected to fail
2558         self.cli_verify_response(
2559             cli_del_cmd,
2560             "bfd udp session del: `bfd_udp_del_session' API call"
2561             " failed, rv=-102:No such BFD object")
2562         self.assertFalse(vpp_session.query_vpp_config())
2563
2564     def test_add_mod_del_bfd_udp_auth(self):
2565         """ create/modify/delete IPv4 BFD UDP session (authenticated) """
2566         key = self.factory.create_random_key(self)
2567         key.add_vpp_config()
2568         vpp_session = VppBFDUDPSession(
2569             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2570         self.registry.register(vpp_session, self.logger)
2571         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2572             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2573             "detect-mult %s conf-key-id %s bfd-key-id %s"\
2574             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2575                vpp_session.desired_min_tx, vpp_session.required_min_rx,
2576                vpp_session.detect_mult, key.conf_key_id,
2577                vpp_session.bfd_key_id)
2578         self.cli_verify_no_response(cli_add_cmd)
2579         # 2nd add should fail
2580         self.cli_verify_response(
2581             cli_add_cmd,
2582             "bfd udp session add: `bfd_add_add_session' API call"
2583             " failed, rv=-101:Duplicate BFD object")
2584         verify_bfd_session_config(self, vpp_session)
2585         mod_session = VppBFDUDPSession(
2586             self, self.pg0, self.pg0.remote_ip4, sha1_key=key,
2587             bfd_key_id=vpp_session.bfd_key_id,
2588             required_min_rx=2 * vpp_session.required_min_rx,
2589             desired_min_tx=3 * vpp_session.desired_min_tx,
2590             detect_mult=4 * vpp_session.detect_mult)
2591         self.cli_verify_no_response(
2592             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2593             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2594             (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2595              mod_session.desired_min_tx,
2596              mod_session.required_min_rx, mod_session.detect_mult))
2597         verify_bfd_session_config(self, mod_session)
2598         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2599             "peer-addr %s" % (self.pg0.name,
2600                               self.pg0.local_ip4, self.pg0.remote_ip4)
2601         self.cli_verify_no_response(cli_del_cmd)
2602         # 2nd del is expected to fail
2603         self.cli_verify_response(
2604             cli_del_cmd,
2605             "bfd udp session del: `bfd_udp_del_session' API call"
2606             " failed, rv=-102:No such BFD object")
2607         self.assertFalse(vpp_session.query_vpp_config())
2608
2609     def test_add_mod_del_bfd_udp6_auth(self):
2610         """ create/modify/delete IPv6 BFD UDP session (authenticated) """
2611         key = self.factory.create_random_key(
2612             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2613         key.add_vpp_config()
2614         vpp_session = VppBFDUDPSession(
2615             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key)
2616         self.registry.register(vpp_session, self.logger)
2617         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2618             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2619             "detect-mult %s conf-key-id %s bfd-key-id %s" \
2620             % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2621                vpp_session.desired_min_tx, vpp_session.required_min_rx,
2622                vpp_session.detect_mult, key.conf_key_id,
2623                vpp_session.bfd_key_id)
2624         self.cli_verify_no_response(cli_add_cmd)
2625         # 2nd add should fail
2626         self.cli_verify_response(
2627             cli_add_cmd,
2628             "bfd udp session add: `bfd_add_add_session' API call"
2629             " failed, rv=-101:Duplicate BFD object")
2630         verify_bfd_session_config(self, vpp_session)
2631         mod_session = VppBFDUDPSession(
2632             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key,
2633             bfd_key_id=vpp_session.bfd_key_id,
2634             required_min_rx=2 * vpp_session.required_min_rx,
2635             desired_min_tx=3 * vpp_session.desired_min_tx,
2636             detect_mult=4 * vpp_session.detect_mult)
2637         self.cli_verify_no_response(
2638             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2639             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2640             (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2641              mod_session.desired_min_tx,
2642              mod_session.required_min_rx, mod_session.detect_mult))
2643         verify_bfd_session_config(self, mod_session)
2644         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2645             "peer-addr %s" % (self.pg0.name,
2646                               self.pg0.local_ip6, self.pg0.remote_ip6)
2647         self.cli_verify_no_response(cli_del_cmd)
2648         # 2nd del is expected to fail
2649         self.cli_verify_response(
2650             cli_del_cmd,
2651             "bfd udp session del: `bfd_udp_del_session' API call"
2652             " failed, rv=-102:No such BFD object")
2653         self.assertFalse(vpp_session.query_vpp_config())
2654
2655     def test_auth_on_off(self):
2656         """ turn authentication on and off """
2657         key = self.factory.create_random_key(
2658             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2659         key.add_vpp_config()
2660         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2661         auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2662                                         sha1_key=key)
2663         session.add_vpp_config()
2664         cli_activate = \
2665             "bfd udp session auth activate interface %s local-addr %s "\
2666             "peer-addr %s conf-key-id %s bfd-key-id %s"\
2667             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2668                key.conf_key_id, auth_session.bfd_key_id)
2669         self.cli_verify_no_response(cli_activate)
2670         verify_bfd_session_config(self, auth_session)
2671         self.cli_verify_no_response(cli_activate)
2672         verify_bfd_session_config(self, auth_session)
2673         cli_deactivate = \
2674             "bfd udp session auth deactivate interface %s local-addr %s "\
2675             "peer-addr %s "\
2676             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2677         self.cli_verify_no_response(cli_deactivate)
2678         verify_bfd_session_config(self, session)
2679         self.cli_verify_no_response(cli_deactivate)
2680         verify_bfd_session_config(self, session)
2681
2682     def test_auth_on_off_delayed(self):
2683         """ turn authentication on and off (delayed) """
2684         key = self.factory.create_random_key(
2685             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2686         key.add_vpp_config()
2687         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2688         auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2689                                         sha1_key=key)
2690         session.add_vpp_config()
2691         cli_activate = \
2692             "bfd udp session auth activate interface %s local-addr %s "\
2693             "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"\
2694             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2695                key.conf_key_id, auth_session.bfd_key_id)
2696         self.cli_verify_no_response(cli_activate)
2697         verify_bfd_session_config(self, auth_session)
2698         self.cli_verify_no_response(cli_activate)
2699         verify_bfd_session_config(self, auth_session)
2700         cli_deactivate = \
2701             "bfd udp session auth deactivate interface %s local-addr %s "\
2702             "peer-addr %s delayed yes"\
2703             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2704         self.cli_verify_no_response(cli_deactivate)
2705         verify_bfd_session_config(self, session)
2706         self.cli_verify_no_response(cli_deactivate)
2707         verify_bfd_session_config(self, session)
2708
2709     def test_admin_up_down(self):
2710         """ put session admin-up and admin-down """
2711         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2712         session.add_vpp_config()
2713         cli_down = \
2714             "bfd udp session set-flags admin down interface %s local-addr %s "\
2715             "peer-addr %s "\
2716             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2717         cli_up = \
2718             "bfd udp session set-flags admin up interface %s local-addr %s "\
2719             "peer-addr %s "\
2720             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2721         self.cli_verify_no_response(cli_down)
2722         verify_bfd_session_config(self, session, state=BFDState.admin_down)
2723         self.cli_verify_no_response(cli_up)
2724         verify_bfd_session_config(self, session, state=BFDState.down)
2725
2726     def test_set_del_udp_echo_source(self):
2727         """ set/del udp echo source """
2728         self.create_loopback_interfaces(1)
2729         self.loopback0 = self.lo_interfaces[0]
2730         self.loopback0.admin_up()
2731         self.cli_verify_response("show bfd echo-source",
2732                                  "UDP echo source is not set.")
2733         cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
2734         self.cli_verify_no_response(cli_set)
2735         self.cli_verify_response("show bfd echo-source",
2736                                  "UDP echo source is: %s\n"
2737                                  "IPv4 address usable as echo source: none\n"
2738                                  "IPv6 address usable as echo source: none" %
2739                                  self.loopback0.name)
2740         self.loopback0.config_ip4()
2741         echo_ip4 = str(ipaddress.IPv4Address(int(ipaddress.IPv4Address(
2742             self.loopback0.local_ip4)) ^ 1))
2743         self.cli_verify_response("show bfd echo-source",
2744                                  "UDP echo source is: %s\n"
2745                                  "IPv4 address usable as echo source: %s\n"
2746                                  "IPv6 address usable as echo source: none" %
2747                                  (self.loopback0.name, echo_ip4))
2748         echo_ip6 = str(ipaddress.IPv6Address(int(ipaddress.IPv6Address(
2749             self.loopback0.local_ip6)) ^ 1))
2750         self.loopback0.config_ip6()
2751         self.cli_verify_response("show bfd echo-source",
2752                                  "UDP echo source is: %s\n"
2753                                  "IPv4 address usable as echo source: %s\n"
2754                                  "IPv6 address usable as echo source: %s" %
2755                                  (self.loopback0.name, echo_ip4, echo_ip6))
2756         cli_del = "bfd udp echo-source del"
2757         self.cli_verify_no_response(cli_del)
2758         self.cli_verify_response("show bfd echo-source",
2759                                  "UDP echo source is not set.")
2760
2761
# When executed directly as a script, hand control to unittest and have it
# use VppTestRunner as the test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)