lisp: fix use-after-free
[vpp.git] / test / test_bfd.py
1 #!/usr/bin/env python3
2 """ BFD tests """
3
4 from __future__ import division
5
6 import binascii
7 import hashlib
8 import time
9 import unittest
10 from random import randint, shuffle, getrandbits
11 from socket import AF_INET, AF_INET6, inet_ntop
12 from struct import pack, unpack
13
14 from six import moves
15 import scapy.compat
16 from scapy.layers.inet import UDP, IP
17 from scapy.layers.inet6 import IPv6
18 from scapy.layers.l2 import Ether, GRE
19 from scapy.packet import Raw
20
21 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
22     BFDDiagCode, BFDState, BFD_vpp_echo
23 from framework import VppTestCase, VppTestRunner, running_extended_tests
24 from util import ppp
25 from vpp_ip import DpoProto
26 from vpp_ip_route import VppIpRoute, VppRoutePath
27 from vpp_lo_interface import VppLoInterface
28 from vpp_papi_provider import UnexpectedApiReturnValueError, \
29     CliFailedCommandError
30 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
31 from vpp_gre_interface import VppGreInterface
32 from vpp_papi import VppEnum
33
34 USEC_IN_SEC = 1000000
35
36
class AuthKeyFactory(object):
    """Hand out auth keys whose conf key IDs are unique per factory"""

    def __init__(self):
        # conf key IDs already issued by this factory instance
        self._conf_key_ids = {}

    def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
        """ build a VppBFDAuthKey with a fresh conf key id and random key

        :param test: test case passed through to VppBFDAuthKey
        :param auth_type: authentication type passed to VppBFDAuthKey

        :returns: VppBFDAuthKey holding 1 to 20 random key bytes
        """
        # keep drawing until the ID is one this factory has not issued yet
        while True:
            conf_key_id = randint(0, 0xFFFFFFFF)
            if conf_key_id not in self._conf_key_ids:
                break
        self._conf_key_ids[conf_key_id] = 1
        key_length = randint(1, 20)
        key_bytes = bytearray([randint(0, 255) for _ in range(key_length)])
        return VppBFDAuthKey(test=test, auth_type=auth_type,
                             conf_key_id=conf_key_id,
                             key=scapy.compat.raw(key_bytes))
53
54
55 class BFDAPITestCase(VppTestCase):
56     """Bidirectional Forwarding Detection (BFD) - API"""
57
58     pg0 = None
59     pg1 = None
60
61     @classmethod
62     def setUpClass(cls):
63         super(BFDAPITestCase, cls).setUpClass()
64         cls.vapi.cli("set log class bfd level debug")
65         try:
66             cls.create_pg_interfaces(range(2))
67             for i in cls.pg_interfaces:
68                 i.config_ip4()
69                 i.config_ip6()
70                 i.resolve_arp()
71
72         except Exception:
73             super(BFDAPITestCase, cls).tearDownClass()
74             raise
75
    @classmethod
    def tearDownClass(cls):
        """ no class-specific cleanup; delegates to the parent class """
        super(BFDAPITestCase, cls).tearDownClass()
79
    def setUp(self):
        """ run the parent setUp and give each test its own key factory """
        super(BFDAPITestCase, self).setUp()
        # fresh factory per test, so conf key ID uniqueness is tracked
        # per test rather than across the whole class
        self.factory = AuthKeyFactory()
83
84     def test_add_bfd(self):
85         """ create a BFD session """
86         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
87         session.add_vpp_config()
88         self.logger.debug("Session state is %s", session.state)
89         session.remove_vpp_config()
90         session.add_vpp_config()
91         self.logger.debug("Session state is %s", session.state)
92         session.remove_vpp_config()
93
94     def test_double_add(self):
95         """ create the same BFD session twice (negative case) """
96         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
97         session.add_vpp_config()
98
99         with self.vapi.assert_negative_api_retval():
100             session.add_vpp_config()
101
102         session.remove_vpp_config()
103
104     def test_add_bfd6(self):
105         """ create IPv6 BFD session """
106         session = VppBFDUDPSession(
107             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
108         session.add_vpp_config()
109         self.logger.debug("Session state is %s", session.state)
110         session.remove_vpp_config()
111         session.add_vpp_config()
112         self.logger.debug("Session state is %s", session.state)
113         session.remove_vpp_config()
114
115     def test_mod_bfd(self):
116         """ modify BFD session parameters """
117         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
118                                    desired_min_tx=50000,
119                                    required_min_rx=10000,
120                                    detect_mult=1)
121         session.add_vpp_config()
122         s = session.get_bfd_udp_session_dump_entry()
123         self.assert_equal(session.desired_min_tx,
124                           s.desired_min_tx,
125                           "desired min transmit interval")
126         self.assert_equal(session.required_min_rx,
127                           s.required_min_rx,
128                           "required min receive interval")
129         self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
130         session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
131                                   required_min_rx=session.required_min_rx * 2,
132                                   detect_mult=session.detect_mult * 2)
133         s = session.get_bfd_udp_session_dump_entry()
134         self.assert_equal(session.desired_min_tx,
135                           s.desired_min_tx,
136                           "desired min transmit interval")
137         self.assert_equal(session.required_min_rx,
138                           s.required_min_rx,
139                           "required min receive interval")
140         self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
141
142     def test_add_sha1_keys(self):
143         """ add SHA1 keys """
144         key_count = 10
145         keys = [self.factory.create_random_key(
146             self) for i in range(0, key_count)]
147         for key in keys:
148             self.assertFalse(key.query_vpp_config())
149         for key in keys:
150             key.add_vpp_config()
151         for key in keys:
152             self.assertTrue(key.query_vpp_config())
153         # remove randomly
154         indexes = list(range(key_count))
155         shuffle(indexes)
156         removed = []
157         for i in indexes:
158             key = keys[i]
159             key.remove_vpp_config()
160             removed.append(i)
161             for j in range(key_count):
162                 key = keys[j]
163                 if j in removed:
164                     self.assertFalse(key.query_vpp_config())
165                 else:
166                     self.assertTrue(key.query_vpp_config())
167         # should be removed now
168         for key in keys:
169             self.assertFalse(key.query_vpp_config())
170         # add back and remove again
171         for key in keys:
172             key.add_vpp_config()
173         for key in keys:
174             self.assertTrue(key.query_vpp_config())
175         for key in keys:
176             key.remove_vpp_config()
177         for key in keys:
178             self.assertFalse(key.query_vpp_config())
179
180     def test_add_bfd_sha1(self):
181         """ create a BFD session (SHA1) """
182         key = self.factory.create_random_key(self)
183         key.add_vpp_config()
184         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
185                                    sha1_key=key)
186         session.add_vpp_config()
187         self.logger.debug("Session state is %s", session.state)
188         session.remove_vpp_config()
189         session.add_vpp_config()
190         self.logger.debug("Session state is %s", session.state)
191         session.remove_vpp_config()
192
193     def test_double_add_sha1(self):
194         """ create the same BFD session twice (negative case) (SHA1) """
195         key = self.factory.create_random_key(self)
196         key.add_vpp_config()
197         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
198                                    sha1_key=key)
199         session.add_vpp_config()
200         with self.assertRaises(Exception):
201             session.add_vpp_config()
202
203     def test_add_auth_nonexistent_key(self):
204         """ create BFD session using non-existent SHA1 (negative case) """
205         session = VppBFDUDPSession(
206             self, self.pg0, self.pg0.remote_ip4,
207             sha1_key=self.factory.create_random_key(self))
208         with self.assertRaises(Exception):
209             session.add_vpp_config()
210
211     def test_shared_sha1_key(self):
212         """ share single SHA1 key between multiple BFD sessions """
213         key = self.factory.create_random_key(self)
214         key.add_vpp_config()
215         sessions = [
216             VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
217                              sha1_key=key),
218             VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
219                              sha1_key=key, af=AF_INET6),
220             VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
221                              sha1_key=key),
222             VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
223                              sha1_key=key, af=AF_INET6)]
224         for s in sessions:
225             s.add_vpp_config()
226         removed = 0
227         for s in sessions:
228             e = key.get_bfd_auth_keys_dump_entry()
229             self.assert_equal(e.use_count, len(sessions) - removed,
230                               "Use count for shared key")
231             s.remove_vpp_config()
232             removed += 1
233         e = key.get_bfd_auth_keys_dump_entry()
234         self.assert_equal(e.use_count, len(sessions) - removed,
235                           "Use count for shared key")
236
237     def test_activate_auth(self):
238         """ activate SHA1 authentication """
239         key = self.factory.create_random_key(self)
240         key.add_vpp_config()
241         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
242         session.add_vpp_config()
243         session.activate_auth(key)
244
245     def test_deactivate_auth(self):
246         """ deactivate SHA1 authentication """
247         key = self.factory.create_random_key(self)
248         key.add_vpp_config()
249         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
250         session.add_vpp_config()
251         session.activate_auth(key)
252         session.deactivate_auth()
253
254     def test_change_key(self):
255         """ change SHA1 key """
256         key1 = self.factory.create_random_key(self)
257         key2 = self.factory.create_random_key(self)
258         while key2.conf_key_id == key1.conf_key_id:
259             key2 = self.factory.create_random_key(self)
260         key1.add_vpp_config()
261         key2.add_vpp_config()
262         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
263                                    sha1_key=key1)
264         session.add_vpp_config()
265         session.activate_auth(key2)
266
267     def test_set_del_udp_echo_source(self):
268         """ set/del udp echo source """
269         self.create_loopback_interfaces(1)
270         self.loopback0 = self.lo_interfaces[0]
271         self.loopback0.admin_up()
272         echo_source = self.vapi.bfd_udp_get_echo_source()
273         self.assertFalse(echo_source.is_set)
274         self.assertFalse(echo_source.have_usable_ip4)
275         self.assertFalse(echo_source.have_usable_ip6)
276
277         self.vapi.bfd_udp_set_echo_source(
278             sw_if_index=self.loopback0.sw_if_index)
279         echo_source = self.vapi.bfd_udp_get_echo_source()
280         self.assertTrue(echo_source.is_set)
281         self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
282         self.assertFalse(echo_source.have_usable_ip4)
283         self.assertFalse(echo_source.have_usable_ip6)
284
285         self.loopback0.config_ip4()
286         unpacked = unpack("!L", self.loopback0.local_ip4n)
287         echo_ip4 = pack("!L", unpacked[0] ^ 1)
288         echo_source = self.vapi.bfd_udp_get_echo_source()
289         self.assertTrue(echo_source.is_set)
290         self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
291         self.assertTrue(echo_source.have_usable_ip4)
292         self.assertEqual(echo_source.ip4_addr.packed, echo_ip4)
293         self.assertFalse(echo_source.have_usable_ip6)
294
295         self.loopback0.config_ip6()
296         unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
297         echo_ip6 = pack("!LLLL", unpacked[0], unpacked[1], unpacked[2],
298                         unpacked[3] ^ 1)
299         echo_source = self.vapi.bfd_udp_get_echo_source()
300         self.assertTrue(echo_source.is_set)
301         self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
302         self.assertTrue(echo_source.have_usable_ip4)
303         self.assertEqual(echo_source.ip4_addr.packed, echo_ip4)
304         self.assertTrue(echo_source.have_usable_ip6)
305         self.assertEqual(echo_source.ip6_addr.packed, echo_ip6)
306
307         self.vapi.bfd_udp_del_echo_source()
308         echo_source = self.vapi.bfd_udp_get_echo_source()
309         self.assertFalse(echo_source.is_set)
310         self.assertFalse(echo_source.have_usable_ip4)
311         self.assertFalse(echo_source.have_usable_ip6)
312
313
314 class BFDTestSession(object):
315     """ BFD session as seen from test framework side """
316
317     def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
318                  bfd_key_id=None, our_seq_number=None,
319                  tunnel_header=None, phy_interface=None):
320         self.test = test
321         self.af = af
322         self.sha1_key = sha1_key
323         self.bfd_key_id = bfd_key_id
324         self.interface = interface
325         if phy_interface:
326             self.phy_interface = phy_interface
327         else:
328             self.phy_interface = self.interface
329         self.udp_sport = randint(49152, 65535)
330         if our_seq_number is None:
331             self.our_seq_number = randint(0, 40000000)
332         else:
333             self.our_seq_number = our_seq_number
334         self.vpp_seq_number = None
335         self.my_discriminator = 0
336         self.desired_min_tx = 300000
337         self.required_min_rx = 300000
338         self.required_min_echo_rx = None
339         self.detect_mult = detect_mult
340         self.diag = BFDDiagCode.no_diagnostic
341         self.your_discriminator = None
342         self.state = BFDState.down
343         self.auth_type = BFDAuthType.no_auth
344         self.tunnel_header = tunnel_header
345
346     def inc_seq_num(self):
347         """ increment sequence number, wrapping if needed """
348         if self.our_seq_number == 0xFFFFFFFF:
349             self.our_seq_number = 0
350         else:
351             self.our_seq_number += 1
352
353     def update(self, my_discriminator=None, your_discriminator=None,
354                desired_min_tx=None, required_min_rx=None,
355                required_min_echo_rx=None, detect_mult=None,
356                diag=None, state=None, auth_type=None):
357         """ update BFD parameters associated with session """
358         if my_discriminator is not None:
359             self.my_discriminator = my_discriminator
360         if your_discriminator is not None:
361             self.your_discriminator = your_discriminator
362         if required_min_rx is not None:
363             self.required_min_rx = required_min_rx
364         if required_min_echo_rx is not None:
365             self.required_min_echo_rx = required_min_echo_rx
366         if desired_min_tx is not None:
367             self.desired_min_tx = desired_min_tx
368         if detect_mult is not None:
369             self.detect_mult = detect_mult
370         if diag is not None:
371             self.diag = diag
372         if state is not None:
373             self.state = state
374         if auth_type is not None:
375             self.auth_type = auth_type
376
377     def fill_packet_fields(self, packet):
378         """ set packet fields with known values in packet """
379         bfd = packet[BFD]
380         if self.my_discriminator:
381             self.test.logger.debug("BFD: setting packet.my_discriminator=%s",
382                                    self.my_discriminator)
383             bfd.my_discriminator = self.my_discriminator
384         if self.your_discriminator:
385             self.test.logger.debug("BFD: setting packet.your_discriminator=%s",
386                                    self.your_discriminator)
387             bfd.your_discriminator = self.your_discriminator
388         if self.required_min_rx:
389             self.test.logger.debug(
390                 "BFD: setting packet.required_min_rx_interval=%s",
391                 self.required_min_rx)
392             bfd.required_min_rx_interval = self.required_min_rx
393         if self.required_min_echo_rx:
394             self.test.logger.debug(
395                 "BFD: setting packet.required_min_echo_rx=%s",
396                 self.required_min_echo_rx)
397             bfd.required_min_echo_rx_interval = self.required_min_echo_rx
398         if self.desired_min_tx:
399             self.test.logger.debug(
400                 "BFD: setting packet.desired_min_tx_interval=%s",
401                 self.desired_min_tx)
402             bfd.desired_min_tx_interval = self.desired_min_tx
403         if self.detect_mult:
404             self.test.logger.debug(
405                 "BFD: setting packet.detect_mult=%s", self.detect_mult)
406             bfd.detect_mult = self.detect_mult
407         if self.diag:
408             self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
409             bfd.diag = self.diag
410         if self.state:
411             self.test.logger.debug("BFD: setting packet.state=%s", self.state)
412             bfd.state = self.state
413         if self.auth_type:
414             # this is used by a negative test-case
415             self.test.logger.debug("BFD: setting packet.auth_type=%s",
416                                    self.auth_type)
417             bfd.auth_type = self.auth_type
418
419     def create_packet(self):
420         """ create a BFD packet, reflecting the current state of session """
421         if self.sha1_key:
422             bfd = BFD(flags="A")
423             bfd.auth_type = self.sha1_key.auth_type
424             bfd.auth_len = BFD.sha1_auth_len
425             bfd.auth_key_id = self.bfd_key_id
426             bfd.auth_seq_num = self.our_seq_number
427             bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
428         else:
429             bfd = BFD()
430         packet = Ether(src=self.phy_interface.remote_mac,
431                        dst=self.phy_interface.local_mac)
432         if self.tunnel_header:
433             packet = packet / self.tunnel_header
434         if self.af == AF_INET6:
435             packet = (packet /
436                       IPv6(src=self.interface.remote_ip6,
437                            dst=self.interface.local_ip6,
438                            hlim=255) /
439                       UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
440                       bfd)
441         else:
442             packet = (packet /
443                       IP(src=self.interface.remote_ip4,
444                          dst=self.interface.local_ip4,
445                          ttl=255) /
446                       UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
447                       bfd)
448         self.test.logger.debug("BFD: Creating packet")
449         self.fill_packet_fields(packet)
450         if self.sha1_key:
451             hash_material = scapy.compat.raw(
452                 packet[BFD])[:32] + self.sha1_key.key + \
453                 b"\0" * (20 - len(self.sha1_key.key))
454             self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
455                                    hashlib.sha1(hash_material).hexdigest())
456             packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
457         return packet
458
459     def send_packet(self, packet=None, interface=None):
460         """ send packet on interface, creating the packet if needed """
461         if packet is None:
462             packet = self.create_packet()
463         if interface is None:
464             interface = self.phy_interface
465         self.test.logger.debug(ppp("Sending packet:", packet))
466         interface.add_stream(packet)
467         self.test.pg_start()
468
469     def verify_sha1_auth(self, packet):
470         """ Verify correctness of authentication in BFD layer. """
471         bfd = packet[BFD]
472         self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
473         self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
474                                BFDAuthType)
475         self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
476         self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
477         if self.vpp_seq_number is None:
478             self.vpp_seq_number = bfd.auth_seq_num
479             self.test.logger.debug("Received initial sequence number: %s" %
480                                    self.vpp_seq_number)
481         else:
482             recvd_seq_num = bfd.auth_seq_num
483             self.test.logger.debug("Received followup sequence number: %s" %
484                                    recvd_seq_num)
485             if self.vpp_seq_number < 0xffffffff:
486                 if self.sha1_key.auth_type == \
487                         BFDAuthType.meticulous_keyed_sha1:
488                     self.test.assert_equal(recvd_seq_num,
489                                            self.vpp_seq_number + 1,
490                                            "BFD sequence number")
491                 else:
492                     self.test.assert_in_range(recvd_seq_num,
493                                               self.vpp_seq_number,
494                                               self.vpp_seq_number + 1,
495                                               "BFD sequence number")
496             else:
497                 if self.sha1_key.auth_type == \
498                         BFDAuthType.meticulous_keyed_sha1:
499                     self.test.assert_equal(recvd_seq_num, 0,
500                                            "BFD sequence number")
501                 else:
502                     self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
503                                        "BFD sequence number not one of "
504                                        "(%s, 0)" % self.vpp_seq_number)
505             self.vpp_seq_number = recvd_seq_num
506         # last 20 bytes represent the hash - so replace them with the key,
507         # pad the result with zeros and hash the result
508         hash_material = bfd.original[:-20] + self.sha1_key.key + \
509             b"\0" * (20 - len(self.sha1_key.key))
510         expected_hash = hashlib.sha1(hash_material).hexdigest()
511         self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
512                                expected_hash.encode(), "Auth key hash")
513
514     def verify_bfd(self, packet):
515         """ Verify correctness of BFD layer. """
516         bfd = packet[BFD]
517         self.test.assert_equal(bfd.version, 1, "BFD version")
518         self.test.assert_equal(bfd.your_discriminator,
519                                self.my_discriminator,
520                                "BFD - your discriminator")
521         if self.sha1_key:
522             self.verify_sha1_auth(packet)
523
524
def bfd_session_up(test):
    """ Bring BFD session up

    Waits for a BFD packet from vpp, answers with Init, expects an "up"
    event, then sends Up and checks the vpp session state. Also records
    the vpp clock offset in test.vpp_clock_offset.
    """
    session = test.test_session

    def send_next():
        # with meticulous keyed SHA1 the sequence number is incremented
        # before every packet that gets sent
        key = session.sha1_key
        if key and key.auth_type == BFDAuthType.meticulous_keyed_sha1:
            session.inc_seq_num()
        session.send_packet()

    test.logger.info("BFD: Waiting for slow hello")
    p = wait_for_bfd_packet(test, 2, is_tunnel=test.vpp_session.is_tunnel)
    previous_offset = getattr(test, 'vpp_clock_offset', None)
    test.vpp_clock_offset = time.time() - float(p.time)
    test.logger.debug("BFD: Calculated vpp clock offset: %s",
                      test.vpp_clock_offset)
    if previous_offset:
        # an earlier measurement exists; the new one must be within 0.5
        test.assertAlmostEqual(
            previous_offset, test.vpp_clock_offset, delta=0.5,
            msg="vpp clock offset not stable (new: %s, old: %s)" %
            (test.vpp_clock_offset, previous_offset))

    test.logger.info("BFD: Sending Init")
    session.update(my_discriminator=randint(0, 40000000),
                   your_discriminator=p[BFD].my_discriminator,
                   state=BFDState.init)
    send_next()

    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, event, expected_state=BFDState.up)
    test.logger.info("BFD: Session is Up")

    session.update(state=BFDState.up)
    send_next()
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
558
559
def bfd_session_down(test):
    """ Bring BFD session down

    Requires the vpp session to be up, sends a packet with state Down
    and expects a matching "down" event and vpp session state.
    """
    session = test.test_session
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
    session.update(state=BFDState.down)
    key = session.sha1_key
    # with meticulous keyed SHA1 the sequence number is incremented
    # before the packet is sent
    if key and key.auth_type == BFDAuthType.meticulous_keyed_sha1:
        session.inc_seq_num()
    session.send_packet()
    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, event, expected_state=BFDState.down)
    test.logger.info("BFD: Session is Down")
    test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
573
574
def verify_bfd_session_config(test, session, state=None):
    """ Check that the dump entry of `session` matches the session object

    :param test: test case providing assertIsNotNone and assert_equal
    :param session: object exposing get_bfd_udp_session_dump_entry() and
        the expected required_min_rx, desired_min_tx, detect_mult,
        sha1_key and (with a key set) bfd_key_id values
    :param state: expected session state; None (the default) skips the
        state check
    """
    dump = session.get_bfd_udp_session_dump_entry()
    test.assertIsNotNone(dump)
    # since dump is not none, we have verified that sw_if_index and addresses
    # are valid (in get_bfd_udp_session_dump_entry)
    # Compare against None explicitly: a plain truthiness test would
    # silently skip the check for a state whose numeric value is 0.
    if state is not None:
        test.assert_equal(dump.state, state, "session state")
    test.assert_equal(dump.required_min_rx, session.required_min_rx,
                      "required min rx interval")
    test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
                      "desired min tx interval")
    test.assert_equal(dump.detect_mult, session.detect_mult,
                      "detect multiplier")
    if session.sha1_key is None:
        test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
    else:
        test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
        test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
                          "bfd key id")
        test.assert_equal(dump.conf_key_id,
                          session.sha1_key.conf_key_id,
                          "config key id")
597
598
def verify_ip(test, packet):
    """ Check the IP layer of a packet sent by vpp.

    TTL / hop limit must be 255, the source must be the interface's
    local address and the destination its remote address; the address
    family is taken from test.vpp_session.
    """
    intf = test.vpp_session.interface
    if test.vpp_session.af == AF_INET6:
        ip = packet[IPv6]
        expected_src, expected_dst = intf.local_ip6, intf.remote_ip6
        test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
    else:
        ip = packet[IP]
        expected_src, expected_dst = intf.local_ip4, intf.remote_ip4
        test.assert_equal(ip.ttl, 255, "IPv4 TTL")
    test.assert_equal(ip.src, expected_src, "IP source address")
    test.assert_equal(ip.dst, expected_dst, "IP destination address")
613
614
def verify_udp(test, packet):
    """ Check the UDP layer: BFD destination port, source port in range. """
    udp_layer = packet[UDP]
    test.assert_equal(udp_layer.dport, BFD.udp_dport,
                      "UDP destination port")
    sport_min, sport_max = BFD.udp_sport_min, BFD.udp_sport_max
    test.assert_in_range(udp_layer.sport, sport_min, sport_max,
                         "UDP source port")
621
622
def verify_event(test, event, expected_state):
    """ Verify correctness of event values.

    :param test: test case; the event is compared with test.vpp_session
    :param event: received bfd_udp_session_details event
    :param expected_state: session state the event must report
    """
    e = event
    test.logger.debug("BFD: Event: %s" % moves.reprlib.repr(e))
    test.assert_equal(e.sw_if_index,
                      test.vpp_session.interface.sw_if_index,
                      "BFD interface index")

    # labels are address-family neutral: this helper is used for IPv4
    # sessions as well, so "IPv6" in a failure message would be misleading
    test.assert_equal(str(e.local_addr), test.vpp_session.local_addr,
                      "Local IP address")
    test.assert_equal(str(e.peer_addr), test.vpp_session.peer_addr,
                      "Peer IP address")
    test.assert_equal(e.state, expected_state, BFDState)
636
637
def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None, is_tunnel=False):
    """ wait for BFD packet and verify its correctness

    :param test: test case; packets are taken from test.pg0 and checked
        against test.vpp_session and test.test_session
    :param timeout: how long to wait
    :param pcap_time_min: ignore packets with pcap timestamp lower than this
    :param is_tunnel: if True, everything up to and including the first
        IP header is dropped before the packet is verified

    :returns: the verified packet (for tunnels, the payload of the first
        IP layer)
    :raises CaptureTimeoutError: if the deadline passes before an
        acceptable packet is captured
    :raises Exception: if the packet has no BFD layer or the BFD layer
        carries an unexpected payload
    """
    test.logger.info("BFD: Waiting for BFD packet")
    deadline = time.time() + timeout
    counter = 0
    while True:
        counter += 1
        # sanity check
        test.assert_in_range(counter, 0, 100, "number of packets ignored")
        time_left = deadline - time.time()
        if time_left < 0:
            raise CaptureTimeoutError("Packet did not arrive within timeout")
        p = test.pg0.wait_for_packet(timeout=time_left)
        test.logger.debug(ppp("BFD: Got packet:", p))
        if pcap_time_min is not None and p.time < pcap_time_min:
            test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
                                  "pcap time min %s):" %
                                  (p.time, pcap_time_min), p))
        else:
            break
    if is_tunnel:
        # strip an IP layer and move to the next
        p = p[IP].payload

    # getlayer() returns None for a missing layer, whereas p[BFD] raises
    # IndexError and would never reach the descriptive error below
    bfd = p.getlayer(BFD)
    if bfd is None:
        raise Exception(ppp("Unexpected or invalid BFD packet:", p))
    if bfd.payload:
        raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
    verify_ip(test, p)
    verify_udp(test, p)
    test.test_session.verify_bfd(p)
    return p
677
678
679 class BFD4TestCase(VppTestCase):
680     """Bidirectional Forwarding Detection (BFD)"""
681
682     pg0 = None
683     vpp_clock_offset = None
684     vpp_session = None
685     test_session = None
686
687     @classmethod
688     def setUpClass(cls):
689         super(BFD4TestCase, cls).setUpClass()
690         cls.vapi.cli("set log class bfd level debug")
691         try:
692             cls.create_pg_interfaces([0])
693             cls.create_loopback_interfaces(1)
694             cls.loopback0 = cls.lo_interfaces[0]
695             cls.loopback0.config_ip4()
696             cls.loopback0.admin_up()
697             cls.pg0.config_ip4()
698             cls.pg0.configure_ipv4_neighbors()
699             cls.pg0.admin_up()
700             cls.pg0.resolve_arp()
701
702         except Exception:
703             super(BFD4TestCase, cls).tearDownClass()
704             raise
705
    @classmethod
    def tearDownClass(cls):
        """ no class-specific cleanup; delegates to the parent class """
        super(BFD4TestCase, cls).tearDownClass()
709
710     def setUp(self):
711         super(BFD4TestCase, self).setUp()
712         self.factory = AuthKeyFactory()
713         self.vapi.want_bfd_events()
714         self.pg0.enable_capture()
715         try:
716             self.vpp_session = VppBFDUDPSession(self, self.pg0,
717                                                 self.pg0.remote_ip4)
718             self.vpp_session.add_vpp_config()
719             self.vpp_session.admin_up()
720             self.test_session = BFDTestSession(self, self.pg0, AF_INET)
721         except BaseException:
722             self.vapi.want_bfd_events(enable_disable=0)
723             raise
724
725     def tearDown(self):
726         if not self.vpp_dead:
727             self.vapi.want_bfd_events(enable_disable=0)
728         self.vapi.collect_events()  # clear the event queue
729         super(BFD4TestCase, self).tearDown()
730
731     def test_session_up(self):
732         """ bring BFD session up """
733         bfd_session_up(self)
734
735     def test_session_up_by_ip(self):
736         """ bring BFD session up - first frame looked up by address pair """
737         self.logger.info("BFD: Sending Slow control frame")
738         self.test_session.update(my_discriminator=randint(0, 40000000))
739         self.test_session.send_packet()
740         self.pg0.enable_capture()
741         p = self.pg0.wait_for_packet(1)
742         self.assert_equal(p[BFD].your_discriminator,
743                           self.test_session.my_discriminator,
744                           "BFD - your discriminator")
745         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
746         self.test_session.update(your_discriminator=p[BFD].my_discriminator,
747                                  state=BFDState.up)
748         self.logger.info("BFD: Waiting for event")
749         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
750         verify_event(self, e, expected_state=BFDState.init)
751         self.logger.info("BFD: Sending Up")
752         self.test_session.send_packet()
753         self.logger.info("BFD: Waiting for event")
754         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
755         verify_event(self, e, expected_state=BFDState.up)
756         self.logger.info("BFD: Session is Up")
757         self.test_session.update(state=BFDState.up)
758         self.test_session.send_packet()
759         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
760
761     def test_session_down(self):
762         """ bring BFD session down """
763         bfd_session_up(self)
764         bfd_session_down(self)
765
766     def test_hold_up(self):
767         """ hold BFD session up """
768         bfd_session_up(self)
769         for dummy in range(self.test_session.detect_mult * 2):
770             wait_for_bfd_packet(self)
771             self.test_session.send_packet()
772         self.assert_equal(len(self.vapi.collect_events()), 0,
773                           "number of bfd events")
774
775     def test_slow_timer(self):
776         """ verify slow periodic control frames while session down """
777         packet_count = 3
778         self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
779         prev_packet = wait_for_bfd_packet(self, 2)
780         for dummy in range(packet_count):
781             next_packet = wait_for_bfd_packet(self, 2)
782             time_diff = next_packet.time - prev_packet.time
783             # spec says the range should be <0.75, 1>, allow extra 0.05 margin
784             # to work around timing issues
785             self.assert_in_range(
786                 time_diff, 0.70, 1.05, "time between slow packets")
787             prev_packet = next_packet
788
789     def test_zero_remote_min_rx(self):
790         """ no packets when zero remote required min rx interval """
791         bfd_session_up(self)
792         self.test_session.update(required_min_rx=0)
793         self.test_session.send_packet()
794         for dummy in range(self.test_session.detect_mult):
795             self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
796                        "sleep before transmitting bfd packet")
797             self.test_session.send_packet()
798             try:
799                 p = wait_for_bfd_packet(self, timeout=0)
800                 self.logger.error(ppp("Received unexpected packet:", p))
801             except CaptureTimeoutError:
802                 pass
803         self.assert_equal(
804             len(self.vapi.collect_events()), 0, "number of bfd events")
805         self.test_session.update(required_min_rx=300000)
806         for dummy in range(3):
807             self.test_session.send_packet()
808             wait_for_bfd_packet(
809                 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
810         self.assert_equal(
811             len(self.vapi.collect_events()), 0, "number of bfd events")
812
813     def test_conn_down(self):
814         """ verify session goes down after inactivity """
815         bfd_session_up(self)
816         detection_time = self.test_session.detect_mult *\
817             self.vpp_session.required_min_rx / USEC_IN_SEC
818         self.sleep(detection_time, "waiting for BFD session time-out")
819         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
820         verify_event(self, e, expected_state=BFDState.down)
821
822     def test_peer_discr_reset_sess_down(self):
823         """ peer discriminator reset after session goes down """
824         bfd_session_up(self)
825         detection_time = self.test_session.detect_mult *\
826             self.vpp_session.required_min_rx / USEC_IN_SEC
827         self.sleep(detection_time, "waiting for BFD session time-out")
828         self.test_session.my_discriminator = 0
829         wait_for_bfd_packet(self,
830                             pcap_time_min=time.time() - self.vpp_clock_offset)
831
832     def test_large_required_min_rx(self):
833         """ large remote required min rx interval """
834         bfd_session_up(self)
835         p = wait_for_bfd_packet(self)
836         interval = 3000000
837         self.test_session.update(required_min_rx=interval)
838         self.test_session.send_packet()
839         time_mark = time.time()
840         count = 0
841         # busy wait here, trying to collect a packet or event, vpp is not
842         # allowed to send packets and the session will timeout first - so the
843         # Up->Down event must arrive before any packets do
844         while time.time() < time_mark + interval / USEC_IN_SEC:
845             try:
846                 p = wait_for_bfd_packet(self, timeout=0)
847                 # if vpp managed to send a packet before we did the session
848                 # session update, then that's fine, ignore it
849                 if p.time < time_mark - self.vpp_clock_offset:
850                     continue
851                 self.logger.error(ppp("Received unexpected packet:", p))
852                 count += 1
853             except CaptureTimeoutError:
854                 pass
855             events = self.vapi.collect_events()
856             if len(events) > 0:
857                 verify_event(self, events[0], BFDState.down)
858                 break
859         self.assert_equal(count, 0, "number of packets received")
860
861     def test_immediate_remote_min_rx_reduction(self):
862         """ immediately honor remote required min rx reduction """
863         self.vpp_session.remove_vpp_config()
864         self.vpp_session = VppBFDUDPSession(
865             self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
866         self.pg0.enable_capture()
867         self.vpp_session.add_vpp_config()
868         self.test_session.update(desired_min_tx=1000000,
869                                  required_min_rx=1000000)
870         bfd_session_up(self)
871         reference_packet = wait_for_bfd_packet(self)
872         time_mark = time.time()
873         interval = 300000
874         self.test_session.update(required_min_rx=interval)
875         self.test_session.send_packet()
876         extra_time = time.time() - time_mark
877         p = wait_for_bfd_packet(self)
878         # first packet is allowed to be late by time we spent doing the update
879         # calculated in extra_time
880         self.assert_in_range(p.time - reference_packet.time,
881                              .95 * 0.75 * interval / USEC_IN_SEC,
882                              1.05 * interval / USEC_IN_SEC + extra_time,
883                              "time between BFD packets")
884         reference_packet = p
885         for dummy in range(3):
886             p = wait_for_bfd_packet(self)
887             diff = p.time - reference_packet.time
888             self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
889                                  1.05 * interval / USEC_IN_SEC,
890                                  "time between BFD packets")
891             reference_packet = p
892
893     def test_modify_req_min_rx_double(self):
894         """ modify session - double required min rx """
895         bfd_session_up(self)
896         p = wait_for_bfd_packet(self)
897         self.test_session.update(desired_min_tx=10000,
898                                  required_min_rx=10000)
899         self.test_session.send_packet()
900         # double required min rx
901         self.vpp_session.modify_parameters(
902             required_min_rx=2 * self.vpp_session.required_min_rx)
903         p = wait_for_bfd_packet(
904             self, pcap_time_min=time.time() - self.vpp_clock_offset)
905         # poll bit needs to be set
906         self.assertIn("P", p.sprintf("%BFD.flags%"),
907                       "Poll bit not set in BFD packet")
908         # finish poll sequence with final packet
909         final = self.test_session.create_packet()
910         final[BFD].flags = "F"
911         timeout = self.test_session.detect_mult * \
912             max(self.test_session.desired_min_tx,
913                 self.vpp_session.required_min_rx) / USEC_IN_SEC
914         self.test_session.send_packet(final)
915         time_mark = time.time()
916         e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
917         verify_event(self, e, expected_state=BFDState.down)
918         time_to_event = time.time() - time_mark
919         self.assert_in_range(time_to_event, .9 * timeout,
920                              1.1 * timeout, "session timeout")
921
922     def test_modify_req_min_rx_halve(self):
923         """ modify session - halve required min rx """
924         self.vpp_session.modify_parameters(
925             required_min_rx=2 * self.vpp_session.required_min_rx)
926         bfd_session_up(self)
927         p = wait_for_bfd_packet(self)
928         self.test_session.update(desired_min_tx=10000,
929                                  required_min_rx=10000)
930         self.test_session.send_packet()
931         p = wait_for_bfd_packet(
932             self, pcap_time_min=time.time() - self.vpp_clock_offset)
933         # halve required min rx
934         old_required_min_rx = self.vpp_session.required_min_rx
935         self.vpp_session.modify_parameters(
936             required_min_rx=self.vpp_session.required_min_rx // 2)
937         # now we wait 0.8*3*old-req-min-rx and the session should still be up
938         self.sleep(0.8 * self.vpp_session.detect_mult *
939                    old_required_min_rx / USEC_IN_SEC,
940                    "wait before finishing poll sequence")
941         self.assert_equal(len(self.vapi.collect_events()), 0,
942                           "number of bfd events")
943         p = wait_for_bfd_packet(self)
944         # poll bit needs to be set
945         self.assertIn("P", p.sprintf("%BFD.flags%"),
946                       "Poll bit not set in BFD packet")
947         # finish poll sequence with final packet
948         final = self.test_session.create_packet()
949         final[BFD].flags = "F"
950         self.test_session.send_packet(final)
951         # now the session should time out under new conditions
952         detection_time = self.test_session.detect_mult *\
953             self.vpp_session.required_min_rx / USEC_IN_SEC
954         before = time.time()
955         e = self.vapi.wait_for_event(
956             2 * detection_time, "bfd_udp_session_details")
957         after = time.time()
958         self.assert_in_range(after - before,
959                              0.9 * detection_time,
960                              1.1 * detection_time,
961                              "time before bfd session goes down")
962         verify_event(self, e, expected_state=BFDState.down)
963
964     def test_modify_detect_mult(self):
965         """ modify detect multiplier """
966         bfd_session_up(self)
967         p = wait_for_bfd_packet(self)
968         self.vpp_session.modify_parameters(detect_mult=1)
969         p = wait_for_bfd_packet(
970             self, pcap_time_min=time.time() - self.vpp_clock_offset)
971         self.assert_equal(self.vpp_session.detect_mult,
972                           p[BFD].detect_mult,
973                           "detect mult")
974         # poll bit must not be set
975         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
976                          "Poll bit not set in BFD packet")
977         self.vpp_session.modify_parameters(detect_mult=10)
978         p = wait_for_bfd_packet(
979             self, pcap_time_min=time.time() - self.vpp_clock_offset)
980         self.assert_equal(self.vpp_session.detect_mult,
981                           p[BFD].detect_mult,
982                           "detect mult")
983         # poll bit must not be set
984         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
985                          "Poll bit not set in BFD packet")
986
987     def test_queued_poll(self):
988         """ test poll sequence queueing """
989         bfd_session_up(self)
990         p = wait_for_bfd_packet(self)
991         self.vpp_session.modify_parameters(
992             required_min_rx=2 * self.vpp_session.required_min_rx)
993         p = wait_for_bfd_packet(self)
994         poll_sequence_start = time.time()
995         poll_sequence_length_min = 0.5
996         send_final_after = time.time() + poll_sequence_length_min
997         # poll bit needs to be set
998         self.assertIn("P", p.sprintf("%BFD.flags%"),
999                       "Poll bit not set in BFD packet")
1000         self.assert_equal(p[BFD].required_min_rx_interval,
1001                           self.vpp_session.required_min_rx,
1002                           "BFD required min rx interval")
1003         self.vpp_session.modify_parameters(
1004             required_min_rx=2 * self.vpp_session.required_min_rx)
1005         # 2nd poll sequence should be queued now
1006         # don't send the reply back yet, wait for some time to emulate
1007         # longer round-trip time
1008         packet_count = 0
1009         while time.time() < send_final_after:
1010             self.test_session.send_packet()
1011             p = wait_for_bfd_packet(self)
1012             self.assert_equal(len(self.vapi.collect_events()), 0,
1013                               "number of bfd events")
1014             self.assert_equal(p[BFD].required_min_rx_interval,
1015                               self.vpp_session.required_min_rx,
1016                               "BFD required min rx interval")
1017             packet_count += 1
1018             # poll bit must be set
1019             self.assertIn("P", p.sprintf("%BFD.flags%"),
1020                           "Poll bit not set in BFD packet")
1021         final = self.test_session.create_packet()
1022         final[BFD].flags = "F"
1023         self.test_session.send_packet(final)
1024         # finish 1st with final
1025         poll_sequence_length = time.time() - poll_sequence_start
1026         # vpp must wait for some time before starting new poll sequence
1027         poll_no_2_started = False
1028         for dummy in range(2 * packet_count):
1029             p = wait_for_bfd_packet(self)
1030             self.assert_equal(len(self.vapi.collect_events()), 0,
1031                               "number of bfd events")
1032             if "P" in p.sprintf("%BFD.flags%"):
1033                 poll_no_2_started = True
1034                 if time.time() < poll_sequence_start + poll_sequence_length:
1035                     raise Exception("VPP started 2nd poll sequence too soon")
1036                 final = self.test_session.create_packet()
1037                 final[BFD].flags = "F"
1038                 self.test_session.send_packet(final)
1039                 break
1040             else:
1041                 self.test_session.send_packet()
1042         self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
1043         # finish 2nd with final
1044         final = self.test_session.create_packet()
1045         final[BFD].flags = "F"
1046         self.test_session.send_packet(final)
1047         p = wait_for_bfd_packet(self)
1048         # poll bit must not be set
1049         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
1050                          "Poll bit set in BFD packet")
1051
1052     # returning inconsistent results requiring retries in per-patch tests
1053     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1054     def test_poll_response(self):
1055         """ test correct response to control frame with poll bit set """
1056         bfd_session_up(self)
1057         poll = self.test_session.create_packet()
1058         poll[BFD].flags = "P"
1059         self.test_session.send_packet(poll)
1060         final = wait_for_bfd_packet(
1061             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1062         self.assertIn("F", final.sprintf("%BFD.flags%"))
1063
1064     def test_no_periodic_if_remote_demand(self):
1065         """ no periodic frames outside poll sequence if remote demand set """
1066         bfd_session_up(self)
1067         demand = self.test_session.create_packet()
1068         demand[BFD].flags = "D"
1069         self.test_session.send_packet(demand)
1070         transmit_time = 0.9 \
1071             * max(self.vpp_session.required_min_rx,
1072                   self.test_session.desired_min_tx) \
1073             / USEC_IN_SEC
1074         count = 0
1075         for dummy in range(self.test_session.detect_mult * 2):
1076             self.sleep(transmit_time)
1077             self.test_session.send_packet(demand)
1078             try:
1079                 p = wait_for_bfd_packet(self, timeout=0)
1080                 self.logger.error(ppp("Received unexpected packet:", p))
1081                 count += 1
1082             except CaptureTimeoutError:
1083                 pass
1084         events = self.vapi.collect_events()
1085         for e in events:
1086             self.logger.error("Received unexpected event: %s", e)
1087         self.assert_equal(count, 0, "number of packets received")
1088         self.assert_equal(len(events), 0, "number of events received")
1089
1090     def test_echo_looped_back(self):
1091         """ echo packets looped back """
1092         # don't need a session in this case..
1093         self.vpp_session.remove_vpp_config()
1094         self.pg0.enable_capture()
1095         echo_packet_count = 10
1096         # random source port low enough to increment a few times..
1097         udp_sport_tx = randint(1, 50000)
1098         udp_sport_rx = udp_sport_tx
1099         echo_packet = (Ether(src=self.pg0.remote_mac,
1100                              dst=self.pg0.local_mac) /
1101                        IP(src=self.pg0.remote_ip4,
1102                           dst=self.pg0.remote_ip4) /
1103                        UDP(dport=BFD.udp_dport_echo) /
1104                        Raw("this should be looped back"))
1105         for dummy in range(echo_packet_count):
1106             self.sleep(.01, "delay between echo packets")
1107             echo_packet[UDP].sport = udp_sport_tx
1108             udp_sport_tx += 1
1109             self.logger.debug(ppp("Sending packet:", echo_packet))
1110             self.pg0.add_stream(echo_packet)
1111             self.pg_start()
1112         for dummy in range(echo_packet_count):
1113             p = self.pg0.wait_for_packet(1)
1114             self.logger.debug(ppp("Got packet:", p))
1115             ether = p[Ether]
1116             self.assert_equal(self.pg0.remote_mac,
1117                               ether.dst, "Destination MAC")
1118             self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1119             ip = p[IP]
1120             self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1121             self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1122             udp = p[UDP]
1123             self.assert_equal(udp.dport, BFD.udp_dport_echo,
1124                               "UDP destination port")
1125             self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1126             udp_sport_rx += 1
1127             # need to compare the hex payload here, otherwise BFD_vpp_echo
1128             # gets in way
1129             self.assertEqual(scapy.compat.raw(p[UDP].payload),
1130                              scapy.compat.raw(echo_packet[UDP].payload),
1131                              "Received packet is not the echo packet sent")
1132         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1133                           "ECHO packet identifier for test purposes)")
1134
1135     def test_echo(self):
1136         """ echo function """
1137         bfd_session_up(self)
1138         self.test_session.update(required_min_echo_rx=150000)
1139         self.test_session.send_packet()
1140         detection_time = self.test_session.detect_mult *\
1141             self.vpp_session.required_min_rx / USEC_IN_SEC
1142         # echo shouldn't work without echo source set
1143         for dummy in range(10):
1144             sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1145             self.sleep(sleep, "delay before sending bfd packet")
1146             self.test_session.send_packet()
1147         p = wait_for_bfd_packet(
1148             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1149         self.assert_equal(p[BFD].required_min_rx_interval,
1150                           self.vpp_session.required_min_rx,
1151                           "BFD required min rx interval")
1152         self.test_session.send_packet()
1153         self.vapi.bfd_udp_set_echo_source(
1154             sw_if_index=self.loopback0.sw_if_index)
1155         echo_seen = False
1156         # should be turned on - loopback echo packets
1157         for dummy in range(3):
1158             loop_until = time.time() + 0.75 * detection_time
1159             while time.time() < loop_until:
1160                 p = self.pg0.wait_for_packet(1)
1161                 self.logger.debug(ppp("Got packet:", p))
1162                 if p[UDP].dport == BFD.udp_dport_echo:
1163                     self.assert_equal(
1164                         p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
1165                     self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
1166                                         "BFD ECHO src IP equal to loopback IP")
1167                     self.logger.debug(ppp("Looping back packet:", p))
1168                     self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1169                                       "ECHO packet destination MAC address")
1170                     p[Ether].dst = self.pg0.local_mac
1171                     self.pg0.add_stream(p)
1172                     self.pg_start()
1173                     echo_seen = True
1174                 elif p.haslayer(BFD):
1175                     if echo_seen:
1176                         self.assertGreaterEqual(
1177                             p[BFD].required_min_rx_interval,
1178                             1000000)
1179                     if "P" in p.sprintf("%BFD.flags%"):
1180                         final = self.test_session.create_packet()
1181                         final[BFD].flags = "F"
1182                         self.test_session.send_packet(final)
1183                 else:
1184                     raise Exception(ppp("Received unknown packet:", p))
1185
1186                 self.assert_equal(len(self.vapi.collect_events()), 0,
1187                                   "number of bfd events")
1188             self.test_session.send_packet()
1189         self.assertTrue(echo_seen, "No echo packets received")
1190
1191     def test_echo_fail(self):
1192         """ session goes down if echo function fails """
1193         bfd_session_up(self)
1194         self.test_session.update(required_min_echo_rx=150000)
1195         self.test_session.send_packet()
1196         detection_time = self.test_session.detect_mult *\
1197             self.vpp_session.required_min_rx / USEC_IN_SEC
1198         self.vapi.bfd_udp_set_echo_source(
1199             sw_if_index=self.loopback0.sw_if_index)
1200         # echo function should be used now, but we will drop the echo packets
1201         verified_diag = False
1202         for dummy in range(3):
1203             loop_until = time.time() + 0.75 * detection_time
1204             while time.time() < loop_until:
1205                 p = self.pg0.wait_for_packet(1)
1206                 self.logger.debug(ppp("Got packet:", p))
1207                 if p[UDP].dport == BFD.udp_dport_echo:
1208                     # dropped
1209                     pass
1210                 elif p.haslayer(BFD):
1211                     if "P" in p.sprintf("%BFD.flags%"):
1212                         self.assertGreaterEqual(
1213                             p[BFD].required_min_rx_interval,
1214                             1000000)
1215                         final = self.test_session.create_packet()
1216                         final[BFD].flags = "F"
1217                         self.test_session.send_packet(final)
1218                     if p[BFD].state == BFDState.down:
1219                         self.assert_equal(p[BFD].diag,
1220                                           BFDDiagCode.echo_function_failed,
1221                                           BFDDiagCode)
1222                         verified_diag = True
1223                 else:
1224                     raise Exception(ppp("Received unknown packet:", p))
1225             self.test_session.send_packet()
1226         events = self.vapi.collect_events()
1227         self.assert_equal(len(events), 1, "number of bfd events")
1228         self.assert_equal(events[0].state, BFDState.down, BFDState)
1229         self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1230
1231     def test_echo_stop(self):
1232         """ echo function stops if peer sets required min echo rx zero """
1233         bfd_session_up(self)
1234         self.test_session.update(required_min_echo_rx=150000)
1235         self.test_session.send_packet()
1236         self.vapi.bfd_udp_set_echo_source(
1237             sw_if_index=self.loopback0.sw_if_index)
1238         # wait for first echo packet
1239         while True:
1240             p = self.pg0.wait_for_packet(1)
1241             self.logger.debug(ppp("Got packet:", p))
1242             if p[UDP].dport == BFD.udp_dport_echo:
1243                 self.logger.debug(ppp("Looping back packet:", p))
1244                 p[Ether].dst = self.pg0.local_mac
1245                 self.pg0.add_stream(p)
1246                 self.pg_start()
1247                 break
1248             elif p.haslayer(BFD):
1249                 # ignore BFD
1250                 pass
1251             else:
1252                 raise Exception(ppp("Received unknown packet:", p))
1253         self.test_session.update(required_min_echo_rx=0)
1254         self.test_session.send_packet()
1255         # echo packets shouldn't arrive anymore
1256         for dummy in range(5):
1257             wait_for_bfd_packet(
1258                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1259             self.test_session.send_packet()
1260             events = self.vapi.collect_events()
1261             self.assert_equal(len(events), 0, "number of bfd events")
1262
1263     def test_echo_source_removed(self):
1264         """ echo function stops if echo source is removed """
1265         bfd_session_up(self)
1266         self.test_session.update(required_min_echo_rx=150000)
1267         self.test_session.send_packet()
1268         self.vapi.bfd_udp_set_echo_source(
1269             sw_if_index=self.loopback0.sw_if_index)
1270         # wait for first echo packet
1271         while True:
1272             p = self.pg0.wait_for_packet(1)
1273             self.logger.debug(ppp("Got packet:", p))
1274             if p[UDP].dport == BFD.udp_dport_echo:
1275                 self.logger.debug(ppp("Looping back packet:", p))
1276                 p[Ether].dst = self.pg0.local_mac
1277                 self.pg0.add_stream(p)
1278                 self.pg_start()
1279                 break
1280             elif p.haslayer(BFD):
1281                 # ignore BFD
1282                 pass
1283             else:
1284                 raise Exception(ppp("Received unknown packet:", p))
1285         self.vapi.bfd_udp_del_echo_source()
1286         self.test_session.send_packet()
1287         # echo packets shouldn't arrive anymore
1288         for dummy in range(5):
1289             wait_for_bfd_packet(
1290                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1291             self.test_session.send_packet()
1292             events = self.vapi.collect_events()
1293             self.assert_equal(len(events), 0, "number of bfd events")
1294
1295     def test_stale_echo(self):
1296         """ stale echo packets don't keep a session up """
1297         bfd_session_up(self)
1298         self.test_session.update(required_min_echo_rx=150000)
1299         self.vapi.bfd_udp_set_echo_source(
1300             sw_if_index=self.loopback0.sw_if_index)
1301         self.test_session.send_packet()
1302         # should be turned on - loopback echo packets
1303         echo_packet = None
1304         timeout_at = None
1305         timeout_ok = False
1306         for dummy in range(10 * self.vpp_session.detect_mult):
1307             p = self.pg0.wait_for_packet(1)
1308             if p[UDP].dport == BFD.udp_dport_echo:
1309                 if echo_packet is None:
1310                     self.logger.debug(ppp("Got first echo packet:", p))
1311                     echo_packet = p
1312                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1313                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1314                 else:
1315                     self.logger.debug(ppp("Got followup echo packet:", p))
1316                 self.logger.debug(ppp("Looping back first echo packet:", p))
1317                 echo_packet[Ether].dst = self.pg0.local_mac
1318                 self.pg0.add_stream(echo_packet)
1319                 self.pg_start()
1320             elif p.haslayer(BFD):
1321                 self.logger.debug(ppp("Got packet:", p))
1322                 if "P" in p.sprintf("%BFD.flags%"):
1323                     final = self.test_session.create_packet()
1324                     final[BFD].flags = "F"
1325                     self.test_session.send_packet(final)
1326                 if p[BFD].state == BFDState.down:
1327                     self.assertIsNotNone(
1328                         timeout_at,
1329                         "Session went down before first echo packet received")
1330                     now = time.time()
1331                     self.assertGreaterEqual(
1332                         now, timeout_at,
1333                         "Session timeout at %s, but is expected at %s" %
1334                         (now, timeout_at))
1335                     self.assert_equal(p[BFD].diag,
1336                                       BFDDiagCode.echo_function_failed,
1337                                       BFDDiagCode)
1338                     events = self.vapi.collect_events()
1339                     self.assert_equal(len(events), 1, "number of bfd events")
1340                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1341                     timeout_ok = True
1342                     break
1343             else:
1344                 raise Exception(ppp("Received unknown packet:", p))
1345             self.test_session.send_packet()
1346         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1347
1348     def test_invalid_echo_checksum(self):
1349         """ echo packets with invalid checksum don't keep a session up """
1350         bfd_session_up(self)
1351         self.test_session.update(required_min_echo_rx=150000)
1352         self.vapi.bfd_udp_set_echo_source(
1353             sw_if_index=self.loopback0.sw_if_index)
1354         self.test_session.send_packet()
1355         # should be turned on - loopback echo packets
1356         timeout_at = None
1357         timeout_ok = False
1358         for dummy in range(10 * self.vpp_session.detect_mult):
1359             p = self.pg0.wait_for_packet(1)
1360             if p[UDP].dport == BFD.udp_dport_echo:
1361                 self.logger.debug(ppp("Got echo packet:", p))
1362                 if timeout_at is None:
1363                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1364                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1365                 p[BFD_vpp_echo].checksum = getrandbits(64)
1366                 p[Ether].dst = self.pg0.local_mac
1367                 self.logger.debug(ppp("Looping back modified echo packet:", p))
1368                 self.pg0.add_stream(p)
1369                 self.pg_start()
1370             elif p.haslayer(BFD):
1371                 self.logger.debug(ppp("Got packet:", p))
1372                 if "P" in p.sprintf("%BFD.flags%"):
1373                     final = self.test_session.create_packet()
1374                     final[BFD].flags = "F"
1375                     self.test_session.send_packet(final)
1376                 if p[BFD].state == BFDState.down:
1377                     self.assertIsNotNone(
1378                         timeout_at,
1379                         "Session went down before first echo packet received")
1380                     now = time.time()
1381                     self.assertGreaterEqual(
1382                         now, timeout_at,
1383                         "Session timeout at %s, but is expected at %s" %
1384                         (now, timeout_at))
1385                     self.assert_equal(p[BFD].diag,
1386                                       BFDDiagCode.echo_function_failed,
1387                                       BFDDiagCode)
1388                     events = self.vapi.collect_events()
1389                     self.assert_equal(len(events), 1, "number of bfd events")
1390                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1391                     timeout_ok = True
1392                     break
1393             else:
1394                 raise Exception(ppp("Received unknown packet:", p))
1395             self.test_session.send_packet()
1396         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1397
1398     def test_admin_up_down(self):
1399         """ put session admin-up and admin-down """
1400         bfd_session_up(self)
1401         self.vpp_session.admin_down()
1402         self.pg0.enable_capture()
1403         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1404         verify_event(self, e, expected_state=BFDState.admin_down)
1405         for dummy in range(2):
1406             p = wait_for_bfd_packet(self)
1407             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1408         # try to bring session up - shouldn't be possible
1409         self.test_session.update(state=BFDState.init)
1410         self.test_session.send_packet()
1411         for dummy in range(2):
1412             p = wait_for_bfd_packet(self)
1413             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1414         self.vpp_session.admin_up()
1415         self.test_session.update(state=BFDState.down)
1416         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1417         verify_event(self, e, expected_state=BFDState.down)
1418         p = wait_for_bfd_packet(
1419             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1420         self.assert_equal(p[BFD].state, BFDState.down, BFDState)
1421         self.test_session.send_packet()
1422         p = wait_for_bfd_packet(
1423             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1424         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1425         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1426         verify_event(self, e, expected_state=BFDState.init)
1427         self.test_session.update(state=BFDState.up)
1428         self.test_session.send_packet()
1429         p = wait_for_bfd_packet(
1430             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1431         self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1432         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1433         verify_event(self, e, expected_state=BFDState.up)
1434
1435     def test_config_change_remote_demand(self):
1436         """ configuration change while peer in demand mode """
1437         bfd_session_up(self)
1438         demand = self.test_session.create_packet()
1439         demand[BFD].flags = "D"
1440         self.test_session.send_packet(demand)
1441         self.vpp_session.modify_parameters(
1442             required_min_rx=2 * self.vpp_session.required_min_rx)
1443         p = wait_for_bfd_packet(
1444             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1445         # poll bit must be set
1446         self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1447         # terminate poll sequence
1448         final = self.test_session.create_packet()
1449         final[BFD].flags = "D+F"
1450         self.test_session.send_packet(final)
1451         # vpp should be quiet now again
1452         transmit_time = 0.9 \
1453             * max(self.vpp_session.required_min_rx,
1454                   self.test_session.desired_min_tx) \
1455             / USEC_IN_SEC
1456         count = 0
1457         for dummy in range(self.test_session.detect_mult * 2):
1458             self.sleep(transmit_time)
1459             self.test_session.send_packet(demand)
1460             try:
1461                 p = wait_for_bfd_packet(self, timeout=0)
1462                 self.logger.error(ppp("Received unexpected packet:", p))
1463                 count += 1
1464             except CaptureTimeoutError:
1465                 pass
1466         events = self.vapi.collect_events()
1467         for e in events:
1468             self.logger.error("Received unexpected event: %s", e)
1469         self.assert_equal(count, 0, "number of packets received")
1470         self.assert_equal(len(events), 0, "number of events received")
1471
1472     def test_intf_deleted(self):
1473         """ interface with bfd session deleted """
1474         intf = VppLoInterface(self)
1475         intf.config_ip4()
1476         intf.admin_up()
1477         sw_if_index = intf.sw_if_index
1478         vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip4)
1479         vpp_session.add_vpp_config()
1480         vpp_session.admin_up()
1481         intf.remove_vpp_config()
1482         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1483         self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1484         self.assertFalse(vpp_session.query_vpp_config())
1485
1486
class BFD6TestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (IPv6) """

    # Class-level defaults.  vpp_session and test_session are replaced in
    # setUp(); pg0 is expected to be provided by create_pg_interfaces() in
    # setUpClass(); vpp_clock_offset is not assigned in this class
    # (presumably set by the session helpers - confirm).
    pg0 = None
    vpp_clock_offset = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """ create pg0 and loopback0 with IPv6 configured, both admin-up

        On any failure the parent tearDownClass is run before the
        exception is re-raised.
        """
        super(BFD6TestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip6()
            cls.pg0.configure_ipv6_neighbors()
            cls.pg0.admin_up()
            cls.pg0.resolve_ndp()
            cls.create_loopback_interfaces(1)
            # loopback0 is used as the echo source by test_echo
            cls.loopback0 = cls.lo_interfaces[0]
            cls.loopback0.config_ip6()
            cls.loopback0.admin_up()

        except Exception:
            super(BFD6TestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """ delegate class-level cleanup to the parent class """
        super(BFD6TestCase, cls).tearDownClass()

    def setUp(self):
        """ subscribe to BFD events and create the vpp and test sessions

        If session creation fails, the event subscription is cancelled
        again before the exception is re-raised.
        """
        super(BFD6TestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()
        try:
            self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                                self.pg0.remote_ip6,
                                                af=AF_INET6)
            self.vpp_session.add_vpp_config()
            self.vpp_session.admin_up()
            self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
            self.logger.debug(self.vapi.cli("show adj nbr"))
        except BaseException:
            self.vapi.want_bfd_events(enable_disable=0)
            raise

    def tearDown(self):
        """ unsubscribe from BFD events (if vpp is alive), drop old events """
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFD6TestCase, self).tearDown()

    def test_session_up(self):
        """ bring BFD session up """
        bfd_session_up(self)

    def test_session_up_by_ip(self):
        """ bring BFD session up - first frame looked up by address pair

        The first control frame is sent with a random my_discriminator and
        without updating your_discriminator beforehand; vpp's discriminator
        is learned from its reply and used for the rest of the handshake.
        """
        self.logger.info("BFD: Sending Slow control frame")
        self.test_session.update(my_discriminator=randint(0, 40000000))
        self.test_session.send_packet()
        self.pg0.enable_capture()
        p = self.pg0.wait_for_packet(1)
        self.assert_equal(p[BFD].your_discriminator,
                          self.test_session.my_discriminator,
                          "BFD - your discriminator")
        self.assert_equal(p[BFD].state, BFDState.init, BFDState)
        # learn vpp's discriminator from its reply
        self.test_session.update(your_discriminator=p[BFD].my_discriminator,
                                 state=BFDState.up)
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        verify_event(self, e, expected_state=BFDState.init)
        self.logger.info("BFD: Sending Up")
        self.test_session.send_packet()
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        verify_event(self, e, expected_state=BFDState.up)
        self.logger.info("BFD: Session is Up")
        self.test_session.update(state=BFDState.up)
        self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_hold_up(self):
        """ hold BFD session up

        Answers every packet from vpp for 2 * detect_mult rounds and
        checks that no event is raised and the session stays up.
        """
        bfd_session_up(self)
        for _ in range(self.test_session.detect_mult * 2):
            wait_for_bfd_packet(self)
            self.test_session.send_packet()
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_echo_looped_back(self):
        """ echo packets looped back

        Sends echo packets (source and destination IP both set to the
        remote address, increasing UDP source ports) and verifies that
        each one comes back with swapped MACs and unchanged IP addresses,
        ports and payload.
        """
        # don't need a session in this case..
        self.vpp_session.remove_vpp_config()
        self.pg0.enable_capture()
        echo_packet_count = 10
        # random source port low enough to increment a few times..
        udp_sport_tx = randint(1, 50000)
        udp_sport_rx = udp_sport_tx
        echo_packet = (Ether(src=self.pg0.remote_mac,
                             dst=self.pg0.local_mac) /
                       IPv6(src=self.pg0.remote_ip6,
                            dst=self.pg0.remote_ip6) /
                       UDP(dport=BFD.udp_dport_echo) /
                       Raw("this should be looped back"))
        for _ in range(echo_packet_count):
            self.sleep(.01, "delay between echo packets")
            # the source port doubles as a per-packet identifier
            echo_packet[UDP].sport = udp_sport_tx
            udp_sport_tx += 1
            self.logger.debug(ppp("Sending packet:", echo_packet))
            self.pg0.add_stream(echo_packet)
            self.pg_start()
        for _ in range(echo_packet_count):
            p = self.pg0.wait_for_packet(1)
            self.logger.debug(ppp("Got packet:", p))
            ether = p[Ether]
            self.assert_equal(self.pg0.remote_mac,
                              ether.dst, "Destination MAC")
            self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
            ip = p[IPv6]
            self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
            # label fixed: this checks the source address
            self.assert_equal(self.pg0.remote_ip6, ip.src, "Source IP")
            udp = p[UDP]
            self.assert_equal(udp.dport, BFD.udp_dport_echo,
                              "UDP destination port")
            self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
            udp_sport_rx += 1
            # need to compare the hex payload here, otherwise BFD_vpp_echo
            # gets in way
            self.assertEqual(scapy.compat.raw(p[UDP].payload),
                             scapy.compat.raw(echo_packet[UDP].payload),
                             "Received packet is not the echo packet sent")
        # every packet sent was received (checked once; the verbatim
        # duplicate of this assertion was dropped)
        self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
                          "ECHO packet identifier for test purposes)")

    def test_echo(self):
        """ echo function

        Without an echo source configured, vpp must keep advertising its
        configured required_min_rx.  After the echo source is set to
        loopback0, echo packets are looped back for three rounds of
        0.75 * detection time each; once an echo packet was seen, control
        packets must advertise required_min_rx_interval >= 1000000 and
        no session events may occur.
        """
        bfd_session_up(self)
        self.test_session.update(required_min_echo_rx=150000)
        self.test_session.send_packet()
        detection_time = self.test_session.detect_mult *\
            self.vpp_session.required_min_rx / USEC_IN_SEC
        # echo shouldn't work without echo source set
        for _ in range(10):
            sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
            self.sleep(sleep, "delay before sending bfd packet")
            self.test_session.send_packet()
        p = wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.assert_equal(p[BFD].required_min_rx_interval,
                          self.vpp_session.required_min_rx,
                          "BFD required min rx interval")
        self.test_session.send_packet()
        self.vapi.bfd_udp_set_echo_source(
            sw_if_index=self.loopback0.sw_if_index)
        echo_seen = False
        # should be turned on - loopback echo packets
        for _ in range(3):
            loop_until = time.time() + 0.75 * detection_time
            while time.time() < loop_until:
                p = self.pg0.wait_for_packet(1)
                self.logger.debug(ppp("Got packet:", p))
                # check for the UDP layer before indexing it - p[UDP]
                # raises IndexError on a non-UDP frame, which would bypass
                # the "Received unknown packet" report below
                if p.haslayer(UDP) and p[UDP].dport == BFD.udp_dport_echo:
                    self.assert_equal(
                        p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
                    self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
                                        "BFD ECHO src IP equal to loopback IP")
                    self.logger.debug(ppp("Looping back packet:", p))
                    self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
                                      "ECHO packet destination MAC address")
                    # send the packet back to vpp
                    p[Ether].dst = self.pg0.local_mac
                    self.pg0.add_stream(p)
                    self.pg_start()
                    echo_seen = True
                elif p.haslayer(BFD):
                    if echo_seen:
                        self.assertGreaterEqual(
                            p[BFD].required_min_rx_interval,
                            1000000)
                    if "P" in p.sprintf("%BFD.flags%"):
                        # answer a poll with a final
                        final = self.test_session.create_packet()
                        final[BFD].flags = "F"
                        self.test_session.send_packet(final)
                else:
                    raise Exception(ppp("Received unknown packet:", p))

                self.assert_equal(len(self.vapi.collect_events()), 0,
                                  "number of bfd events")
            self.test_session.send_packet()
        self.assertTrue(echo_seen, "No echo packets received")

    def test_intf_deleted(self):
        """ interface with bfd session deleted

        Removing the loopback interface that carries a BFD session must
        produce a session event for that interface and leave no session
        configuration behind in vpp.
        """
        intf = VppLoInterface(self)
        intf.config_ip6()
        intf.admin_up()
        # remember the index before the interface goes away
        sw_if_index = intf.sw_if_index
        vpp_session = VppBFDUDPSession(
            self, intf, intf.remote_ip6, af=AF_INET6)
        vpp_session.add_vpp_config()
        vpp_session.admin_up()
        intf.remove_vpp_config()
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
        self.assertFalse(vpp_session.query_vpp_config())
1698
1699
class BFDFIBTestCase(VppTestCase):
    """ BFD-FIB interactions (IPv6) """

    # replaced by the test once the routes are in place
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """ delegate class-level setup to the parent class """
        super(BFDFIBTestCase, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        """ delegate class-level cleanup to the parent class """
        super(BFDFIBTestCase, cls).tearDownClass()

    def setUp(self):
        """ create pg0, subscribe to BFD events, configure IPv6 """
        super(BFDFIBTestCase, self).setUp()
        self.create_pg_interfaces(range(1))

        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.configure_ipv6_neighbors()

    def tearDown(self):
        """ unsubscribe from BFD events, unless vpp is dead """
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=False)

        super(BFDFIBTestCase, self).tearDown()

    @staticmethod
    def pkt_is_not_data_traffic(p):
        """ not data traffic implies BFD or the usual IPv6 ND/RA"""
        return bool(p.haslayer(BFD) or is_ipv6_misc(p))

    def _send_and_expect_forwarded(self, pkts):
        """ send pkts on pg0 and expect each of them back, in order

        BFD and IPv6 ND/RA packets are filtered out while waiting; only
        the IPv6 destination address of each packet is compared.
        """
        self.pg0.add_stream(pkts)
        self.pg_start()
        for sent in pkts:
            received = self.pg0.wait_for_packet(
                1,
                filter_out_fn=self.pkt_is_not_data_traffic)
            self.assertEqual(received[IPv6].dst,
                             sent[IPv6].dst)

    def test_session_with_fib(self):
        """ BFD-FIB interactions

        Traffic matching routes via the BFD peer must be forwarded while
        the session is up and dropped while it is down.
        """

        # one packet per route: destinations in 2001::/64 and 2002::/64
        data_pkts = [
            (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IPv6(src="3001::1", dst=dst_ip) /
             UDP(sport=1234, dport=1234) /
             Raw(b'\xa5' * 100))
            for dst_ip in ("2001::1", "2002::1")]

        # A recursive and a non-recursive route via a next-hop that
        # will have a BFD session: 2001::/64 names pg0 explicitly,
        # 2002::/64 gives 0xffffffff as the interface index
        routes = [
            VppIpRoute(self, prefix, 64,
                       [VppRoutePath(self.pg0.remote_ip6, path_sw_if_index)])
            for prefix, path_sw_if_index in (
                ("2001::", self.pg0.sw_if_index),
                ("2002::", 0xffffffff))]
        for route in routes:
            route.add_vpp_config()

        # bring the session up now the routes are present
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET6)

        # session is up - traffic passes
        bfd_session_up(self)
        self._send_and_expect_forwarded(data_pkts)

        # session is down - traffic is dropped
        bfd_session_down(self)

        self.pg0.add_stream(data_pkts)
        self.pg_start()
        with self.assertRaises(CaptureTimeoutError):
            self.pg0.wait_for_packet(1, self.pkt_is_not_data_traffic)

        # session is up again - traffic passes
        bfd_session_up(self)
        self._send_and_expect_forwarded(data_pkts)
1803
1804
@unittest.skipUnless(running_extended_tests, "part of extended tests")
class BFDTunTestCase(VppTestCase):
    """ BFD over GRE tunnel """

    # replaced by the test once the tunnel exists
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """ delegate class-level setup to the parent class """
        super(BFDTunTestCase, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        """ delegate class-level cleanup to the parent class """
        super(BFDTunTestCase, cls).tearDownClass()

    def setUp(self):
        """ create pg0, subscribe to BFD events, configure IPv4 """
        super(BFDTunTestCase, self).setUp()
        self.create_pg_interfaces(range(1))

        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip4()
            intf.resolve_arp()

    def tearDown(self):
        """ unsubscribe from BFD events, unless vpp is dead """
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)

        super(BFDTunTestCase, self).tearDown()

    @staticmethod
    def pkt_is_not_data_traffic(p):
        """ not data traffic implies BFD or the usual IPv6 ND/RA"""
        return bool(p.haslayer(BFD) or is_ipv6_misc(p))

    def test_bfd_o_gre(self):
        """ BFD-o-GRE

        Runs a BFD session over a GRE tunnel between pg0's local and
        remote IPv4 addresses, passes one data packet while the session
        is up and finally takes the session down.
        """

        # A GRE interface over which to run a BFD session
        gre_if = VppGreInterface(
            self, self.pg0.local_ip4, self.pg0.remote_ip4)
        gre_if.add_vpp_config()
        gre_if.admin_up()
        gre_if.config_ip4()

        # vpp side of the session, bound to the tunnel interface
        self.vpp_session = VppBFDUDPSession(
            self, gre_if, gre_if.remote_ip4, is_tunnel=True)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()

        # test side of the session: tunnel header built from pg0's
        # addresses, packets go through the physical interface pg0
        outer_header = (IP(src=self.pg0.remote_ip4,
                           dst=self.pg0.local_ip4) /
                        GRE())
        self.test_session = BFDTestSession(
            self, gre_if, AF_INET,
            tunnel_header=outer_header,
            phy_interface=self.pg0)

        # a single data packet addressed to the tunnel's remote address
        data_pkts = [(Ether(dst=self.pg0.local_mac,
                            src=self.pg0.remote_mac) /
                      IP(src=self.pg0.remote_ip4, dst=gre_if.remote_ip4) /
                      UDP(sport=1234, dport=1234) /
                      Raw(b'\xa5' * 100))]

        # session is up - traffic passes
        bfd_session_up(self)

        self.send_and_expect(self.pg0, data_pkts, self.pg0)

        # bring session down
        bfd_session_down(self)
1884
1885
1886 class BFDSHA1TestCase(VppTestCase):
1887     """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1888
1889     pg0 = None
1890     vpp_clock_offset = None
1891     vpp_session = None
1892     test_session = None
1893
1894     @classmethod
1895     def setUpClass(cls):
1896         super(BFDSHA1TestCase, cls).setUpClass()
1897         cls.vapi.cli("set log class bfd level debug")
1898         try:
1899             cls.create_pg_interfaces([0])
1900             cls.pg0.config_ip4()
1901             cls.pg0.admin_up()
1902             cls.pg0.resolve_arp()
1903
1904         except Exception:
1905             super(BFDSHA1TestCase, cls).tearDownClass()
1906             raise
1907
    @classmethod
    def tearDownClass(cls):
        """ delegate class-level cleanup to the parent class """
        super(BFDSHA1TestCase, cls).tearDownClass()
1911
    def setUp(self):
        """ per-test preparation

        Creates a fresh auth key factory, subscribes to BFD events and
        enables capture on pg0.  No BFD session is created here.
        """
        super(BFDSHA1TestCase, self).setUp()
        # new factory per test, so conf key ids are tracked per test only
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()
1917
1918     def tearDown(self):
1919         if not self.vpp_dead:
1920             self.vapi.want_bfd_events(enable_disable=False)
1921         self.vapi.collect_events()  # clear the event queue
1922         super(BFDSHA1TestCase, self).tearDown()
1923
1924     def test_session_up(self):
1925         """ bring BFD session up """
1926         key = self.factory.create_random_key(self)
1927         key.add_vpp_config()
1928         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1929                                             self.pg0.remote_ip4,
1930                                             sha1_key=key)
1931         self.vpp_session.add_vpp_config()
1932         self.vpp_session.admin_up()
1933         self.test_session = BFDTestSession(
1934             self, self.pg0, AF_INET, sha1_key=key,
1935             bfd_key_id=self.vpp_session.bfd_key_id)
1936         bfd_session_up(self)
1937
1938     def test_hold_up(self):
1939         """ hold BFD session up """
1940         key = self.factory.create_random_key(self)
1941         key.add_vpp_config()
1942         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1943                                             self.pg0.remote_ip4,
1944                                             sha1_key=key)
1945         self.vpp_session.add_vpp_config()
1946         self.vpp_session.admin_up()
1947         self.test_session = BFDTestSession(
1948             self, self.pg0, AF_INET, sha1_key=key,
1949             bfd_key_id=self.vpp_session.bfd_key_id)
1950         bfd_session_up(self)
1951         for dummy in range(self.test_session.detect_mult * 2):
1952             wait_for_bfd_packet(self)
1953             self.test_session.send_packet()
1954         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1955
1956     def test_hold_up_meticulous(self):
1957         """ hold BFD session up - meticulous auth """
1958         key = self.factory.create_random_key(
1959             self, BFDAuthType.meticulous_keyed_sha1)
1960         key.add_vpp_config()
1961         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1962                                             self.pg0.remote_ip4, sha1_key=key)
1963         self.vpp_session.add_vpp_config()
1964         self.vpp_session.admin_up()
1965         # specify sequence number so that it wraps
1966         self.test_session = BFDTestSession(
1967             self, self.pg0, AF_INET, sha1_key=key,
1968             bfd_key_id=self.vpp_session.bfd_key_id,
1969             our_seq_number=0xFFFFFFFF - 4)
1970         bfd_session_up(self)
1971         for dummy in range(30):
1972             wait_for_bfd_packet(self)
1973             self.test_session.inc_seq_num()
1974             self.test_session.send_packet()
1975         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1976
1977     def test_send_bad_seq_number(self):
1978         """ session is not kept alive by msgs with bad sequence numbers"""
1979         key = self.factory.create_random_key(
1980             self, BFDAuthType.meticulous_keyed_sha1)
1981         key.add_vpp_config()
1982         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1983                                             self.pg0.remote_ip4, sha1_key=key)
1984         self.vpp_session.add_vpp_config()
1985         self.test_session = BFDTestSession(
1986             self, self.pg0, AF_INET, sha1_key=key,
1987             bfd_key_id=self.vpp_session.bfd_key_id)
1988         bfd_session_up(self)
1989         detection_time = self.test_session.detect_mult *\
1990             self.vpp_session.required_min_rx / USEC_IN_SEC
1991         send_until = time.time() + 2 * detection_time
1992         while time.time() < send_until:
1993             self.test_session.send_packet()
1994             self.sleep(0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
1995                        "time between bfd packets")
1996         e = self.vapi.collect_events()
1997         # session should be down now, because the sequence numbers weren't
1998         # updated
1999         self.assert_equal(len(e), 1, "number of bfd events")
2000         verify_event(self, e[0], expected_state=BFDState.down)
2001
2002     def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
2003                                        legitimate_test_session,
2004                                        rogue_test_session,
2005                                        rogue_bfd_values=None):
2006         """ execute a rogue session interaction scenario
2007
2008         1. create vpp session, add config
2009         2. bring the legitimate session up
2010         3. copy the bfd values from legitimate session to rogue session
2011         4. apply rogue_bfd_values to rogue session
2012         5. set rogue session state to down
2013         6. send message to take the session down from the rogue session
2014         7. assert that the legitimate session is unaffected
2015         """
2016
2017         self.vpp_session = vpp_bfd_udp_session
2018         self.vpp_session.add_vpp_config()
2019         self.test_session = legitimate_test_session
2020         # bring vpp session up
2021         bfd_session_up(self)
2022         # send packet from rogue session
2023         rogue_test_session.update(
2024             my_discriminator=self.test_session.my_discriminator,
2025             your_discriminator=self.test_session.your_discriminator,
2026             desired_min_tx=self.test_session.desired_min_tx,
2027             required_min_rx=self.test_session.required_min_rx,
2028             detect_mult=self.test_session.detect_mult,
2029             diag=self.test_session.diag,
2030             state=self.test_session.state,
2031             auth_type=self.test_session.auth_type)
2032         if rogue_bfd_values:
2033             rogue_test_session.update(**rogue_bfd_values)
2034         rogue_test_session.update(state=BFDState.down)
2035         rogue_test_session.send_packet()
2036         wait_for_bfd_packet(self)
2037         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2038
2039     def test_mismatch_auth(self):
2040         """ session is not brought down by unauthenticated msg """
2041         key = self.factory.create_random_key(self)
2042         key.add_vpp_config()
2043         vpp_session = VppBFDUDPSession(
2044             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2045         legitimate_test_session = BFDTestSession(
2046             self, self.pg0, AF_INET, sha1_key=key,
2047             bfd_key_id=vpp_session.bfd_key_id)
2048         rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
2049         self.execute_rogue_session_scenario(vpp_session,
2050                                             legitimate_test_session,
2051                                             rogue_test_session)
2052
2053     def test_mismatch_bfd_key_id(self):
2054         """ session is not brought down by msg with non-existent key-id """
2055         key = self.factory.create_random_key(self)
2056         key.add_vpp_config()
2057         vpp_session = VppBFDUDPSession(
2058             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2059         # pick a different random bfd key id
2060         x = randint(0, 255)
2061         while x == vpp_session.bfd_key_id:
2062             x = randint(0, 255)
2063         legitimate_test_session = BFDTestSession(
2064             self, self.pg0, AF_INET, sha1_key=key,
2065             bfd_key_id=vpp_session.bfd_key_id)
2066         rogue_test_session = BFDTestSession(
2067             self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
2068         self.execute_rogue_session_scenario(vpp_session,
2069                                             legitimate_test_session,
2070                                             rogue_test_session)
2071
2072     def test_mismatched_auth_type(self):
2073         """ session is not brought down by msg with wrong auth type """
2074         key = self.factory.create_random_key(self)
2075         key.add_vpp_config()
2076         vpp_session = VppBFDUDPSession(
2077             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2078         legitimate_test_session = BFDTestSession(
2079             self, self.pg0, AF_INET, sha1_key=key,
2080             bfd_key_id=vpp_session.bfd_key_id)
2081         rogue_test_session = BFDTestSession(
2082             self, self.pg0, AF_INET, sha1_key=key,
2083             bfd_key_id=vpp_session.bfd_key_id)
2084         self.execute_rogue_session_scenario(
2085             vpp_session, legitimate_test_session, rogue_test_session,
2086             {'auth_type': BFDAuthType.keyed_md5})
2087
2088     def test_restart(self):
2089         """ simulate remote peer restart and resynchronization """
2090         key = self.factory.create_random_key(
2091             self, BFDAuthType.meticulous_keyed_sha1)
2092         key.add_vpp_config()
2093         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2094                                             self.pg0.remote_ip4, sha1_key=key)
2095         self.vpp_session.add_vpp_config()
2096         self.test_session = BFDTestSession(
2097             self, self.pg0, AF_INET, sha1_key=key,
2098             bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
2099         bfd_session_up(self)
2100         # don't send any packets for 2*detection_time
2101         detection_time = self.test_session.detect_mult *\
2102             self.vpp_session.required_min_rx / USEC_IN_SEC
2103         self.sleep(2 * detection_time, "simulating peer restart")
2104         events = self.vapi.collect_events()
2105         self.assert_equal(len(events), 1, "number of bfd events")
2106         verify_event(self, events[0], expected_state=BFDState.down)
2107         self.test_session.update(state=BFDState.down)
2108         # reset sequence number
2109         self.test_session.our_seq_number = 0
2110         self.test_session.vpp_seq_number = None
2111         # now throw away any pending packets
2112         self.pg0.enable_capture()
2113         self.test_session.my_discriminator = 0
2114         bfd_session_up(self)
2115
2116
class BFDAuthOnOffTestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (changing auth) """

    # packet-generator interface used by all tests, set up in setUpClass
    pg0 = None
    # vpp-side bfd session of the currently running test
    vpp_session = None
    # test-side (peer) bfd session of the currently running test
    test_session = None

    @classmethod
    def setUpClass(cls):
        """ enable bfd debug logging and prepare pg0 with an IPv4 address """
        super(BFDAuthOnOffTestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip4()
            cls.pg0.admin_up()
            cls.pg0.resolve_arp()

        except Exception:
            # run the base class teardown before propagating the failure
            super(BFDAuthOnOffTestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """ delegate class-level cleanup to the base class """
        super(BFDAuthOnOffTestCase, cls).tearDownClass()

    def setUp(self):
        """ create a key factory, subscribe to bfd events, start capture """
        super(BFDAuthOnOffTestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

    def tearDown(self):
        """ unsubscribe from bfd events and drain the event queue """
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=False)
        self.vapi.collect_events()  # clear the event queue
        super(BFDAuthOnOffTestCase, self).tearDown()

    def _exchange_packets(self, verify_state=True, inc_seq_num=False):
        """ answer detect_mult * 2 packets received from vpp

        For every packet received from vpp one packet is sent back from the
        test session.

        :param verify_state: assert that each received packet carries the
            up state
        :param inc_seq_num: call inc_seq_num() on the test session before
            each packet is sent
        """
        for _ in range(self.test_session.detect_mult * 2):
            packet = wait_for_bfd_packet(self)
            if verify_state:
                self.assert_equal(packet[BFD].state, BFDState.up, BFDState)
            if inc_seq_num:
                self.test_session.inc_seq_num()
            self.test_session.send_packet()

    def _verify_session_undisturbed(self):
        """ assert that vpp session is up and that no bfd event was raised """
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")

    def test_auth_on_immediate(self):
        """ turn auth on without disturbing session state (immediate) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        self._exchange_packets()
        # switch both sides to authentication at the same time
        self.vpp_session.activate_auth(key)
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key
        self._exchange_packets()
        self._verify_session_undisturbed()

    def test_auth_off_immediate(self):
        """ turn auth off without disturbing session state (immediate) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets(inc_seq_num=True)
        # drop authentication on both sides at the same time
        self.vpp_session.deactivate_auth()
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self._exchange_packets(inc_seq_num=True)
        self._verify_session_undisturbed()

    def test_auth_change_key_immediate(self):
        """ change auth key without disturbing session state (immediate) """
        key1 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2 = self.factory.create_random_key(self)
        key2.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key1)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key1,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets()
        # switch both sides to the second key at the same time
        self.vpp_session.activate_auth(key2)
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key2
        self._exchange_packets()
        self._verify_session_undisturbed()

    def test_auth_on_delayed(self):
        """ turn auth on without disturbing session state (delayed) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        # the state of the packets received here is deliberately not checked
        self._exchange_packets(verify_state=False)
        self.vpp_session.activate_auth(key, delayed=True)
        # the test session keeps sending unauthenticated packets for a while
        self._exchange_packets()
        # only now the test session starts to authenticate its packets
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key
        self.test_session.send_packet()
        self._exchange_packets()
        self._verify_session_undisturbed()

    def test_auth_off_delayed(self):
        """ turn auth off without disturbing session state (delayed) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets()
        self.vpp_session.deactivate_auth(delayed=True)
        # the test session keeps authenticating its packets for a while
        self._exchange_packets()
        # only now the test session stops authenticating its packets
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self.test_session.send_packet()
        self._exchange_packets()
        self._verify_session_undisturbed()

    def test_auth_change_key_delayed(self):
        """ change auth key without disturbing session state (delayed) """
        key1 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2 = self.factory.create_random_key(self)
        key2.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key1)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key1,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets()
        self.vpp_session.activate_auth(key2, delayed=True)
        # the test session keeps using the first key for a while
        self._exchange_packets()
        # only now the test session switches to the second key
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key2
        self.test_session.send_packet()
        self._exchange_packets()
        self._verify_session_undisturbed()
2327
2328
2329 class BFDCLITestCase(VppTestCase):
2330     """Bidirectional Forwarding Detection (BFD) (CLI) """
2331     pg0 = None
2332
2333     @classmethod
2334     def setUpClass(cls):
2335         super(BFDCLITestCase, cls).setUpClass()
2336         cls.vapi.cli("set log class bfd level debug")
2337         try:
2338             cls.create_pg_interfaces((0,))
2339             cls.pg0.config_ip4()
2340             cls.pg0.config_ip6()
2341             cls.pg0.resolve_arp()
2342             cls.pg0.resolve_ndp()
2343
2344         except Exception:
2345             super(BFDCLITestCase, cls).tearDownClass()
2346             raise
2347
    @classmethod
    def tearDownClass(cls):
        """ delegate class-level cleanup to the base class """
        super(BFDCLITestCase, cls).tearDownClass()
2351
    def setUp(self):
        """ run base setup, create a key factory and enable pg0 capture """
        super(BFDCLITestCase, self).setUp()
        # each test gets its own factory of auth keys
        self.factory = AuthKeyFactory()
        self.pg0.enable_capture()
2356
2357     def tearDown(self):
2358         try:
2359             self.vapi.want_bfd_events(enable_disable=False)
2360         except UnexpectedApiReturnValueError:
2361             # some tests aren't subscribed, so this is not an issue
2362             pass
2363         self.vapi.collect_events()  # clear the event queue
2364         super(BFDCLITestCase, self).tearDown()
2365
2366     def cli_verify_no_response(self, cli):
2367         """ execute a CLI, asserting that the response is empty """
2368         self.assert_equal(self.vapi.cli(cli),
2369                           "",
2370                           "CLI command response")
2371
2372     def cli_verify_response(self, cli, expected):
2373         """ execute a CLI, asserting that the response matches expectation """
2374         try:
2375             reply = self.vapi.cli(cli)
2376         except CliFailedCommandError as cli_error:
2377             reply = str(cli_error)
2378         self.assert_equal(reply.strip(),
2379                           expected,
2380                           "CLI command response")
2381
2382     def test_show(self):
2383         """ show commands """
2384         k1 = self.factory.create_random_key(self)
2385         k1.add_vpp_config()
2386         k2 = self.factory.create_random_key(
2387             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2388         k2.add_vpp_config()
2389         s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2390         s1.add_vpp_config()
2391         s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2392                               sha1_key=k2)
2393         s2.add_vpp_config()
2394         self.logger.info(self.vapi.ppcli("show bfd keys"))
2395         self.logger.info(self.vapi.ppcli("show bfd sessions"))
2396         self.logger.info(self.vapi.ppcli("show bfd"))
2397
2398     def test_set_del_sha1_key(self):
2399         """ set/delete SHA1 auth key """
2400         k = self.factory.create_random_key(self)
2401         self.registry.register(k, self.logger)
2402         self.cli_verify_no_response(
2403             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2404             (k.conf_key_id,
2405                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2406         self.assertTrue(k.query_vpp_config())
2407         self.vpp_session = VppBFDUDPSession(
2408             self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
2409         self.vpp_session.add_vpp_config()
2410         self.test_session = \
2411             BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
2412                            bfd_key_id=self.vpp_session.bfd_key_id)
2413         self.vapi.want_bfd_events()
2414         bfd_session_up(self)
2415         bfd_session_down(self)
2416         # try to replace the secret for the key - should fail because the key
2417         # is in-use
2418         k2 = self.factory.create_random_key(self)
2419         self.cli_verify_response(
2420             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2421             (k.conf_key_id,
2422                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2423             "bfd key set: `bfd_auth_set_key' API call failed, "
2424             "rv=-103:BFD object in use")
2425         # manipulating the session using old secret should still work
2426         bfd_session_up(self)
2427         bfd_session_down(self)
2428         self.vpp_session.remove_vpp_config()
2429         self.cli_verify_no_response(
2430             "bfd key del conf-key-id %s" % k.conf_key_id)
2431         self.assertFalse(k.query_vpp_config())
2432
2433     def test_set_del_meticulous_sha1_key(self):
2434         """ set/delete meticulous SHA1 auth key """
2435         k = self.factory.create_random_key(
2436             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2437         self.registry.register(k, self.logger)
2438         self.cli_verify_no_response(
2439             "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
2440             (k.conf_key_id,
2441                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2442         self.assertTrue(k.query_vpp_config())
2443         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2444                                             self.pg0.remote_ip6, af=AF_INET6,
2445                                             sha1_key=k)
2446         self.vpp_session.add_vpp_config()
2447         self.vpp_session.admin_up()
2448         self.test_session = \
2449             BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
2450                            bfd_key_id=self.vpp_session.bfd_key_id)
2451         self.vapi.want_bfd_events()
2452         bfd_session_up(self)
2453         bfd_session_down(self)
2454         # try to replace the secret for the key - should fail because the key
2455         # is in-use
2456         k2 = self.factory.create_random_key(self)
2457         self.cli_verify_response(
2458             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2459             (k.conf_key_id,
2460                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2461             "bfd key set: `bfd_auth_set_key' API call failed, "
2462             "rv=-103:BFD object in use")
2463         # manipulating the session using old secret should still work
2464         bfd_session_up(self)
2465         bfd_session_down(self)
2466         self.vpp_session.remove_vpp_config()
2467         self.cli_verify_no_response(
2468             "bfd key del conf-key-id %s" % k.conf_key_id)
2469         self.assertFalse(k.query_vpp_config())
2470
2471     def test_add_mod_del_bfd_udp(self):
2472         """ create/modify/delete IPv4 BFD UDP session """
2473         vpp_session = VppBFDUDPSession(
2474             self, self.pg0, self.pg0.remote_ip4)
2475         self.registry.register(vpp_session, self.logger)
2476         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2477             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2478             "detect-mult %s" % (self.pg0.name, self.pg0.local_ip4,
2479                                 self.pg0.remote_ip4,
2480                                 vpp_session.desired_min_tx,
2481                                 vpp_session.required_min_rx,
2482                                 vpp_session.detect_mult)
2483         self.cli_verify_no_response(cli_add_cmd)
2484         # 2nd add should fail
2485         self.cli_verify_response(
2486             cli_add_cmd,
2487             "bfd udp session add: `bfd_add_add_session' API call"
2488             " failed, rv=-101:Duplicate BFD object")
2489         verify_bfd_session_config(self, vpp_session)
2490         mod_session = VppBFDUDPSession(
2491             self, self.pg0, self.pg0.remote_ip4,
2492             required_min_rx=2 * vpp_session.required_min_rx,
2493             desired_min_tx=3 * vpp_session.desired_min_tx,
2494             detect_mult=4 * vpp_session.detect_mult)
2495         self.cli_verify_no_response(
2496             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2497             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2498             (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2499              mod_session.desired_min_tx, mod_session.required_min_rx,
2500              mod_session.detect_mult))
2501         verify_bfd_session_config(self, mod_session)
2502         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2503             "peer-addr %s" % (self.pg0.name,
2504                               self.pg0.local_ip4, self.pg0.remote_ip4)
2505         self.cli_verify_no_response(cli_del_cmd)
2506         # 2nd del is expected to fail
2507         self.cli_verify_response(
2508             cli_del_cmd, "bfd udp session del: `bfd_udp_del_session' API call"
2509             " failed, rv=-102:No such BFD object")
2510         self.assertFalse(vpp_session.query_vpp_config())
2511
2512     def test_add_mod_del_bfd_udp6(self):
2513         """ create/modify/delete IPv6 BFD UDP session """
2514         vpp_session = VppBFDUDPSession(
2515             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
2516         self.registry.register(vpp_session, self.logger)
2517         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2518             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2519             "detect-mult %s" % (self.pg0.name, self.pg0.local_ip6,
2520                                 self.pg0.remote_ip6,
2521                                 vpp_session.desired_min_tx,
2522                                 vpp_session.required_min_rx,
2523                                 vpp_session.detect_mult)
2524         self.cli_verify_no_response(cli_add_cmd)
2525         # 2nd add should fail
2526         self.cli_verify_response(
2527             cli_add_cmd,
2528             "bfd udp session add: `bfd_add_add_session' API call"
2529             " failed, rv=-101:Duplicate BFD object")
2530         verify_bfd_session_config(self, vpp_session)
2531         mod_session = VppBFDUDPSession(
2532             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2533             required_min_rx=2 * vpp_session.required_min_rx,
2534             desired_min_tx=3 * vpp_session.desired_min_tx,
2535             detect_mult=4 * vpp_session.detect_mult)
2536         self.cli_verify_no_response(
2537             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2538             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2539             (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2540              mod_session.desired_min_tx,
2541              mod_session.required_min_rx, mod_session.detect_mult))
2542         verify_bfd_session_config(self, mod_session)
2543         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2544             "peer-addr %s" % (self.pg0.name,
2545                               self.pg0.local_ip6, self.pg0.remote_ip6)
2546         self.cli_verify_no_response(cli_del_cmd)
2547         # 2nd del is expected to fail
2548         self.cli_verify_response(
2549             cli_del_cmd,
2550             "bfd udp session del: `bfd_udp_del_session' API call"
2551             " failed, rv=-102:No such BFD object")
2552         self.assertFalse(vpp_session.query_vpp_config())
2553
2554     def test_add_mod_del_bfd_udp_auth(self):
2555         """ create/modify/delete IPv4 BFD UDP session (authenticated) """
2556         key = self.factory.create_random_key(self)
2557         key.add_vpp_config()
2558         vpp_session = VppBFDUDPSession(
2559             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2560         self.registry.register(vpp_session, self.logger)
2561         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2562             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2563             "detect-mult %s conf-key-id %s bfd-key-id %s"\
2564             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2565                vpp_session.desired_min_tx, vpp_session.required_min_rx,
2566                vpp_session.detect_mult, key.conf_key_id,
2567                vpp_session.bfd_key_id)
2568         self.cli_verify_no_response(cli_add_cmd)
2569         # 2nd add should fail
2570         self.cli_verify_response(
2571             cli_add_cmd,
2572             "bfd udp session add: `bfd_add_add_session' API call"
2573             " failed, rv=-101:Duplicate BFD object")
2574         verify_bfd_session_config(self, vpp_session)
2575         mod_session = VppBFDUDPSession(
2576             self, self.pg0, self.pg0.remote_ip4, sha1_key=key,
2577             bfd_key_id=vpp_session.bfd_key_id,
2578             required_min_rx=2 * vpp_session.required_min_rx,
2579             desired_min_tx=3 * vpp_session.desired_min_tx,
2580             detect_mult=4 * vpp_session.detect_mult)
2581         self.cli_verify_no_response(
2582             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2583             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2584             (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2585              mod_session.desired_min_tx,
2586              mod_session.required_min_rx, mod_session.detect_mult))
2587         verify_bfd_session_config(self, mod_session)
2588         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2589             "peer-addr %s" % (self.pg0.name,
2590                               self.pg0.local_ip4, self.pg0.remote_ip4)
2591         self.cli_verify_no_response(cli_del_cmd)
2592         # 2nd del is expected to fail
2593         self.cli_verify_response(
2594             cli_del_cmd,
2595             "bfd udp session del: `bfd_udp_del_session' API call"
2596             " failed, rv=-102:No such BFD object")
2597         self.assertFalse(vpp_session.query_vpp_config())
2598
2599     def test_add_mod_del_bfd_udp6_auth(self):
2600         """ create/modify/delete IPv6 BFD UDP session (authenticated) """
2601         key = self.factory.create_random_key(
2602             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2603         key.add_vpp_config()
2604         vpp_session = VppBFDUDPSession(
2605             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key)
2606         self.registry.register(vpp_session, self.logger)
2607         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2608             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2609             "detect-mult %s conf-key-id %s bfd-key-id %s" \
2610             % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2611                vpp_session.desired_min_tx, vpp_session.required_min_rx,
2612                vpp_session.detect_mult, key.conf_key_id,
2613                vpp_session.bfd_key_id)
2614         self.cli_verify_no_response(cli_add_cmd)
2615         # 2nd add should fail
2616         self.cli_verify_response(
2617             cli_add_cmd,
2618             "bfd udp session add: `bfd_add_add_session' API call"
2619             " failed, rv=-101:Duplicate BFD object")
2620         verify_bfd_session_config(self, vpp_session)
2621         mod_session = VppBFDUDPSession(
2622             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key,
2623             bfd_key_id=vpp_session.bfd_key_id,
2624             required_min_rx=2 * vpp_session.required_min_rx,
2625             desired_min_tx=3 * vpp_session.desired_min_tx,
2626             detect_mult=4 * vpp_session.detect_mult)
2627         self.cli_verify_no_response(
2628             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2629             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2630             (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2631              mod_session.desired_min_tx,
2632              mod_session.required_min_rx, mod_session.detect_mult))
2633         verify_bfd_session_config(self, mod_session)
2634         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2635             "peer-addr %s" % (self.pg0.name,
2636                               self.pg0.local_ip6, self.pg0.remote_ip6)
2637         self.cli_verify_no_response(cli_del_cmd)
2638         # 2nd del is expected to fail
2639         self.cli_verify_response(
2640             cli_del_cmd,
2641             "bfd udp session del: `bfd_udp_del_session' API call"
2642             " failed, rv=-102:No such BFD object")
2643         self.assertFalse(vpp_session.query_vpp_config())
2644
2645     def test_auth_on_off(self):
2646         """ turn authentication on and off """
2647         key = self.factory.create_random_key(
2648             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2649         key.add_vpp_config()
2650         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2651         auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2652                                         sha1_key=key)
2653         session.add_vpp_config()
2654         cli_activate = \
2655             "bfd udp session auth activate interface %s local-addr %s "\
2656             "peer-addr %s conf-key-id %s bfd-key-id %s"\
2657             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2658                key.conf_key_id, auth_session.bfd_key_id)
2659         self.cli_verify_no_response(cli_activate)
2660         verify_bfd_session_config(self, auth_session)
2661         self.cli_verify_no_response(cli_activate)
2662         verify_bfd_session_config(self, auth_session)
2663         cli_deactivate = \
2664             "bfd udp session auth deactivate interface %s local-addr %s "\
2665             "peer-addr %s "\
2666             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2667         self.cli_verify_no_response(cli_deactivate)
2668         verify_bfd_session_config(self, session)
2669         self.cli_verify_no_response(cli_deactivate)
2670         verify_bfd_session_config(self, session)
2671
2672     def test_auth_on_off_delayed(self):
2673         """ turn authentication on and off (delayed) """
2674         key = self.factory.create_random_key(
2675             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2676         key.add_vpp_config()
2677         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2678         auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2679                                         sha1_key=key)
2680         session.add_vpp_config()
2681         cli_activate = \
2682             "bfd udp session auth activate interface %s local-addr %s "\
2683             "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"\
2684             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2685                key.conf_key_id, auth_session.bfd_key_id)
2686         self.cli_verify_no_response(cli_activate)
2687         verify_bfd_session_config(self, auth_session)
2688         self.cli_verify_no_response(cli_activate)
2689         verify_bfd_session_config(self, auth_session)
2690         cli_deactivate = \
2691             "bfd udp session auth deactivate interface %s local-addr %s "\
2692             "peer-addr %s delayed yes"\
2693             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2694         self.cli_verify_no_response(cli_deactivate)
2695         verify_bfd_session_config(self, session)
2696         self.cli_verify_no_response(cli_deactivate)
2697         verify_bfd_session_config(self, session)
2698
2699     def test_admin_up_down(self):
2700         """ put session admin-up and admin-down """
2701         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2702         session.add_vpp_config()
2703         cli_down = \
2704             "bfd udp session set-flags admin down interface %s local-addr %s "\
2705             "peer-addr %s "\
2706             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2707         cli_up = \
2708             "bfd udp session set-flags admin up interface %s local-addr %s "\
2709             "peer-addr %s "\
2710             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2711         self.cli_verify_no_response(cli_down)
2712         verify_bfd_session_config(self, session, state=BFDState.admin_down)
2713         self.cli_verify_no_response(cli_up)
2714         verify_bfd_session_config(self, session, state=BFDState.down)
2715
2716     def test_set_del_udp_echo_source(self):
2717         """ set/del udp echo source """
2718         self.create_loopback_interfaces(1)
2719         self.loopback0 = self.lo_interfaces[0]
2720         self.loopback0.admin_up()
2721         self.cli_verify_response("show bfd echo-source",
2722                                  "UDP echo source is not set.")
2723         cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
2724         self.cli_verify_no_response(cli_set)
2725         self.cli_verify_response("show bfd echo-source",
2726                                  "UDP echo source is: %s\n"
2727                                  "IPv4 address usable as echo source: none\n"
2728                                  "IPv6 address usable as echo source: none" %
2729                                  self.loopback0.name)
2730         self.loopback0.config_ip4()
2731         unpacked = unpack("!L", self.loopback0.local_ip4n)
2732         echo_ip4 = inet_ntop(AF_INET, pack("!L", unpacked[0] ^ 1))
2733         self.cli_verify_response("show bfd echo-source",
2734                                  "UDP echo source is: %s\n"
2735                                  "IPv4 address usable as echo source: %s\n"
2736                                  "IPv6 address usable as echo source: none" %
2737                                  (self.loopback0.name, echo_ip4))
2738         unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
2739         echo_ip6 = inet_ntop(AF_INET6, pack("!LLLL", unpacked[0], unpacked[1],
2740                                             unpacked[2], unpacked[3] ^ 1))
2741         self.loopback0.config_ip6()
2742         self.cli_verify_response("show bfd echo-source",
2743                                  "UDP echo source is: %s\n"
2744                                  "IPv4 address usable as echo source: %s\n"
2745                                  "IPv6 address usable as echo source: %s" %
2746                                  (self.loopback0.name, echo_ip4, echo_ip6))
2747         cli_del = "bfd udp echo-source del"
2748         self.cli_verify_no_response(cli_del)
2749         self.cli_verify_response("show bfd echo-source",
2750                                  "UDP echo source is not set.")
2751
2752
# Script entry point: when this module is executed directly (not imported),
# run its tests through unittest with VppTestRunner as the test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)