vppinfra: improve clocks_per_second convergence
[vpp.git] / test / test_bfd.py
1 #!/usr/bin/env python3
2 """ BFD tests """
3
4 from __future__ import division
5
6 import binascii
7 import hashlib
8 import time
9 import unittest
10 from random import randint, shuffle, getrandbits
11 from socket import AF_INET, AF_INET6, inet_ntop
12 from struct import pack, unpack
13
14 from six import moves
15 import scapy.compat
16 from scapy.layers.inet import UDP, IP
17 from scapy.layers.inet6 import IPv6
18 from scapy.layers.l2 import Ether, GRE
19 from scapy.packet import Raw
20
21 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
22     BFDDiagCode, BFDState, BFD_vpp_echo
23 from framework import VppTestCase, VppTestRunner, running_extended_tests
24 from util import ppp
25 from vpp_ip import DpoProto
26 from vpp_ip_route import VppIpRoute, VppRoutePath
27 from vpp_lo_interface import VppLoInterface
28 from vpp_papi_provider import UnexpectedApiReturnValueError, \
29     CliFailedCommandError
30 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
31 from vpp_gre_interface import VppGreInterface
32 from vpp_papi import VppEnum
33
# number of microseconds in one second
USEC_IN_SEC = 10 ** 6
35
36
class AuthKeyFactory(object):
    """Hands out random BFD auth keys whose conf key IDs never repeat"""

    def __init__(self):
        # conf key IDs already handed out by this factory (dict used as a set)
        self._conf_key_ids = {}

    def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
        """ build a VppBFDAuthKey with random key material

        :param test: test case instance passed through to VppBFDAuthKey
        :param auth_type: authentication type of the new key

        :returns: VppBFDAuthKey with a conf key ID not used by this factory
                  before and 1-20 random key bytes
        """
        # draw 32-bit IDs until one turns up that was not handed out yet
        while True:
            candidate_id = randint(0, 0xFFFFFFFF)
            if candidate_id not in self._conf_key_ids:
                break
        self._conf_key_ids[candidate_id] = 1
        key_length = randint(1, 20)
        key_material = scapy.compat.raw(
            bytearray(randint(0, 255) for _ in range(key_length)))
        return VppBFDAuthKey(test=test, auth_type=auth_type,
                             conf_key_id=candidate_id, key=key_material)
53
54
class BFDAPITestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) - API"""

    pg0 = None
    pg1 = None

    @classmethod
    def setUpClass(cls):
        # run the framework's class-level setup, then create two pg
        # interfaces with IPv4 and IPv6 configured and ARP resolved
        super(BFDAPITestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces(range(2))
            for pg_if in cls.pg_interfaces:
                pg_if.config_ip4()
                pg_if.config_ip6()
                pg_if.resolve_arp()

        except Exception:
            # undo the base-class setup before propagating the failure
            super(BFDAPITestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        super(BFDAPITestCase, cls).tearDownClass()

    def setUp(self):
        super(BFDAPITestCase, self).setUp()
        # a fresh factory per test - conf key IDs are unique within one test
        self.factory = AuthKeyFactory()

    def _add_and_remove_twice(self, session):
        # configure and unconfigure the session two times in a row, logging
        # the session state after each add
        for _ in range(2):
            session.add_vpp_config()
            self.logger.debug("Session state is %s", session.state)
            session.remove_vpp_config()

    def _assert_dump_matches(self, session):
        # compare the parameters stored in the session object against the
        # entry reported by the session dump
        entry = session.get_bfd_udp_session_dump_entry()
        self.assert_equal(session.desired_min_tx,
                          entry.desired_min_tx,
                          "desired min transmit interval")
        self.assert_equal(session.required_min_rx,
                          entry.required_min_rx,
                          "required min receive interval")
        self.assert_equal(session.detect_mult, entry.detect_mult,
                          "detect mult")

    def _assert_keys_configured(self, keys, expected):
        # every key must report the same (expected) presence in vpp config
        check = self.assertTrue if expected else self.assertFalse
        for key in keys:
            check(key.query_vpp_config())

    def _assert_echo_source(self, sw_if_index=None, ip4=None, ip6=None):
        # fetch the echo source and compare it with the expectation;
        # None means "not set" / "no usable address" respectively
        source = self.vapi.bfd_udp_get_echo_source()
        if sw_if_index is None:
            self.assertFalse(source.is_set)
        else:
            self.assertTrue(source.is_set)
            self.assertEqual(source.sw_if_index, sw_if_index)
        if ip4 is None:
            self.assertFalse(source.have_usable_ip4)
        else:
            self.assertTrue(source.have_usable_ip4)
            self.assertEqual(source.ip4_addr.packed, ip4)
        if ip6 is None:
            self.assertFalse(source.have_usable_ip6)
        else:
            self.assertTrue(source.have_usable_ip6)
            self.assertEqual(source.ip6_addr.packed, ip6)

    def test_add_bfd(self):
        """ create a BFD session """
        self._add_and_remove_twice(
            VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4))

    def test_double_add(self):
        """ create the same BFD session twice (negative case) """
        duplicate = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        duplicate.add_vpp_config()

        # the second add of the very same session must be rejected
        with self.vapi.assert_negative_api_retval():
            duplicate.add_vpp_config()

        duplicate.remove_vpp_config()

    def test_add_bfd6(self):
        """ create IPv6 BFD session """
        self._add_and_remove_twice(
            VppBFDUDPSession(
                self, self.pg0, self.pg0.remote_ip6, af=AF_INET6))

    def test_mod_bfd(self):
        """ modify BFD session parameters """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   desired_min_tx=50000,
                                   required_min_rx=10000,
                                   detect_mult=1)
        session.add_vpp_config()
        self._assert_dump_matches(session)
        # double every parameter and check the dump follows
        doubled = {
            "desired_min_tx": session.desired_min_tx * 2,
            "required_min_rx": session.required_min_rx * 2,
            "detect_mult": session.detect_mult * 2,
        }
        session.modify_parameters(**doubled)
        self._assert_dump_matches(session)

    def test_add_sha1_keys(self):
        """ add SHA1 keys """
        key_count = 10
        keys = [self.factory.create_random_key(self)
                for _ in range(key_count)]
        self._assert_keys_configured(keys, False)
        for key in keys:
            key.add_vpp_config()
        self._assert_keys_configured(keys, True)
        # remove randomly
        removal_order = list(range(key_count))
        shuffle(removal_order)
        removed = set()
        for index in removal_order:
            keys[index].remove_vpp_config()
            removed.add(index)
            # only the keys removed so far may be missing from vpp
            for position, key in enumerate(keys):
                if position in removed:
                    self.assertFalse(key.query_vpp_config())
                else:
                    self.assertTrue(key.query_vpp_config())
        # should be removed now
        self._assert_keys_configured(keys, False)
        # add back and remove again
        for key in keys:
            key.add_vpp_config()
        self._assert_keys_configured(keys, True)
        for key in keys:
            key.remove_vpp_config()
        self._assert_keys_configured(keys, False)

    def test_add_bfd_sha1(self):
        """ create a BFD session (SHA1) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self._add_and_remove_twice(
            VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                             sha1_key=key))

    def test_double_add_sha1(self):
        """ create the same BFD session twice (negative case) (SHA1) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        duplicate = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                     sha1_key=key)
        duplicate.add_vpp_config()
        with self.assertRaises(Exception):
            duplicate.add_vpp_config()

    def test_add_auth_nonexistent_key(self):
        """ create BFD session using non-existent SHA1 (negative case) """
        # the key is created on the test side only, never added to vpp
        unconfigured_key = self.factory.create_random_key(self)
        session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4,
            sha1_key=unconfigured_key)
        with self.assertRaises(Exception):
            session.add_vpp_config()

    def test_shared_sha1_key(self):
        """ share single SHA1 key between multiple BFD sessions """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        # one IPv4 and one IPv6 session per pg interface, all with one key
        sessions = []
        for pg_if in (self.pg0, self.pg1):
            sessions.append(
                VppBFDUDPSession(self, pg_if, pg_if.remote_ip4,
                                 sha1_key=key))
            sessions.append(
                VppBFDUDPSession(self, pg_if, pg_if.remote_ip6,
                                 sha1_key=key, af=AF_INET6))
        for session in sessions:
            session.add_vpp_config()
        # the use count has to drop by one with every removed session
        still_using = len(sessions)
        for session in sessions:
            entry = key.get_bfd_auth_keys_dump_entry()
            self.assert_equal(entry.use_count, still_using,
                              "Use count for shared key")
            session.remove_vpp_config()
            still_using -= 1
        entry = key.get_bfd_auth_keys_dump_entry()
        self.assert_equal(entry.use_count, still_using,
                          "Use count for shared key")

    def test_activate_auth(self):
        """ activate SHA1 authentication """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        plain_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        plain_session.add_vpp_config()
        plain_session.activate_auth(key)

    def test_deactivate_auth(self):
        """ deactivate SHA1 authentication """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        plain_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        plain_session.add_vpp_config()
        plain_session.activate_auth(key)
        plain_session.deactivate_auth()

    def test_change_key(self):
        """ change SHA1 key """
        old_key = self.factory.create_random_key(self)
        new_key = self.factory.create_random_key(self)
        # make sure the two keys do not share a conf key ID
        while new_key.conf_key_id == old_key.conf_key_id:
            new_key = self.factory.create_random_key(self)
        for key in (old_key, new_key):
            key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=old_key)
        session.add_vpp_config()
        session.activate_auth(new_key)

    def test_set_del_udp_echo_source(self):
        """ set/del udp echo source """
        self.create_loopback_interfaces(1)
        self.loopback0 = self.lo_interfaces[0]
        self.loopback0.admin_up()
        # nothing configured yet
        self._assert_echo_source()

        # interface set, but it has no addresses so far
        self.vapi.bfd_udp_set_echo_source(
            sw_if_index=self.loopback0.sw_if_index)
        self._assert_echo_source(sw_if_index=self.loopback0.sw_if_index)

        # expected IPv4 echo address is the interface address with the
        # lowest bit flipped
        self.loopback0.config_ip4()
        (ip4_word,) = unpack("!L", self.loopback0.local_ip4n)
        echo_ip4 = pack("!L", ip4_word ^ 1)
        self._assert_echo_source(sw_if_index=self.loopback0.sw_if_index,
                                 ip4=echo_ip4)

        # same for IPv6 - flip the lowest bit of the last 32-bit word
        self.loopback0.config_ip6()
        ip6_words = list(unpack("!LLLL", self.loopback0.local_ip6n))
        ip6_words[3] ^= 1
        echo_ip6 = pack("!LLLL", *ip6_words)
        self._assert_echo_source(sw_if_index=self.loopback0.sw_if_index,
                                 ip4=echo_ip4, ip6=echo_ip6)

        # back to the unset state after deletion
        self.vapi.bfd_udp_del_echo_source()
        self._assert_echo_source()
310         self.assertFalse(echo_source.have_usable_ip4)
311         self.assertFalse(echo_source.have_usable_ip6)
312
313
class BFDTestSession(object):
    """ Test-framework side of a BFD session (the peer talking to vpp) """

    def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
                 bfd_key_id=None, our_seq_number=None,
                 tunnel_header=None, phy_interface=None):
        """ store the session parameters and pick the random initial values

        :param test: test case instance (used for logging and assertions)
        :param interface: interface whose IP addresses are used in packets
        :param af: address family (AF_INET6 selects IPv6, otherwise IPv4)
        :param detect_mult: detect multiplier put into created packets
        :param sha1_key: auth key object; authentication is used when set
        :param bfd_key_id: BFD key ID put into authenticated packets
        :param our_seq_number: initial sequence number (random if None)
        :param tunnel_header: optional header placed right after Ethernet
        :param phy_interface: interface used for MAC addresses and sending
                              (defaults to interface)
        """
        self.test = test
        self.af = af
        self.sha1_key = sha1_key
        self.bfd_key_id = bfd_key_id
        self.interface = interface
        self.phy_interface = phy_interface if phy_interface else interface
        # random UDP source port from the 49152-65535 range
        self.udp_sport = randint(49152, 65535)
        self.our_seq_number = randint(0, 40000000) \
            if our_seq_number is None else our_seq_number
        self.vpp_seq_number = None
        self.my_discriminator = 0
        self.desired_min_tx = 300000
        self.required_min_rx = 300000
        self.required_min_echo_rx = None
        self.detect_mult = detect_mult
        self.diag = BFDDiagCode.no_diagnostic
        self.your_discriminator = None
        self.state = BFDState.down
        self.auth_type = BFDAuthType.no_auth
        self.tunnel_header = tunnel_header

    def inc_seq_num(self):
        """ advance our sequence number by one, wrapping at 32 bits """
        at_maximum = self.our_seq_number == 0xFFFFFFFF
        self.our_seq_number = 0 if at_maximum else self.our_seq_number + 1

    def update(self, my_discriminator=None, your_discriminator=None,
               desired_min_tx=None, required_min_rx=None,
               required_min_echo_rx=None, detect_mult=None,
               diag=None, state=None, auth_type=None):
        """ overwrite the stored BFD parameters which were passed in

        Arguments left at None keep their current value.
        """
        requested = (
            ("my_discriminator", my_discriminator),
            ("your_discriminator", your_discriminator),
            ("required_min_rx", required_min_rx),
            ("required_min_echo_rx", required_min_echo_rx),
            ("desired_min_tx", desired_min_tx),
            ("detect_mult", detect_mult),
            ("diag", diag),
            ("state", state),
            ("auth_type", auth_type),
        )
        for attribute, value in requested:
            if value is not None:
                setattr(self, attribute, value)

    def fill_packet_fields(self, packet):
        """ copy the truthy session values into the BFD layer of packet """
        # (session attribute, BFD packet field, debug message)
        # auth_type is last - it is used by a negative test-case
        mapping = (
            ("my_discriminator", "my_discriminator",
             "BFD: setting packet.my_discriminator=%s"),
            ("your_discriminator", "your_discriminator",
             "BFD: setting packet.your_discriminator=%s"),
            ("required_min_rx", "required_min_rx_interval",
             "BFD: setting packet.required_min_rx_interval=%s"),
            ("required_min_echo_rx", "required_min_echo_rx_interval",
             "BFD: setting packet.required_min_echo_rx=%s"),
            ("desired_min_tx", "desired_min_tx_interval",
             "BFD: setting packet.desired_min_tx_interval=%s"),
            ("detect_mult", "detect_mult",
             "BFD: setting packet.detect_mult=%s"),
            ("diag", "diag",
             "BFD: setting packet.diag=%s"),
            ("state", "state",
             "BFD: setting packet.state=%s"),
            ("auth_type", "auth_type",
             "BFD: setting packet.auth_type=%s"),
        )
        bfd = packet[BFD]
        for attribute, field, message in mapping:
            value = getattr(self, attribute)
            # falsy values (None, 0) leave the packet field untouched
            if value:
                self.test.logger.debug(message, value)
                setattr(bfd, field, value)

    def _zero_padded_key(self):
        """ return the SHA1 key right-padded with zero bytes to 20 bytes """
        secret = self.sha1_key.key
        return secret + b"\0" * (20 - len(secret))

    def create_packet(self):
        """ build a BFD packet from the current state of the session """
        if self.sha1_key:
            bfd = BFD(flags="A")
            bfd.auth_type = self.sha1_key.auth_type
            bfd.auth_len = BFD.sha1_auth_len
            bfd.auth_key_id = self.bfd_key_id
            bfd.auth_seq_num = self.our_seq_number
            bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
        else:
            bfd = BFD()
        frame = Ether(src=self.phy_interface.remote_mac,
                      dst=self.phy_interface.local_mac)
        if self.tunnel_header:
            frame = frame / self.tunnel_header
        if self.af == AF_INET6:
            network = IPv6(src=self.interface.remote_ip6,
                           dst=self.interface.local_ip6,
                           hlim=255)
        else:
            network = IP(src=self.interface.remote_ip4,
                         dst=self.interface.local_ip4,
                         ttl=255)
        transport = UDP(sport=self.udp_sport, dport=BFD.udp_dport)
        packet = frame / network / transport / bfd
        self.test.logger.debug("BFD: Creating packet")
        self.fill_packet_fields(packet)
        if self.sha1_key:
            # hash the first 32 bytes of the BFD layer followed by the
            # zero-padded key
            hash_material = scapy.compat.raw(packet[BFD])[:32] + \
                self._zero_padded_key()
            sha1 = hashlib.sha1(hash_material)
            self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
                                   sha1.hexdigest())
            packet[BFD].auth_key_hash = sha1.digest()
        return packet

    def send_packet(self, packet=None, interface=None):
        """ send packet out of interface and start the packet generator

        :param packet: packet to send (a fresh one is created if None)
        :param interface: interface to send on (phy_interface if None)
        """
        outgoing = self.create_packet() if packet is None else packet
        egress = self.phy_interface if interface is None else interface
        self.test.logger.debug(ppp("Sending packet:", outgoing))
        egress.add_stream(outgoing)
        self.test.pg_start()

    def verify_sha1_auth(self, packet):
        """ check the authentication section of a received BFD packet """
        bfd = packet[BFD]
        self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
        self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
                               BFDAuthType)
        self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
        self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
        if self.vpp_seq_number is None:
            # first authenticated packet - just remember where vpp starts
            self.vpp_seq_number = bfd.auth_seq_num
            self.test.logger.debug("Received initial sequence number: %s" %
                                   self.vpp_seq_number)
        else:
            recvd_seq_num = bfd.auth_seq_num
            self.test.logger.debug("Received followup sequence number: %s" %
                                   recvd_seq_num)
            can_increment = self.vpp_seq_number < 0xffffffff
            meticulous = self.sha1_key.auth_type == \
                BFDAuthType.meticulous_keyed_sha1
            if meticulous and can_increment:
                # meticulous: exactly one higher than the previous one
                self.test.assert_equal(recvd_seq_num,
                                       self.vpp_seq_number + 1,
                                       "BFD sequence number")
            elif meticulous:
                # meticulous at the 32-bit maximum: must wrap to zero
                self.test.assert_equal(recvd_seq_num, 0,
                                       "BFD sequence number")
            elif can_increment:
                # non-meticulous: same number or one higher
                self.test.assert_in_range(recvd_seq_num,
                                          self.vpp_seq_number,
                                          self.vpp_seq_number + 1,
                                          "BFD sequence number")
            else:
                # non-meticulous at the maximum: same number or wrapped
                self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
                                   "BFD sequence number not one of (%s, 0)" %
                                   self.vpp_seq_number)
            self.vpp_seq_number = recvd_seq_num
        # last 20 bytes represent the hash - so replace them with the key,
        # pad the result with zeros and hash the result
        hash_material = bfd.original[:-20] + self._zero_padded_key()
        expected_hash = hashlib.sha1(hash_material).hexdigest()
        self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
                               expected_hash.encode(), "Auth key hash")

    def verify_bfd(self, packet):
        """ check version, discriminator and (if keyed) auth of packet """
        bfd = packet[BFD]
        self.test.assert_equal(bfd.version, 1, "BFD version")
        self.test.assert_equal(bfd.your_discriminator,
                               self.my_discriminator,
                               "BFD - your discriminator")
        if self.sha1_key:
            self.verify_sha1_auth(packet)
523
524
def bfd_session_up(test):
    """ Bring BFD session up

    Waits for a BFD packet from vpp, answers with Init, waits for the
    session-up event and finally sends an Up packet.

    As a side effect the vpp clock offset (local time minus the capture
    timestamp of the first packet) is stored in test.vpp_clock_offset. If an
    offset was already recorded, the new one must be within 0.5 of it.

    :param test: test case instance providing vpp_session, test_session,
                 vapi and logger
    """
    def send_next_packet():
        # bump the sequence number before sending when the session uses a
        # meticulous keyed SHA1 key
        key = test.test_session.sha1_key
        if key and key.auth_type == BFDAuthType.meticulous_keyed_sha1:
            test.test_session.inc_seq_num()
        test.test_session.send_packet()

    test.logger.info("BFD: Waiting for slow hello")
    p = wait_for_bfd_packet(test, 2, is_tunnel=test.vpp_session.is_tunnel)
    old_offset = getattr(test, 'vpp_clock_offset', None)
    test.vpp_clock_offset = time.time() - float(p.time)
    test.logger.debug("BFD: Calculated vpp clock offset: %s",
                      test.vpp_clock_offset)
    # compare against None explicitly - a previous offset of exactly 0.0 is
    # a valid value and must not disable the stability check
    if old_offset is not None:
        test.assertAlmostEqual(
            old_offset, test.vpp_clock_offset, delta=0.5,
            msg="vpp clock offset not stable (new: %s, old: %s)" %
            (test.vpp_clock_offset, old_offset))
    test.logger.info("BFD: Sending Init")
    test.test_session.update(my_discriminator=randint(0, 40000000),
                             your_discriminator=p[BFD].my_discriminator,
                             state=BFDState.init)
    send_next_packet()
    test.logger.info("BFD: Waiting for event")
    e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, e, expected_state=BFDState.up)
    test.logger.info("BFD: Session is Up")
    test.test_session.update(state=BFDState.up)
    send_next_packet()
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
558
559
def bfd_session_down(test):
    """ Take an Up BFD session down by sending a Down packet to vpp

    :param test: test case instance providing vpp_session, test_session,
                 vapi and logger
    """
    peer = test.test_session
    # the session has to be up to begin with
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
    peer.update(state=BFDState.down)
    key = peer.sha1_key
    # bump the sequence number first when the key is meticulous keyed SHA1
    if key and key.auth_type == BFDAuthType.meticulous_keyed_sha1:
        peer.inc_seq_num()
    peer.send_packet()
    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, event, expected_state=BFDState.down)
    test.logger.info("BFD: Session is Down")
    test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
573
574
def verify_bfd_session_config(test, session, state=None):
    """ Verify that the session dump entry matches the session object

    :param test: test case instance providing the assert helpers
    :param session: session object whose dump entry is checked
    :param state: expected session state; the state is not checked when None
    """
    dump = session.get_bfd_udp_session_dump_entry()
    test.assertIsNotNone(dump)
    # since dump is not none, we have verified that sw_if_index and addresses
    # are valid (in get_bfd_udp_session_dump_entry)
    # compare against None explicitly, so that a state with numeric value 0
    # (presumably admin-down, as in RFC 5880) is verified as well instead of
    # being silently skipped
    if state is not None:
        test.assert_equal(dump.state, state, "session state")
    test.assert_equal(dump.required_min_rx, session.required_min_rx,
                      "required min rx interval")
    test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
                      "desired min tx interval")
    test.assert_equal(dump.detect_mult, session.detect_mult,
                      "detect multiplier")
    if session.sha1_key is None:
        test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
    else:
        test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
        test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
                          "bfd key id")
        test.assert_equal(dump.conf_key_id,
                          session.sha1_key.conf_key_id,
                          "config key id")
597
598
def verify_ip(test, packet):
    """ Check TTL/hop limit and addresses of the IP layer of packet

    The packet is expected to travel from the local to the remote address of
    the vpp session's interface with TTL (hop limit for IPv6) set to 255.
    """
    vpp_session = test.vpp_session
    if vpp_session.af == AF_INET6:
        layer = packet[IPv6]
        expected_src = vpp_session.interface.local_ip6
        expected_dst = vpp_session.interface.remote_ip6
        test.assert_equal(layer.hlim, 255, "IPv6 hop limit")
    else:
        layer = packet[IP]
        expected_src = vpp_session.interface.local_ip4
        expected_dst = vpp_session.interface.remote_ip4
        test.assert_equal(layer.ttl, 255, "IPv4 TTL")
    test.assert_equal(layer.src, expected_src, "IP source address")
    test.assert_equal(layer.dst, expected_dst, "IP destination address")
613
614
def verify_udp(test, packet):
    """ Check the UDP ports of packet against the BFD port constants """
    datagram = packet[UDP]
    # destination is the fixed BFD port, source must fall into the BFD
    # source port range
    test.assert_equal(datagram.dport, BFD.udp_dport, "UDP destination port")
    test.assert_in_range(datagram.sport,
                         BFD.udp_sport_min, BFD.udp_sport_max,
                         "UDP source port")
621
622
def verify_event(test, event, expected_state):
    """ Verify correctness of event values.

    :param test: test case instance providing vpp_session, logger and the
                 assert helpers
    :param event: received session event
    :param expected_state: session state the event has to report
    """
    test.logger.debug("BFD: Event: %s" % moves.reprlib.repr(event))
    vpp_session = test.vpp_session
    test.assert_equal(event.sw_if_index,
                      vpp_session.interface.sw_if_index,
                      "BFD interface index")

    # address-family neutral labels - this check is used by IPv4 sessions
    # as well, so the failure message must not claim IPv6
    test.assert_equal(str(event.local_addr), vpp_session.local_addr,
                      "Local IP address")
    test.assert_equal(str(event.peer_addr), vpp_session.peer_addr,
                      "Peer IP address")
    test.assert_equal(event.state, expected_state, BFDState)
636
637
def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None, is_tunnel=False):
    """ wait for BFD packet and verify its correctness

    Packets are read from test.pg0. Packets older than pcap_time_min are
    skipped; the first remaining packet is verified (IP, UDP and BFD layer).

    :param test: test case instance providing pg0, test_session and logger
    :param timeout: how long to wait
    :param pcap_time_min: ignore packets with pcap timestamp lower than this
    :param is_tunnel: if True, the payload of the outer IP header is verified
                      and returned instead of the whole captured packet

    :returns: the received packet
    :raises CaptureTimeoutError: if the deadline passes while skipping packets
    :raises Exception: if the packet has no BFD layer or the BFD layer
                       carries an unexpected payload
    """
    test.logger.info("BFD: Waiting for BFD packet")
    deadline = time.time() + timeout
    counter = 0
    while True:
        counter += 1
        # sanity check
        test.assert_in_range(counter, 0, 100, "number of packets ignored")
        time_left = deadline - time.time()
        if time_left < 0:
            raise CaptureTimeoutError("Packet did not arrive within timeout")
        p = test.pg0.wait_for_packet(timeout=time_left)
        test.logger.debug(ppp("BFD: Got packet:", p))
        if pcap_time_min is not None and p.time < pcap_time_min:
            test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
                                  "pcap time min %s):" %
                                  (p.time, pcap_time_min), p))
        else:
            break
    if is_tunnel:
        # strip an IP layer and move to the next
        p = p[IP].payload

    # getlayer() returns None for a missing layer (indexing with p[BFD]
    # raises instead), so the descriptive error below is actually reachable
    bfd = p.getlayer(BFD)
    if bfd is None:
        raise Exception(ppp("Unexpected or invalid BFD packet:", p))
    if bfd.payload:
        raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
    verify_ip(test, p)
    verify_udp(test, p)
    test.test_session.verify_bfd(p)
    return p
677
678
679 class BFD4TestCase(VppTestCase):
680     """Bidirectional Forwarding Detection (BFD)"""
681
682     pg0 = None
683     vpp_clock_offset = None
684     vpp_session = None
685     test_session = None
686
687     @classmethod
688     def setUpClass(cls):
689         super(BFD4TestCase, cls).setUpClass()
690         cls.vapi.cli("set log class bfd level debug")
691         try:
692             cls.create_pg_interfaces([0])
693             cls.create_loopback_interfaces(1)
694             cls.loopback0 = cls.lo_interfaces[0]
695             cls.loopback0.config_ip4()
696             cls.loopback0.admin_up()
697             cls.pg0.config_ip4()
698             cls.pg0.configure_ipv4_neighbors()
699             cls.pg0.admin_up()
700             cls.pg0.resolve_arp()
701
702         except Exception:
703             super(BFD4TestCase, cls).tearDownClass()
704             raise
705
706     @classmethod
707     def tearDownClass(cls):
708         super(BFD4TestCase, cls).tearDownClass()
709
710     def setUp(self):
711         super(BFD4TestCase, self).setUp()
712         self.factory = AuthKeyFactory()
713         self.vapi.want_bfd_events()
714         self.pg0.enable_capture()
715         try:
716             self.vpp_session = VppBFDUDPSession(self, self.pg0,
717                                                 self.pg0.remote_ip4)
718             self.vpp_session.add_vpp_config()
719             self.vpp_session.admin_up()
720             self.test_session = BFDTestSession(self, self.pg0, AF_INET)
721         except BaseException:
722             self.vapi.want_bfd_events(enable_disable=0)
723             raise
724
725     def tearDown(self):
726         if not self.vpp_dead:
727             self.vapi.want_bfd_events(enable_disable=0)
728         self.vapi.collect_events()  # clear the event queue
729         super(BFD4TestCase, self).tearDown()
730
    def test_session_up(self):
        """ bring BFD session up """
        # the whole handshake is performed by the module-level helper
        bfd_session_up(self)
734
735     def test_session_up_by_ip(self):
736         """ bring BFD session up - first frame looked up by address pair """
737         self.logger.info("BFD: Sending Slow control frame")
738         self.test_session.update(my_discriminator=randint(0, 40000000))
739         self.test_session.send_packet()
740         self.pg0.enable_capture()
741         p = self.pg0.wait_for_packet(1)
742         self.assert_equal(p[BFD].your_discriminator,
743                           self.test_session.my_discriminator,
744                           "BFD - your discriminator")
745         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
746         self.test_session.update(your_discriminator=p[BFD].my_discriminator,
747                                  state=BFDState.up)
748         self.logger.info("BFD: Waiting for event")
749         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
750         verify_event(self, e, expected_state=BFDState.init)
751         self.logger.info("BFD: Sending Up")
752         self.test_session.send_packet()
753         self.logger.info("BFD: Waiting for event")
754         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
755         verify_event(self, e, expected_state=BFDState.up)
756         self.logger.info("BFD: Session is Up")
757         self.test_session.update(state=BFDState.up)
758         self.test_session.send_packet()
759         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
760
    def test_session_down(self):
        """ bring BFD session down """
        # establish the session first, then take it down again; both steps
        # are implemented by module-level helpers
        bfd_session_up(self)
        bfd_session_down(self)
765
766     def test_hold_up(self):
767         """ hold BFD session up """
768         bfd_session_up(self)
769         for dummy in range(self.test_session.detect_mult * 2):
770             wait_for_bfd_packet(self)
771             self.test_session.send_packet()
772         self.assert_equal(len(self.vapi.collect_events()), 0,
773                           "number of bfd events")
774
775     def test_slow_timer(self):
776         """ verify slow periodic control frames while session down """
777         packet_count = 3
778         self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
779         prev_packet = wait_for_bfd_packet(self, 2)
780         for dummy in range(packet_count):
781             next_packet = wait_for_bfd_packet(self, 2)
782             time_diff = next_packet.time - prev_packet.time
783             # spec says the range should be <0.75, 1>, allow extra 0.05 margin
784             # to work around timing issues
785             self.assert_in_range(
786                 time_diff, 0.70, 1.05, "time between slow packets")
787             prev_packet = next_packet
788
789     def test_zero_remote_min_rx(self):
790         """ no packets when zero remote required min rx interval """
791         bfd_session_up(self)
792         self.test_session.update(required_min_rx=0)
793         self.test_session.send_packet()
794         for dummy in range(self.test_session.detect_mult):
795             self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
796                        "sleep before transmitting bfd packet")
797             self.test_session.send_packet()
798             try:
799                 p = wait_for_bfd_packet(self, timeout=0)
800                 self.logger.error(ppp("Received unexpected packet:", p))
801             except CaptureTimeoutError:
802                 pass
803         self.assert_equal(
804             len(self.vapi.collect_events()), 0, "number of bfd events")
805         self.test_session.update(required_min_rx=300000)
806         for dummy in range(3):
807             self.test_session.send_packet()
808             wait_for_bfd_packet(
809                 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
810         self.assert_equal(
811             len(self.vapi.collect_events()), 0, "number of bfd events")
812
813     def test_conn_down(self):
814         """ verify session goes down after inactivity """
815         bfd_session_up(self)
816         detection_time = self.test_session.detect_mult *\
817             self.vpp_session.required_min_rx / USEC_IN_SEC
818         self.sleep(detection_time, "waiting for BFD session time-out")
819         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
820         verify_event(self, e, expected_state=BFDState.down)
821
822     def test_peer_discr_reset_sess_down(self):
823         """ peer discriminator reset after session goes down """
824         bfd_session_up(self)
825         detection_time = self.test_session.detect_mult *\
826             self.vpp_session.required_min_rx / USEC_IN_SEC
827         self.sleep(detection_time, "waiting for BFD session time-out")
828         self.test_session.my_discriminator = 0
829         wait_for_bfd_packet(self,
830                             pcap_time_min=time.time() - self.vpp_clock_offset)
831
    def test_large_required_min_rx(self):
        """large remote required min rx interval

        The test session advertises a required min rx of 3 seconds and then
        polls for packets and events until that interval has elapsed or an
        event shows up. Packets captured after the update are counted and the
        count must be zero; the first event seen must report the down state.
        """
        bfd_session_up(self)
        p = wait_for_bfd_packet(self)
        interval = 3000000
        self.test_session.update(required_min_rx=interval)
        self.test_session.send_packet()
        time_mark = time.time()
        count = 0
        # busy wait here, trying to collect a packet or event, vpp is not
        # allowed to send packets and the session will timeout first - so the
        # Up->Down event must arrive before any packets do
        while time.time() < time_mark + interval / USEC_IN_SEC:
            try:
                p = wait_for_bfd_packet(self, timeout=0)
                # if vpp managed to send a packet before we did the
                # session update, then that's fine, ignore it
                # (note: this also skips the event check for this iteration)
                if p.time < time_mark - self.vpp_clock_offset:
                    continue
                self.logger.error(ppp("Received unexpected packet:", p))
                count += 1
            except CaptureTimeoutError:
                # no packet pending - expected, fall through to event check
                pass
            events = self.vapi.collect_events()
            if len(events) > 0:
                verify_event(self, events[0], BFDState.down)
                break
        self.assert_equal(count, 0, "number of packets received")
860
861     def test_immediate_remote_min_rx_reduction(self):
862         """ immediately honor remote required min rx reduction """
863         self.vpp_session.remove_vpp_config()
864         self.vpp_session = VppBFDUDPSession(
865             self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
866         self.pg0.enable_capture()
867         self.vpp_session.add_vpp_config()
868         self.test_session.update(desired_min_tx=1000000,
869                                  required_min_rx=1000000)
870         bfd_session_up(self)
871         reference_packet = wait_for_bfd_packet(self)
872         time_mark = time.time()
873         interval = 300000
874         self.test_session.update(required_min_rx=interval)
875         self.test_session.send_packet()
876         extra_time = time.time() - time_mark
877         p = wait_for_bfd_packet(self)
878         # first packet is allowed to be late by time we spent doing the update
879         # calculated in extra_time
880         self.assert_in_range(p.time - reference_packet.time,
881                              .95 * 0.75 * interval / USEC_IN_SEC,
882                              1.05 * interval / USEC_IN_SEC + extra_time,
883                              "time between BFD packets")
884         reference_packet = p
885         for dummy in range(3):
886             p = wait_for_bfd_packet(self)
887             diff = p.time - reference_packet.time
888             self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
889                                  1.05 * interval / USEC_IN_SEC,
890                                  "time between BFD packets")
891             reference_packet = p
892
893     def test_modify_req_min_rx_double(self):
894         """ modify session - double required min rx """
895         bfd_session_up(self)
896         p = wait_for_bfd_packet(self)
897         self.test_session.update(desired_min_tx=10000,
898                                  required_min_rx=10000)
899         self.test_session.send_packet()
900         # double required min rx
901         self.vpp_session.modify_parameters(
902             required_min_rx=2 * self.vpp_session.required_min_rx)
903         p = wait_for_bfd_packet(
904             self, pcap_time_min=time.time() - self.vpp_clock_offset)
905         # poll bit needs to be set
906         self.assertIn("P", p.sprintf("%BFD.flags%"),
907                       "Poll bit not set in BFD packet")
908         # finish poll sequence with final packet
909         final = self.test_session.create_packet()
910         final[BFD].flags = "F"
911         timeout = self.test_session.detect_mult * \
912             max(self.test_session.desired_min_tx,
913                 self.vpp_session.required_min_rx) / USEC_IN_SEC
914         self.test_session.send_packet(final)
915         time_mark = time.time()
916         e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
917         verify_event(self, e, expected_state=BFDState.down)
918         time_to_event = time.time() - time_mark
919         self.assert_in_range(time_to_event, .9 * timeout,
920                              1.1 * timeout, "session timeout")
921
    def test_modify_req_min_rx_halve(self):
        """modify session - halve required min rx

        Starts with VPP's required min rx doubled, halves it on the
        established session and checks that the session stays up for 80% of
        the old detection time while the poll sequence is unfinished. After
        the final packet is sent, the session must time out within 10% of
        the new detection time.
        """
        # start from a doubled interval so that halving it is meaningful
        self.vpp_session.modify_parameters(
            required_min_rx=2 * self.vpp_session.required_min_rx)
        bfd_session_up(self)
        p = wait_for_bfd_packet(self)
        self.test_session.update(desired_min_tx=10000,
                                 required_min_rx=10000)
        self.test_session.send_packet()
        # only accept a packet captured after this point (in vpp's clock)
        p = wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        # halve required min rx
        old_required_min_rx = self.vpp_session.required_min_rx
        self.vpp_session.modify_parameters(
            required_min_rx=self.vpp_session.required_min_rx // 2)
        # now we wait 0.8 * detect_mult * old-req-min-rx and the session
        # should still be up
        self.sleep(0.8 * self.vpp_session.detect_mult *
                   old_required_min_rx / USEC_IN_SEC,
                   "wait before finishing poll sequence")
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")
        p = wait_for_bfd_packet(self)
        # poll bit needs to be set
        self.assertIn("P", p.sprintf("%BFD.flags%"),
                      "Poll bit not set in BFD packet")
        # finish poll sequence with final packet
        final = self.test_session.create_packet()
        final[BFD].flags = "F"
        self.test_session.send_packet(final)
        # now the session should time out under new conditions
        detection_time = self.test_session.detect_mult *\
            self.vpp_session.required_min_rx / USEC_IN_SEC
        before = time.time()
        e = self.vapi.wait_for_event(
            2 * detection_time, "bfd_udp_session_details")
        after = time.time()
        self.assert_in_range(after - before,
                             0.9 * detection_time,
                             1.1 * detection_time,
                             "time before bfd session goes down")
        verify_event(self, e, expected_state=BFDState.down)
963
964     def test_modify_detect_mult(self):
965         """ modify detect multiplier """
966         bfd_session_up(self)
967         p = wait_for_bfd_packet(self)
968         self.vpp_session.modify_parameters(detect_mult=1)
969         p = wait_for_bfd_packet(
970             self, pcap_time_min=time.time() - self.vpp_clock_offset)
971         self.assert_equal(self.vpp_session.detect_mult,
972                           p[BFD].detect_mult,
973                           "detect mult")
974         # poll bit must not be set
975         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
976                          "Poll bit not set in BFD packet")
977         self.vpp_session.modify_parameters(detect_mult=10)
978         p = wait_for_bfd_packet(
979             self, pcap_time_min=time.time() - self.vpp_clock_offset)
980         self.assert_equal(self.vpp_session.detect_mult,
981                           p[BFD].detect_mult,
982                           "detect mult")
983         # poll bit must not be set
984         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
985                          "Poll bit not set in BFD packet")
986
    def test_queued_poll(self):
        """test poll sequence queueing

        Modifies VPP's required min rx twice in a row so that a second poll
        sequence has to be queued behind the first one. The first sequence is
        deliberately kept open for at least 0.5s before the final packet is
        sent; the test then checks that a second poll sequence shows up, that
        it does not start too soon, and that no poll bit is set once it has
        been finished as well.
        """
        bfd_session_up(self)
        p = wait_for_bfd_packet(self)
        # first parameter change - starts the 1st poll sequence
        self.vpp_session.modify_parameters(
            required_min_rx=2 * self.vpp_session.required_min_rx)
        p = wait_for_bfd_packet(self)
        poll_sequence_start = time.time()
        poll_sequence_length_min = 0.5
        send_final_after = time.time() + poll_sequence_length_min
        # poll bit needs to be set
        self.assertIn("P", p.sprintf("%BFD.flags%"),
                      "Poll bit not set in BFD packet")
        self.assert_equal(p[BFD].required_min_rx_interval,
                          self.vpp_session.required_min_rx,
                          "BFD required min rx interval")
        # second parameter change while the 1st poll sequence is in progress
        self.vpp_session.modify_parameters(
            required_min_rx=2 * self.vpp_session.required_min_rx)
        # 2nd poll sequence should be queued now
        # don't send the reply back yet, wait for some time to emulate
        # longer round-trip time
        packet_count = 0
        while time.time() < send_final_after:
            self.test_session.send_packet()
            p = wait_for_bfd_packet(self)
            self.assert_equal(len(self.vapi.collect_events()), 0,
                              "number of bfd events")
            self.assert_equal(p[BFD].required_min_rx_interval,
                              self.vpp_session.required_min_rx,
                              "BFD required min rx interval")
            packet_count += 1
            # poll bit must be set
            self.assertIn("P", p.sprintf("%BFD.flags%"),
                          "Poll bit not set in BFD packet")
        final = self.test_session.create_packet()
        final[BFD].flags = "F"
        self.test_session.send_packet(final)
        # finish 1st with final
        poll_sequence_length = time.time() - poll_sequence_start
        # vpp must wait for some time before starting new poll sequence
        poll_no_2_started = False
        # look at up to twice as many packets as the 1st sequence took
        for dummy in range(2 * packet_count):
            p = wait_for_bfd_packet(self)
            self.assert_equal(len(self.vapi.collect_events()), 0,
                              "number of bfd events")
            if "P" in p.sprintf("%BFD.flags%"):
                poll_no_2_started = True
                if time.time() < poll_sequence_start + poll_sequence_length:
                    raise Exception("VPP started 2nd poll sequence too soon")
                final = self.test_session.create_packet()
                final[BFD].flags = "F"
                self.test_session.send_packet(final)
                break
            else:
                self.test_session.send_packet()
        self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
        # finish 2nd with final
        final = self.test_session.create_packet()
        final[BFD].flags = "F"
        self.test_session.send_packet(final)
        p = wait_for_bfd_packet(self)
        # poll bit must not be set
        self.assertNotIn("P", p.sprintf("%BFD.flags%"),
                         "Poll bit set in BFD packet")
1051
1052     # returning inconsistent results requiring retries in per-patch tests
1053     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1054     def test_poll_response(self):
1055         """ test correct response to control frame with poll bit set """
1056         bfd_session_up(self)
1057         poll = self.test_session.create_packet()
1058         poll[BFD].flags = "P"
1059         self.test_session.send_packet(poll)
1060         final = wait_for_bfd_packet(
1061             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1062         self.assertIn("F", final.sprintf("%BFD.flags%"))
1063
1064     def test_no_periodic_if_remote_demand(self):
1065         """ no periodic frames outside poll sequence if remote demand set """
1066         bfd_session_up(self)
1067         demand = self.test_session.create_packet()
1068         demand[BFD].flags = "D"
1069         self.test_session.send_packet(demand)
1070         transmit_time = 0.9 \
1071             * max(self.vpp_session.required_min_rx,
1072                   self.test_session.desired_min_tx) \
1073             / USEC_IN_SEC
1074         count = 0
1075         for dummy in range(self.test_session.detect_mult * 2):
1076             self.sleep(transmit_time)
1077             self.test_session.send_packet(demand)
1078             try:
1079                 p = wait_for_bfd_packet(self, timeout=0)
1080                 self.logger.error(ppp("Received unexpected packet:", p))
1081                 count += 1
1082             except CaptureTimeoutError:
1083                 pass
1084         events = self.vapi.collect_events()
1085         for e in events:
1086             self.logger.error("Received unexpected event: %s", e)
1087         self.assert_equal(count, 0, "number of packets received")
1088         self.assert_equal(len(events), 0, "number of events received")
1089
1090     def test_echo_looped_back(self):
1091         """ echo packets looped back """
1092         # don't need a session in this case..
1093         self.vpp_session.remove_vpp_config()
1094         self.pg0.enable_capture()
1095         echo_packet_count = 10
1096         # random source port low enough to increment a few times..
1097         udp_sport_tx = randint(1, 50000)
1098         udp_sport_rx = udp_sport_tx
1099         echo_packet = (Ether(src=self.pg0.remote_mac,
1100                              dst=self.pg0.local_mac) /
1101                        IP(src=self.pg0.remote_ip4,
1102                           dst=self.pg0.remote_ip4) /
1103                        UDP(dport=BFD.udp_dport_echo) /
1104                        Raw("this should be looped back"))
1105         for dummy in range(echo_packet_count):
1106             self.sleep(.01, "delay between echo packets")
1107             echo_packet[UDP].sport = udp_sport_tx
1108             udp_sport_tx += 1
1109             self.logger.debug(ppp("Sending packet:", echo_packet))
1110             self.pg0.add_stream(echo_packet)
1111             self.pg_start()
1112         for dummy in range(echo_packet_count):
1113             p = self.pg0.wait_for_packet(1)
1114             self.logger.debug(ppp("Got packet:", p))
1115             ether = p[Ether]
1116             self.assert_equal(self.pg0.remote_mac,
1117                               ether.dst, "Destination MAC")
1118             self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1119             ip = p[IP]
1120             self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1121             self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1122             udp = p[UDP]
1123             self.assert_equal(udp.dport, BFD.udp_dport_echo,
1124                               "UDP destination port")
1125             self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1126             udp_sport_rx += 1
1127             # need to compare the hex payload here, otherwise BFD_vpp_echo
1128             # gets in way
1129             self.assertEqual(scapy.compat.raw(p[UDP].payload),
1130                              scapy.compat.raw(echo_packet[UDP].payload),
1131                              "Received packet is not the echo packet sent")
1132         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1133                           "ECHO packet identifier for test purposes)")
1134
    @unittest.skip("Test fails sporadically, BFD rework required to fix it")
    def test_echo(self):
        """echo function

        The test session advertises a non-zero required min echo rx. While no
        echo source is configured, VPP's advertised required min rx must stay
        unchanged. Once the loopback is set as echo source, echo packets are
        expected on pg0; they are looped back to VPP and, after the first one
        was seen, BFD control packets must advertise a required min rx of at
        least one second. No BFD events may be generated meanwhile.
        """
        bfd_session_up(self)
        self.test_session.update(required_min_echo_rx=150000)
        self.test_session.send_packet()
        detection_time = self.test_session.detect_mult *\
            self.vpp_session.required_min_rx / USEC_IN_SEC
        # echo shouldn't work without echo source set
        for dummy in range(10):
            sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
            self.sleep(sleep, "delay before sending bfd packet")
            self.test_session.send_packet()
        p = wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.assert_equal(p[BFD].required_min_rx_interval,
                          self.vpp_session.required_min_rx,
                          "BFD required min rx interval")
        self.test_session.send_packet()
        self.vapi.bfd_udp_set_echo_source(
            sw_if_index=self.loopback0.sw_if_index)
        echo_seen = False
        # should be turned on - loopback echo packets
        for dummy in range(3):
            # process packets for 75% of the detection time, then send a
            # control packet of our own
            loop_until = time.time() + 0.75 * detection_time
            while time.time() < loop_until:
                p = self.pg0.wait_for_packet(1)
                self.logger.debug(ppp("Got packet:", p))
                if p[UDP].dport == BFD.udp_dport_echo:
                    self.assert_equal(
                        p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
                    self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
                                        "BFD ECHO src IP equal to loopback IP")
                    self.logger.debug(ppp("Looping back packet:", p))
                    self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
                                      "ECHO packet destination MAC address")
                    # readdress the frame to vpp and send it back
                    p[Ether].dst = self.pg0.local_mac
                    self.pg0.add_stream(p)
                    self.pg_start()
                    echo_seen = True
                elif p.haslayer(BFD):
                    if echo_seen:
                        self.assertGreaterEqual(
                            p[BFD].required_min_rx_interval,
                            1000000)
                    # answer any poll with a final packet
                    if "P" in p.sprintf("%BFD.flags%"):
                        final = self.test_session.create_packet()
                        final[BFD].flags = "F"
                        self.test_session.send_packet(final)
                else:
                    raise Exception(ppp("Received unknown packet:", p))

                self.assert_equal(len(self.vapi.collect_events()), 0,
                                  "number of bfd events")
            self.test_session.send_packet()
        self.assertTrue(echo_seen, "No echo packets received")
1191
    @unittest.skip("Test fails sporadically, BFD rework required to fix it")
    def test_echo_fail(self):
        """session goes down if echo function fails

        Enables the echo function but never loops the echo packets back. The
        test expects exactly one BFD event reporting the down state and a BFD
        packet in down state carrying the echo_function_failed diagnostic.
        """
        bfd_session_up(self)
        self.test_session.update(required_min_echo_rx=150000)
        self.test_session.send_packet()
        detection_time = self.test_session.detect_mult *\
            self.vpp_session.required_min_rx / USEC_IN_SEC
        self.vapi.bfd_udp_set_echo_source(
            sw_if_index=self.loopback0.sw_if_index)
        # echo function should be used now, but we will drop the echo packets
        verified_diag = False
        for dummy in range(3):
            # process packets for 75% of the detection time, then send a
            # control packet of our own
            loop_until = time.time() + 0.75 * detection_time
            while time.time() < loop_until:
                p = self.pg0.wait_for_packet(1)
                self.logger.debug(ppp("Got packet:", p))
                if p[UDP].dport == BFD.udp_dport_echo:
                    # dropped
                    pass
                elif p.haslayer(BFD):
                    # answer any poll with a final packet
                    if "P" in p.sprintf("%BFD.flags%"):
                        self.assertGreaterEqual(
                            p[BFD].required_min_rx_interval,
                            1000000)
                        final = self.test_session.create_packet()
                        final[BFD].flags = "F"
                        self.test_session.send_packet(final)
                    if p[BFD].state == BFDState.down:
                        self.assert_equal(p[BFD].diag,
                                          BFDDiagCode.echo_function_failed,
                                          BFDDiagCode)
                        verified_diag = True
                else:
                    raise Exception(ppp("Received unknown packet:", p))
            self.test_session.send_packet()
        events = self.vapi.collect_events()
        self.assert_equal(len(events), 1, "number of bfd events")
        self.assert_equal(events[0].state, BFDState.down, BFDState)
        self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1232
1233     def test_echo_stop(self):
1234         """ echo function stops if peer sets required min echo rx zero """
1235         bfd_session_up(self)
1236         self.test_session.update(required_min_echo_rx=150000)
1237         self.test_session.send_packet()
1238         self.vapi.bfd_udp_set_echo_source(
1239             sw_if_index=self.loopback0.sw_if_index)
1240         # wait for first echo packet
1241         while True:
1242             p = self.pg0.wait_for_packet(1)
1243             self.logger.debug(ppp("Got packet:", p))
1244             if p[UDP].dport == BFD.udp_dport_echo:
1245                 self.logger.debug(ppp("Looping back packet:", p))
1246                 p[Ether].dst = self.pg0.local_mac
1247                 self.pg0.add_stream(p)
1248                 self.pg_start()
1249                 break
1250             elif p.haslayer(BFD):
1251                 # ignore BFD
1252                 pass
1253             else:
1254                 raise Exception(ppp("Received unknown packet:", p))
1255         self.test_session.update(required_min_echo_rx=0)
1256         self.test_session.send_packet()
1257         # echo packets shouldn't arrive anymore
1258         for dummy in range(5):
1259             wait_for_bfd_packet(
1260                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1261             self.test_session.send_packet()
1262             events = self.vapi.collect_events()
1263             self.assert_equal(len(events), 0, "number of bfd events")
1264
1265     def test_echo_source_removed(self):
1266         """ echo function stops if echo source is removed """
1267         bfd_session_up(self)
1268         self.test_session.update(required_min_echo_rx=150000)
1269         self.test_session.send_packet()
1270         self.vapi.bfd_udp_set_echo_source(
1271             sw_if_index=self.loopback0.sw_if_index)
1272         # wait for first echo packet
1273         while True:
1274             p = self.pg0.wait_for_packet(1)
1275             self.logger.debug(ppp("Got packet:", p))
1276             if p[UDP].dport == BFD.udp_dport_echo:
1277                 self.logger.debug(ppp("Looping back packet:", p))
1278                 p[Ether].dst = self.pg0.local_mac
1279                 self.pg0.add_stream(p)
1280                 self.pg_start()
1281                 break
1282             elif p.haslayer(BFD):
1283                 # ignore BFD
1284                 pass
1285             else:
1286                 raise Exception(ppp("Received unknown packet:", p))
1287         self.vapi.bfd_udp_del_echo_source()
1288         self.test_session.send_packet()
1289         # echo packets shouldn't arrive anymore
1290         for dummy in range(5):
1291             wait_for_bfd_packet(
1292                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1293             self.test_session.send_packet()
1294             events = self.vapi.collect_events()
1295             self.assert_equal(len(events), 0, "number of bfd events")
1296
1297     def test_stale_echo(self):
1298         """ stale echo packets don't keep a session up """
1299         bfd_session_up(self)
1300         self.test_session.update(required_min_echo_rx=150000)
1301         self.vapi.bfd_udp_set_echo_source(
1302             sw_if_index=self.loopback0.sw_if_index)
1303         self.test_session.send_packet()
1304         # should be turned on - loopback echo packets
1305         echo_packet = None
1306         timeout_at = None
1307         timeout_ok = False
1308         for dummy in range(10 * self.vpp_session.detect_mult):
1309             p = self.pg0.wait_for_packet(1)
1310             if p[UDP].dport == BFD.udp_dport_echo:
1311                 if echo_packet is None:
1312                     self.logger.debug(ppp("Got first echo packet:", p))
1313                     echo_packet = p
1314                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1315                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1316                 else:
1317                     self.logger.debug(ppp("Got followup echo packet:", p))
1318                 self.logger.debug(ppp("Looping back first echo packet:", p))
1319                 echo_packet[Ether].dst = self.pg0.local_mac
1320                 self.pg0.add_stream(echo_packet)
1321                 self.pg_start()
1322             elif p.haslayer(BFD):
1323                 self.logger.debug(ppp("Got packet:", p))
1324                 if "P" in p.sprintf("%BFD.flags%"):
1325                     final = self.test_session.create_packet()
1326                     final[BFD].flags = "F"
1327                     self.test_session.send_packet(final)
1328                 if p[BFD].state == BFDState.down:
1329                     self.assertIsNotNone(
1330                         timeout_at,
1331                         "Session went down before first echo packet received")
1332                     now = time.time()
1333                     self.assertGreaterEqual(
1334                         now, timeout_at,
1335                         "Session timeout at %s, but is expected at %s" %
1336                         (now, timeout_at))
1337                     self.assert_equal(p[BFD].diag,
1338                                       BFDDiagCode.echo_function_failed,
1339                                       BFDDiagCode)
1340                     events = self.vapi.collect_events()
1341                     self.assert_equal(len(events), 1, "number of bfd events")
1342                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1343                     timeout_ok = True
1344                     break
1345             else:
1346                 raise Exception(ppp("Received unknown packet:", p))
1347             self.test_session.send_packet()
1348         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1349
1350     def test_invalid_echo_checksum(self):
1351         """ echo packets with invalid checksum don't keep a session up """
1352         bfd_session_up(self)
1353         self.test_session.update(required_min_echo_rx=150000)
1354         self.vapi.bfd_udp_set_echo_source(
1355             sw_if_index=self.loopback0.sw_if_index)
1356         self.test_session.send_packet()
1357         # should be turned on - loopback echo packets
1358         timeout_at = None
1359         timeout_ok = False
1360         for dummy in range(10 * self.vpp_session.detect_mult):
1361             p = self.pg0.wait_for_packet(1)
1362             if p[UDP].dport == BFD.udp_dport_echo:
1363                 self.logger.debug(ppp("Got echo packet:", p))
1364                 if timeout_at is None:
1365                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1366                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1367                 p[BFD_vpp_echo].checksum = getrandbits(64)
1368                 p[Ether].dst = self.pg0.local_mac
1369                 self.logger.debug(ppp("Looping back modified echo packet:", p))
1370                 self.pg0.add_stream(p)
1371                 self.pg_start()
1372             elif p.haslayer(BFD):
1373                 self.logger.debug(ppp("Got packet:", p))
1374                 if "P" in p.sprintf("%BFD.flags%"):
1375                     final = self.test_session.create_packet()
1376                     final[BFD].flags = "F"
1377                     self.test_session.send_packet(final)
1378                 if p[BFD].state == BFDState.down:
1379                     self.assertIsNotNone(
1380                         timeout_at,
1381                         "Session went down before first echo packet received")
1382                     now = time.time()
1383                     self.assertGreaterEqual(
1384                         now, timeout_at,
1385                         "Session timeout at %s, but is expected at %s" %
1386                         (now, timeout_at))
1387                     self.assert_equal(p[BFD].diag,
1388                                       BFDDiagCode.echo_function_failed,
1389                                       BFDDiagCode)
1390                     events = self.vapi.collect_events()
1391                     self.assert_equal(len(events), 1, "number of bfd events")
1392                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1393                     timeout_ok = True
1394                     break
1395             else:
1396                 raise Exception(ppp("Received unknown packet:", p))
1397             self.test_session.send_packet()
1398         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1399
1400     def test_admin_up_down(self):
1401         """ put session admin-up and admin-down """
1402         bfd_session_up(self)
1403         self.vpp_session.admin_down()
1404         self.pg0.enable_capture()
1405         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1406         verify_event(self, e, expected_state=BFDState.admin_down)
1407         for dummy in range(2):
1408             p = wait_for_bfd_packet(self)
1409             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1410         # try to bring session up - shouldn't be possible
1411         self.test_session.update(state=BFDState.init)
1412         self.test_session.send_packet()
1413         for dummy in range(2):
1414             p = wait_for_bfd_packet(self)
1415             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1416         self.vpp_session.admin_up()
1417         self.test_session.update(state=BFDState.down)
1418         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1419         verify_event(self, e, expected_state=BFDState.down)
1420         p = wait_for_bfd_packet(
1421             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1422         self.assert_equal(p[BFD].state, BFDState.down, BFDState)
1423         self.test_session.send_packet()
1424         p = wait_for_bfd_packet(
1425             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1426         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1427         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1428         verify_event(self, e, expected_state=BFDState.init)
1429         self.test_session.update(state=BFDState.up)
1430         self.test_session.send_packet()
1431         p = wait_for_bfd_packet(
1432             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1433         self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1434         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1435         verify_event(self, e, expected_state=BFDState.up)
1436
1437     def test_config_change_remote_demand(self):
1438         """ configuration change while peer in demand mode """
1439         bfd_session_up(self)
1440         demand = self.test_session.create_packet()
1441         demand[BFD].flags = "D"
1442         self.test_session.send_packet(demand)
1443         self.vpp_session.modify_parameters(
1444             required_min_rx=2 * self.vpp_session.required_min_rx)
1445         p = wait_for_bfd_packet(
1446             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1447         # poll bit must be set
1448         self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1449         # terminate poll sequence
1450         final = self.test_session.create_packet()
1451         final[BFD].flags = "D+F"
1452         self.test_session.send_packet(final)
1453         # vpp should be quiet now again
1454         transmit_time = 0.9 \
1455             * max(self.vpp_session.required_min_rx,
1456                   self.test_session.desired_min_tx) \
1457             / USEC_IN_SEC
1458         count = 0
1459         for dummy in range(self.test_session.detect_mult * 2):
1460             self.sleep(transmit_time)
1461             self.test_session.send_packet(demand)
1462             try:
1463                 p = wait_for_bfd_packet(self, timeout=0)
1464                 self.logger.error(ppp("Received unexpected packet:", p))
1465                 count += 1
1466             except CaptureTimeoutError:
1467                 pass
1468         events = self.vapi.collect_events()
1469         for e in events:
1470             self.logger.error("Received unexpected event: %s", e)
1471         self.assert_equal(count, 0, "number of packets received")
1472         self.assert_equal(len(events), 0, "number of events received")
1473
1474     def test_intf_deleted(self):
1475         """ interface with bfd session deleted """
1476         intf = VppLoInterface(self)
1477         intf.config_ip4()
1478         intf.admin_up()
1479         sw_if_index = intf.sw_if_index
1480         vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip4)
1481         vpp_session.add_vpp_config()
1482         vpp_session.admin_up()
1483         intf.remove_vpp_config()
1484         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1485         self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1486         self.assertFalse(vpp_session.query_vpp_config())
1487
1488
class BFD6TestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (IPv6) """

    # class-level defaults for attributes the fixtures and tests use
    pg0 = None
    # not assigned anywhere in this class - presumably set by a
    # module-level helper such as bfd_session_up; TODO confirm
    vpp_clock_offset = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        super(BFD6TestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            # one packet-generator interface with IPv6 ...
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip6()
            cls.pg0.configure_ipv6_neighbors()
            cls.pg0.admin_up()
            cls.pg0.resolve_ndp()
            # ... and one loopback, used as the echo source in test_echo
            cls.create_loopback_interfaces(1)
            cls.loopback0 = cls.lo_interfaces[0]
            cls.loopback0.config_ip6()
            cls.loopback0.admin_up()

        except Exception:
            # run the base-class teardown before propagating the failure
            super(BFD6TestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        super(BFD6TestCase, cls).tearDownClass()

    def setUp(self):
        super(BFD6TestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()
        try:
            self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                                self.pg0.remote_ip6,
                                                af=AF_INET6)
            self.vpp_session.add_vpp_config()
            self.vpp_session.admin_up()
            self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
            self.logger.debug(self.vapi.cli("show adj nbr"))
        except BaseException:
            # unsubscribe from bfd events again before re-raising
            self.vapi.want_bfd_events(enable_disable=0)
            raise

    def tearDown(self):
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFD6TestCase, self).tearDown()

    def test_session_up(self):
        """ bring BFD session up """
        bfd_session_up(self)

    def test_session_up_by_ip(self):
        """ bring BFD session up - first frame looked up by address pair """
        self.logger.info("BFD: Sending Slow control frame")
        self.test_session.update(my_discriminator=randint(0, 40000000))
        self.test_session.send_packet()
        self.pg0.enable_capture()
        p = self.pg0.wait_for_packet(1)
        # vpp must echo our discriminator back and report init
        self.assert_equal(p[BFD].your_discriminator,
                          self.test_session.my_discriminator,
                          "BFD - your discriminator")
        self.assert_equal(p[BFD].state, BFDState.init, BFDState)
        self.test_session.update(your_discriminator=p[BFD].my_discriminator,
                                 state=BFDState.up)
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        verify_event(self, e, expected_state=BFDState.init)
        self.logger.info("BFD: Sending Up")
        self.test_session.send_packet()
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        verify_event(self, e, expected_state=BFDState.up)
        self.logger.info("BFD: Session is Up")
        self.test_session.update(state=BFDState.up)
        self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_hold_up(self):
        """ hold BFD session up """
        bfd_session_up(self)
        # answer each packet from vpp for two detect_mult rounds
        for _ in range(self.test_session.detect_mult * 2):
            wait_for_bfd_packet(self)
            self.test_session.send_packet()
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_echo_looped_back(self):
        """ echo packets looped back """
        # don't need a session in this case..
        self.vpp_session.remove_vpp_config()
        self.pg0.enable_capture()
        echo_packet_count = 10
        # random source port low enough to increment a few times..
        udp_sport_tx = randint(1, 50000)
        udp_sport_rx = udp_sport_tx
        # source and destination IP are both the remote address
        echo_packet = (Ether(src=self.pg0.remote_mac,
                             dst=self.pg0.local_mac) /
                       IPv6(src=self.pg0.remote_ip6,
                            dst=self.pg0.remote_ip6) /
                       UDP(dport=BFD.udp_dport_echo) /
                       Raw("this should be looped back"))
        # the UDP source port is incremented per packet so that each echo
        # packet can be told apart on reception
        for _ in range(echo_packet_count):
            self.sleep(.01, "delay between echo packets")
            echo_packet[UDP].sport = udp_sport_tx
            udp_sport_tx += 1
            self.logger.debug(ppp("Sending packet:", echo_packet))
            self.pg0.add_stream(echo_packet)
            self.pg_start()
        for _ in range(echo_packet_count):
            p = self.pg0.wait_for_packet(1)
            self.logger.debug(ppp("Got packet:", p))
            ether = p[Ether]
            self.assert_equal(self.pg0.remote_mac,
                              ether.dst, "Destination MAC")
            self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
            ip = p[IPv6]
            self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
            self.assert_equal(self.pg0.remote_ip6, ip.src, "Source IP")
            udp = p[UDP]
            self.assert_equal(udp.dport, BFD.udp_dport_echo,
                              "UDP destination port")
            self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
            udp_sport_rx += 1
            # need to compare the hex payload here, otherwise BFD_vpp_echo
            # gets in way
            self.assertEqual(scapy.compat.raw(p[UDP].payload),
                             scapy.compat.raw(echo_packet[UDP].payload),
                             "Received packet is not the echo packet sent")
        # both counters advanced once per packet, so they must match
        self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
                          "ECHO packet identifier for test purposes)")

    def test_echo(self):
        """ echo function """
        bfd_session_up(self)
        self.test_session.update(required_min_echo_rx=150000)
        self.test_session.send_packet()
        detection_time = self.test_session.detect_mult *\
            self.vpp_session.required_min_rx / USEC_IN_SEC
        # echo shouldn't work without echo source set
        for _ in range(10):
            sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
            self.sleep(sleep, "delay before sending bfd packet")
            self.test_session.send_packet()
        p = wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.assert_equal(p[BFD].required_min_rx_interval,
                          self.vpp_session.required_min_rx,
                          "BFD required min rx interval")
        self.test_session.send_packet()
        self.vapi.bfd_udp_set_echo_source(
            sw_if_index=self.loopback0.sw_if_index)
        echo_seen = False
        # should be turned on - loopback echo packets
        for _ in range(3):
            loop_until = time.time() + 0.75 * detection_time
            while time.time() < loop_until:
                p = self.pg0.wait_for_packet(1)
                self.logger.debug(ppp("Got packet:", p))
                if p[UDP].dport == BFD.udp_dport_echo:
                    self.assert_equal(
                        p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
                    self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
                                        "BFD ECHO src IP equal to loopback IP")
                    self.logger.debug(ppp("Looping back packet:", p))
                    self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
                                      "ECHO packet destination MAC address")
                    # hand the echo packet straight back to vpp
                    p[Ether].dst = self.pg0.local_mac
                    self.pg0.add_stream(p)
                    self.pg_start()
                    echo_seen = True
                elif p.haslayer(BFD):
                    if echo_seen:
                        # once echo runs, control packets must advertise a
                        # required min rx interval of at least one second
                        self.assertGreaterEqual(
                            p[BFD].required_min_rx_interval,
                            1000000)
                    if "P" in p.sprintf("%BFD.flags%"):
                        final = self.test_session.create_packet()
                        final[BFD].flags = "F"
                        self.test_session.send_packet(final)
                else:
                    raise Exception(ppp("Received unknown packet:", p))

                self.assert_equal(len(self.vapi.collect_events()), 0,
                                  "number of bfd events")
            self.test_session.send_packet()
        self.assertTrue(echo_seen, "No echo packets received")

    def test_intf_deleted(self):
        """ interface with bfd session deleted """
        intf = VppLoInterface(self)
        intf.config_ip6()
        intf.admin_up()
        # read the index before the interface is removed below
        sw_if_index = intf.sw_if_index
        vpp_session = VppBFDUDPSession(
            self, intf, intf.remote_ip6, af=AF_INET6)
        vpp_session.add_vpp_config()
        vpp_session.admin_up()
        intf.remove_vpp_config()
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
        self.assertFalse(vpp_session.query_vpp_config())
1700
1701
class BFDFIBTestCase(VppTestCase):
    """ BFD-FIB interactions (IPv6) """

    # class-level defaults; test_session_with_fib assigns both
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        super(BFDFIBTestCase, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(BFDFIBTestCase, cls).tearDownClass()

    def setUp(self):
        super(BFDFIBTestCase, self).setUp()
        self.create_pg_interfaces(range(1))

        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            i.configure_ipv6_neighbors()

    def tearDown(self):
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=False)

        super(BFDFIBTestCase, self).tearDown()

    @staticmethod
    def pkt_is_not_data_traffic(p):
        """ not data traffic implies BFD or the usual IPv6 ND/RA"""
        return bool(p.haslayer(BFD) or is_ipv6_misc(p))

    def _send_data_traffic(self, pkts):
        # inject the given packets on pg0 and start the packet generator
        self.pg0.add_stream(pkts)
        self.pg_start()

    def _assert_data_forwarded(self, pkts):
        # send pkts; each must come back on pg0 with its destination intact
        self._send_data_traffic(pkts)
        for sent in pkts:
            captured = self.pg0.wait_for_packet(
                1,
                filter_out_fn=self.pkt_is_not_data_traffic)
            self.assertEqual(captured[IPv6].dst,
                             sent[IPv6].dst)

    def _assert_data_dropped(self, pkts):
        # send pkts; no data packet may show up on pg0 within the timeout
        self._send_data_traffic(pkts)
        with self.assertRaises(CaptureTimeoutError):
            self.pg0.wait_for_packet(
                1,
                filter_out_fn=self.pkt_is_not_data_traffic)

    def test_session_with_fib(self):
        """ BFD-FIB interactions """

        # packets to match against both of the routes
        p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
              IPv6(src="3001::1", dst="2001::1") /
              UDP(sport=1234, dport=1234) /
              Raw(b'\xa5' * 100)),
             (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
              IPv6(src="3001::1", dst="2002::1") /
              UDP(sport=1234, dport=1234) /
              Raw(b'\xa5' * 100))]

        # Two routes via a next-hop that will have a BFD session:
        # 2001::/64 names the interface (non-recursive), 2002::/64 passes
        # 0xffffffff instead of an interface index (recursive)
        ip_2001_s_64 = VppIpRoute(self, "2001::", 64,
                                  [VppRoutePath(self.pg0.remote_ip6,
                                                self.pg0.sw_if_index)])
        ip_2002_s_64 = VppIpRoute(self, "2002::", 64,
                                  [VppRoutePath(self.pg0.remote_ip6,
                                                0xffffffff)])
        ip_2001_s_64.add_vpp_config()
        ip_2002_s_64.add_vpp_config()

        # bring the session up now the routes are present
        self.vpp_session = VppBFDUDPSession(self,
                                            self.pg0,
                                            self.pg0.remote_ip6,
                                            af=AF_INET6)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET6)

        # session is up - traffic passes
        bfd_session_up(self)
        self._assert_data_forwarded(p)

        # session is down - traffic is dropped
        bfd_session_down(self)
        self._assert_data_dropped(p)

        # session is up again - traffic passes
        bfd_session_up(self)
        self._assert_data_forwarded(p)
1805
1806
@unittest.skipUnless(running_extended_tests, "part of extended tests")
class BFDTunTestCase(VppTestCase):
    """ BFD over GRE tunnel """

    # class-level defaults; test_bfd_o_gre assigns both
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        super(BFDTunTestCase, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(BFDTunTestCase, cls).tearDownClass()

    def setUp(self):
        super(BFDTunTestCase, self).setUp()
        self.create_pg_interfaces(range(1))

        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

        # IPv4 on every packet-generator interface
        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip4()
            intf.resolve_arp()

    def tearDown(self):
        vpp_alive = not self.vpp_dead
        if vpp_alive:
            self.vapi.want_bfd_events(enable_disable=0)

        super(BFDTunTestCase, self).tearDown()

    @staticmethod
    def pkt_is_not_data_traffic(p):
        """ not data traffic implies BFD or the usual IPv6 ND/RA"""
        return bool(p.haslayer(BFD) or is_ipv6_misc(p))

    def test_bfd_o_gre(self):
        """ BFD-o-GRE  """

        # GRE tunnel between pg0's local and remote IPv4 addresses
        tunnel = VppGreInterface(self,
                                 self.pg0.local_ip4,
                                 self.pg0.remote_ip4)
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()

        # vpp side of the BFD session runs on the tunnel interface
        self.vpp_session = VppBFDUDPSession(self,
                                            tunnel,
                                            tunnel.remote_ip4,
                                            is_tunnel=True)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()

        # test side: packets get an outer IP/GRE header and use pg0
        outer_header = (IP(src=self.pg0.remote_ip4,
                           dst=self.pg0.local_ip4) /
                        GRE())
        self.test_session = BFDTestSession(
            self, tunnel, AF_INET,
            tunnel_header=outer_header,
            phy_interface=self.pg0)

        # one data packet addressed to the tunnel's remote address
        data_pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
                    IP(src=self.pg0.remote_ip4, dst=tunnel.remote_ip4) /
                    UDP(sport=1234, dport=1234) /
                    Raw(b'\xa5' * 100))
        data_traffic = [data_pkt]

        # session is up - traffic passes
        bfd_session_up(self)

        self.send_and_expect(self.pg0, data_traffic, self.pg0)

        # bring session down
        bfd_session_down(self)
1886
1887
1888 class BFDSHA1TestCase(VppTestCase):
1889     """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1890
1891     pg0 = None
1892     vpp_clock_offset = None
1893     vpp_session = None
1894     test_session = None
1895
1896     @classmethod
1897     def setUpClass(cls):
1898         super(BFDSHA1TestCase, cls).setUpClass()
1899         cls.vapi.cli("set log class bfd level debug")
1900         try:
1901             cls.create_pg_interfaces([0])
1902             cls.pg0.config_ip4()
1903             cls.pg0.admin_up()
1904             cls.pg0.resolve_arp()
1905
1906         except Exception:
1907             super(BFDSHA1TestCase, cls).tearDownClass()
1908             raise
1909
1910     @classmethod
1911     def tearDownClass(cls):
1912         super(BFDSHA1TestCase, cls).tearDownClass()
1913
1914     def setUp(self):
1915         super(BFDSHA1TestCase, self).setUp()
1916         self.factory = AuthKeyFactory()
1917         self.vapi.want_bfd_events()
1918         self.pg0.enable_capture()
1919
1920     def tearDown(self):
1921         if not self.vpp_dead:
1922             self.vapi.want_bfd_events(enable_disable=False)
1923         self.vapi.collect_events()  # clear the event queue
1924         super(BFDSHA1TestCase, self).tearDown()
1925
1926     def test_session_up(self):
1927         """ bring BFD session up """
1928         key = self.factory.create_random_key(self)
1929         key.add_vpp_config()
1930         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1931                                             self.pg0.remote_ip4,
1932                                             sha1_key=key)
1933         self.vpp_session.add_vpp_config()
1934         self.vpp_session.admin_up()
1935         self.test_session = BFDTestSession(
1936             self, self.pg0, AF_INET, sha1_key=key,
1937             bfd_key_id=self.vpp_session.bfd_key_id)
1938         bfd_session_up(self)
1939
1940     def test_hold_up(self):
1941         """ hold BFD session up """
1942         key = self.factory.create_random_key(self)
1943         key.add_vpp_config()
1944         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1945                                             self.pg0.remote_ip4,
1946                                             sha1_key=key)
1947         self.vpp_session.add_vpp_config()
1948         self.vpp_session.admin_up()
1949         self.test_session = BFDTestSession(
1950             self, self.pg0, AF_INET, sha1_key=key,
1951             bfd_key_id=self.vpp_session.bfd_key_id)
1952         bfd_session_up(self)
1953         for dummy in range(self.test_session.detect_mult * 2):
1954             wait_for_bfd_packet(self)
1955             self.test_session.send_packet()
1956         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1957
1958     def test_hold_up_meticulous(self):
1959         """ hold BFD session up - meticulous auth """
1960         key = self.factory.create_random_key(
1961             self, BFDAuthType.meticulous_keyed_sha1)
1962         key.add_vpp_config()
1963         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1964                                             self.pg0.remote_ip4, sha1_key=key)
1965         self.vpp_session.add_vpp_config()
1966         self.vpp_session.admin_up()
1967         # specify sequence number so that it wraps
1968         self.test_session = BFDTestSession(
1969             self, self.pg0, AF_INET, sha1_key=key,
1970             bfd_key_id=self.vpp_session.bfd_key_id,
1971             our_seq_number=0xFFFFFFFF - 4)
1972         bfd_session_up(self)
1973         for dummy in range(30):
1974             wait_for_bfd_packet(self)
1975             self.test_session.inc_seq_num()
1976             self.test_session.send_packet()
1977         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1978
1979     def test_send_bad_seq_number(self):
1980         """ session is not kept alive by msgs with bad sequence numbers"""
1981         key = self.factory.create_random_key(
1982             self, BFDAuthType.meticulous_keyed_sha1)
1983         key.add_vpp_config()
1984         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1985                                             self.pg0.remote_ip4, sha1_key=key)
1986         self.vpp_session.add_vpp_config()
1987         self.test_session = BFDTestSession(
1988             self, self.pg0, AF_INET, sha1_key=key,
1989             bfd_key_id=self.vpp_session.bfd_key_id)
1990         bfd_session_up(self)
1991         detection_time = self.test_session.detect_mult *\
1992             self.vpp_session.required_min_rx / USEC_IN_SEC
1993         send_until = time.time() + 2 * detection_time
1994         while time.time() < send_until:
1995             self.test_session.send_packet()
1996             self.sleep(0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
1997                        "time between bfd packets")
1998         e = self.vapi.collect_events()
1999         # session should be down now, because the sequence numbers weren't
2000         # updated
2001         self.assert_equal(len(e), 1, "number of bfd events")
2002         verify_event(self, e[0], expected_state=BFDState.down)
2003
2004     def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
2005                                        legitimate_test_session,
2006                                        rogue_test_session,
2007                                        rogue_bfd_values=None):
2008         """ execute a rogue session interaction scenario
2009
2010         1. create vpp session, add config
2011         2. bring the legitimate session up
2012         3. copy the bfd values from legitimate session to rogue session
2013         4. apply rogue_bfd_values to rogue session
2014         5. set rogue session state to down
2015         6. send message to take the session down from the rogue session
2016         7. assert that the legitimate session is unaffected
2017         """
2018
2019         self.vpp_session = vpp_bfd_udp_session
2020         self.vpp_session.add_vpp_config()
2021         self.test_session = legitimate_test_session
2022         # bring vpp session up
2023         bfd_session_up(self)
2024         # send packet from rogue session
2025         rogue_test_session.update(
2026             my_discriminator=self.test_session.my_discriminator,
2027             your_discriminator=self.test_session.your_discriminator,
2028             desired_min_tx=self.test_session.desired_min_tx,
2029             required_min_rx=self.test_session.required_min_rx,
2030             detect_mult=self.test_session.detect_mult,
2031             diag=self.test_session.diag,
2032             state=self.test_session.state,
2033             auth_type=self.test_session.auth_type)
2034         if rogue_bfd_values:
2035             rogue_test_session.update(**rogue_bfd_values)
2036         rogue_test_session.update(state=BFDState.down)
2037         rogue_test_session.send_packet()
2038         wait_for_bfd_packet(self)
2039         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2040
2041     def test_mismatch_auth(self):
2042         """ session is not brought down by unauthenticated msg """
2043         key = self.factory.create_random_key(self)
2044         key.add_vpp_config()
2045         vpp_session = VppBFDUDPSession(
2046             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2047         legitimate_test_session = BFDTestSession(
2048             self, self.pg0, AF_INET, sha1_key=key,
2049             bfd_key_id=vpp_session.bfd_key_id)
2050         rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
2051         self.execute_rogue_session_scenario(vpp_session,
2052                                             legitimate_test_session,
2053                                             rogue_test_session)
2054
2055     def test_mismatch_bfd_key_id(self):
2056         """ session is not brought down by msg with non-existent key-id """
2057         key = self.factory.create_random_key(self)
2058         key.add_vpp_config()
2059         vpp_session = VppBFDUDPSession(
2060             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2061         # pick a different random bfd key id
2062         x = randint(0, 255)
2063         while x == vpp_session.bfd_key_id:
2064             x = randint(0, 255)
2065         legitimate_test_session = BFDTestSession(
2066             self, self.pg0, AF_INET, sha1_key=key,
2067             bfd_key_id=vpp_session.bfd_key_id)
2068         rogue_test_session = BFDTestSession(
2069             self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
2070         self.execute_rogue_session_scenario(vpp_session,
2071                                             legitimate_test_session,
2072                                             rogue_test_session)
2073
2074     def test_mismatched_auth_type(self):
2075         """ session is not brought down by msg with wrong auth type """
2076         key = self.factory.create_random_key(self)
2077         key.add_vpp_config()
2078         vpp_session = VppBFDUDPSession(
2079             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2080         legitimate_test_session = BFDTestSession(
2081             self, self.pg0, AF_INET, sha1_key=key,
2082             bfd_key_id=vpp_session.bfd_key_id)
2083         rogue_test_session = BFDTestSession(
2084             self, self.pg0, AF_INET, sha1_key=key,
2085             bfd_key_id=vpp_session.bfd_key_id)
2086         self.execute_rogue_session_scenario(
2087             vpp_session, legitimate_test_session, rogue_test_session,
2088             {'auth_type': BFDAuthType.keyed_md5})
2089
2090     def test_restart(self):
2091         """ simulate remote peer restart and resynchronization """
2092         key = self.factory.create_random_key(
2093             self, BFDAuthType.meticulous_keyed_sha1)
2094         key.add_vpp_config()
2095         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2096                                             self.pg0.remote_ip4, sha1_key=key)
2097         self.vpp_session.add_vpp_config()
2098         self.test_session = BFDTestSession(
2099             self, self.pg0, AF_INET, sha1_key=key,
2100             bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
2101         bfd_session_up(self)
2102         # don't send any packets for 2*detection_time
2103         detection_time = self.test_session.detect_mult *\
2104             self.vpp_session.required_min_rx / USEC_IN_SEC
2105         self.sleep(2 * detection_time, "simulating peer restart")
2106         events = self.vapi.collect_events()
2107         self.assert_equal(len(events), 1, "number of bfd events")
2108         verify_event(self, events[0], expected_state=BFDState.down)
2109         self.test_session.update(state=BFDState.down)
2110         # reset sequence number
2111         self.test_session.our_seq_number = 0
2112         self.test_session.vpp_seq_number = None
2113         # now throw away any pending packets
2114         self.pg0.enable_capture()
2115         self.test_session.my_discriminator = 0
2116         bfd_session_up(self)
2117
2118
class BFDAuthOnOffTestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (changing auth) """

    pg0 = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """ run the parent class setup, switch BFD logging to debug level
        and configure pg0 (IPv4 address, admin up, ARP) """
        super(BFDAuthOnOffTestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip4()
            cls.pg0.admin_up()
            cls.pg0.resolve_arp()

        except Exception:
            # run the parent class teardown before propagating the failure
            super(BFDAuthOnOffTestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """ delegate class-level teardown to the parent class """
        super(BFDAuthOnOffTestCase, cls).tearDownClass()

    def setUp(self):
        """ create a fresh key factory, subscribe to BFD events and enable
        capture on pg0 """
        super(BFDAuthOnOffTestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

    def tearDown(self):
        """ unsubscribe from BFD events (only while VPP is alive), drain
        the event queue and run the parent tearDown """
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=False)
        self.vapi.collect_events()  # clear the event queue
        super(BFDAuthOnOffTestCase, self).tearDown()

    def _exchange_packets(self, verify_state=True, inc_seq_num=False):
        """ answer 2 * detect_mult packets received from VPP

        For every packet received via wait_for_bfd_packet() one packet is
        sent back from self.test_session.

        :param verify_state: assert that each received packet reports the
            up state
        :param inc_seq_num: call inc_seq_num() on the test session before
            each packet is sent
        """
        for _ in range(self.test_session.detect_mult * 2):
            p = wait_for_bfd_packet(self)
            if verify_state:
                self.assert_equal(p[BFD].state, BFDState.up, BFDState)
            if inc_seq_num:
                self.test_session.inc_seq_num()
            self.test_session.send_packet()

    def _assert_session_undisturbed(self):
        """ assert the vpp session is up and no BFD event was generated """
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")

    def test_auth_on_immediate(self):
        """ turn auth on without disturbing session state (immediate) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        self._exchange_packets()
        # switch both sides to the key at the same time
        self.vpp_session.activate_auth(key)
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key
        self._exchange_packets()
        self._assert_session_undisturbed()

    def test_auth_off_immediate(self):
        """ turn auth off without disturbing session state (immediate) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets(inc_seq_num=True)
        # drop authentication on both sides at the same time
        self.vpp_session.deactivate_auth()
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self._exchange_packets(inc_seq_num=True)
        self._assert_session_undisturbed()

    def test_auth_change_key_immediate(self):
        """ change auth key without disturbing session state (immediate) """
        key1 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2 = self.factory.create_random_key(self)
        key2.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key1)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key1,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets()
        # switch both sides from key1 to key2 at the same time
        self.vpp_session.activate_auth(key2)
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key2
        self._exchange_packets()
        self._assert_session_undisturbed()

    def test_auth_on_delayed(self):
        """ turn auth on without disturbing session state (delayed) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        # the first round only keeps the exchange going, state is unchecked
        self._exchange_packets(verify_state=False)
        self.vpp_session.activate_auth(key, delayed=True)
        # the test session keeps sending unauthenticated packets for a while
        self._exchange_packets()
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key
        self.test_session.send_packet()
        self._exchange_packets()
        self._assert_session_undisturbed()

    def test_auth_off_delayed(self):
        """ turn auth off without disturbing session state (delayed) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets()
        self.vpp_session.deactivate_auth(delayed=True)
        # the test session keeps sending authenticated packets for a while
        self._exchange_packets()
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self.test_session.send_packet()
        self._exchange_packets()
        self._assert_session_undisturbed()

    def test_auth_change_key_delayed(self):
        """ change auth key without disturbing session state (delayed) """
        key1 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2 = self.factory.create_random_key(self)
        key2.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key1)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key1,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets()
        self.vpp_session.activate_auth(key2, delayed=True)
        # the test session keeps using key1 for a while
        self._exchange_packets()
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key2
        self.test_session.send_packet()
        self._exchange_packets()
        self._assert_session_undisturbed()
2329
2330
2331 class BFDCLITestCase(VppTestCase):
2332     """Bidirectional Forwarding Detection (BFD) (CLI) """
2333     pg0 = None
2334
2335     @classmethod
2336     def setUpClass(cls):
2337         super(BFDCLITestCase, cls).setUpClass()
2338         cls.vapi.cli("set log class bfd level debug")
2339         try:
2340             cls.create_pg_interfaces((0,))
2341             cls.pg0.config_ip4()
2342             cls.pg0.config_ip6()
2343             cls.pg0.resolve_arp()
2344             cls.pg0.resolve_ndp()
2345
2346         except Exception:
2347             super(BFDCLITestCase, cls).tearDownClass()
2348             raise
2349
2350     @classmethod
2351     def tearDownClass(cls):
2352         super(BFDCLITestCase, cls).tearDownClass()
2353
2354     def setUp(self):
2355         super(BFDCLITestCase, self).setUp()
2356         self.factory = AuthKeyFactory()
2357         self.pg0.enable_capture()
2358
2359     def tearDown(self):
2360         try:
2361             self.vapi.want_bfd_events(enable_disable=False)
2362         except UnexpectedApiReturnValueError:
2363             # some tests aren't subscribed, so this is not an issue
2364             pass
2365         self.vapi.collect_events()  # clear the event queue
2366         super(BFDCLITestCase, self).tearDown()
2367
2368     def cli_verify_no_response(self, cli):
2369         """ execute a CLI, asserting that the response is empty """
2370         self.assert_equal(self.vapi.cli(cli),
2371                           "",
2372                           "CLI command response")
2373
2374     def cli_verify_response(self, cli, expected):
2375         """ execute a CLI, asserting that the response matches expectation """
2376         try:
2377             reply = self.vapi.cli(cli)
2378         except CliFailedCommandError as cli_error:
2379             reply = str(cli_error)
2380         self.assert_equal(reply.strip(),
2381                           expected,
2382                           "CLI command response")
2383
2384     def test_show(self):
2385         """ show commands """
2386         k1 = self.factory.create_random_key(self)
2387         k1.add_vpp_config()
2388         k2 = self.factory.create_random_key(
2389             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2390         k2.add_vpp_config()
2391         s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2392         s1.add_vpp_config()
2393         s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2394                               sha1_key=k2)
2395         s2.add_vpp_config()
2396         self.logger.info(self.vapi.ppcli("show bfd keys"))
2397         self.logger.info(self.vapi.ppcli("show bfd sessions"))
2398         self.logger.info(self.vapi.ppcli("show bfd"))
2399
2400     def test_set_del_sha1_key(self):
2401         """ set/delete SHA1 auth key """
2402         k = self.factory.create_random_key(self)
2403         self.registry.register(k, self.logger)
2404         self.cli_verify_no_response(
2405             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2406             (k.conf_key_id,
2407                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2408         self.assertTrue(k.query_vpp_config())
2409         self.vpp_session = VppBFDUDPSession(
2410             self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
2411         self.vpp_session.add_vpp_config()
2412         self.test_session = \
2413             BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
2414                            bfd_key_id=self.vpp_session.bfd_key_id)
2415         self.vapi.want_bfd_events()
2416         bfd_session_up(self)
2417         bfd_session_down(self)
2418         # try to replace the secret for the key - should fail because the key
2419         # is in-use
2420         k2 = self.factory.create_random_key(self)
2421         self.cli_verify_response(
2422             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2423             (k.conf_key_id,
2424                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2425             "bfd key set: `bfd_auth_set_key' API call failed, "
2426             "rv=-103:BFD object in use")
2427         # manipulating the session using old secret should still work
2428         bfd_session_up(self)
2429         bfd_session_down(self)
2430         self.vpp_session.remove_vpp_config()
2431         self.cli_verify_no_response(
2432             "bfd key del conf-key-id %s" % k.conf_key_id)
2433         self.assertFalse(k.query_vpp_config())
2434
2435     def test_set_del_meticulous_sha1_key(self):
2436         """ set/delete meticulous SHA1 auth key """
2437         k = self.factory.create_random_key(
2438             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2439         self.registry.register(k, self.logger)
2440         self.cli_verify_no_response(
2441             "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
2442             (k.conf_key_id,
2443                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2444         self.assertTrue(k.query_vpp_config())
2445         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2446                                             self.pg0.remote_ip6, af=AF_INET6,
2447                                             sha1_key=k)
2448         self.vpp_session.add_vpp_config()
2449         self.vpp_session.admin_up()
2450         self.test_session = \
2451             BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
2452                            bfd_key_id=self.vpp_session.bfd_key_id)
2453         self.vapi.want_bfd_events()
2454         bfd_session_up(self)
2455         bfd_session_down(self)
2456         # try to replace the secret for the key - should fail because the key
2457         # is in-use
2458         k2 = self.factory.create_random_key(self)
2459         self.cli_verify_response(
2460             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2461             (k.conf_key_id,
2462                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2463             "bfd key set: `bfd_auth_set_key' API call failed, "
2464             "rv=-103:BFD object in use")
2465         # manipulating the session using old secret should still work
2466         bfd_session_up(self)
2467         bfd_session_down(self)
2468         self.vpp_session.remove_vpp_config()
2469         self.cli_verify_no_response(
2470             "bfd key del conf-key-id %s" % k.conf_key_id)
2471         self.assertFalse(k.query_vpp_config())
2472
2473     def test_add_mod_del_bfd_udp(self):
2474         """ create/modify/delete IPv4 BFD UDP session """
2475         vpp_session = VppBFDUDPSession(
2476             self, self.pg0, self.pg0.remote_ip4)
2477         self.registry.register(vpp_session, self.logger)
2478         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2479             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2480             "detect-mult %s" % (self.pg0.name, self.pg0.local_ip4,
2481                                 self.pg0.remote_ip4,
2482                                 vpp_session.desired_min_tx,
2483                                 vpp_session.required_min_rx,
2484                                 vpp_session.detect_mult)
2485         self.cli_verify_no_response(cli_add_cmd)
2486         # 2nd add should fail
2487         self.cli_verify_response(
2488             cli_add_cmd,
2489             "bfd udp session add: `bfd_add_add_session' API call"
2490             " failed, rv=-101:Duplicate BFD object")
2491         verify_bfd_session_config(self, vpp_session)
2492         mod_session = VppBFDUDPSession(
2493             self, self.pg0, self.pg0.remote_ip4,
2494             required_min_rx=2 * vpp_session.required_min_rx,
2495             desired_min_tx=3 * vpp_session.desired_min_tx,
2496             detect_mult=4 * vpp_session.detect_mult)
2497         self.cli_verify_no_response(
2498             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2499             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2500             (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2501              mod_session.desired_min_tx, mod_session.required_min_rx,
2502              mod_session.detect_mult))
2503         verify_bfd_session_config(self, mod_session)
2504         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2505             "peer-addr %s" % (self.pg0.name,
2506                               self.pg0.local_ip4, self.pg0.remote_ip4)
2507         self.cli_verify_no_response(cli_del_cmd)
2508         # 2nd del is expected to fail
2509         self.cli_verify_response(
2510             cli_del_cmd, "bfd udp session del: `bfd_udp_del_session' API call"
2511             " failed, rv=-102:No such BFD object")
2512         self.assertFalse(vpp_session.query_vpp_config())
2513
2514     def test_add_mod_del_bfd_udp6(self):
2515         """ create/modify/delete IPv6 BFD UDP session """
2516         vpp_session = VppBFDUDPSession(
2517             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
2518         self.registry.register(vpp_session, self.logger)
2519         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2520             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2521             "detect-mult %s" % (self.pg0.name, self.pg0.local_ip6,
2522                                 self.pg0.remote_ip6,
2523                                 vpp_session.desired_min_tx,
2524                                 vpp_session.required_min_rx,
2525                                 vpp_session.detect_mult)
2526         self.cli_verify_no_response(cli_add_cmd)
2527         # 2nd add should fail
2528         self.cli_verify_response(
2529             cli_add_cmd,
2530             "bfd udp session add: `bfd_add_add_session' API call"
2531             " failed, rv=-101:Duplicate BFD object")
2532         verify_bfd_session_config(self, vpp_session)
2533         mod_session = VppBFDUDPSession(
2534             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2535             required_min_rx=2 * vpp_session.required_min_rx,
2536             desired_min_tx=3 * vpp_session.desired_min_tx,
2537             detect_mult=4 * vpp_session.detect_mult)
2538         self.cli_verify_no_response(
2539             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2540             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2541             (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2542              mod_session.desired_min_tx,
2543              mod_session.required_min_rx, mod_session.detect_mult))
2544         verify_bfd_session_config(self, mod_session)
2545         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2546             "peer-addr %s" % (self.pg0.name,
2547                               self.pg0.local_ip6, self.pg0.remote_ip6)
2548         self.cli_verify_no_response(cli_del_cmd)
2549         # 2nd del is expected to fail
2550         self.cli_verify_response(
2551             cli_del_cmd,
2552             "bfd udp session del: `bfd_udp_del_session' API call"
2553             " failed, rv=-102:No such BFD object")
2554         self.assertFalse(vpp_session.query_vpp_config())
2555
2556     def test_add_mod_del_bfd_udp_auth(self):
2557         """ create/modify/delete IPv4 BFD UDP session (authenticated) """
2558         key = self.factory.create_random_key(self)
2559         key.add_vpp_config()
2560         vpp_session = VppBFDUDPSession(
2561             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2562         self.registry.register(vpp_session, self.logger)
2563         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2564             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2565             "detect-mult %s conf-key-id %s bfd-key-id %s"\
2566             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2567                vpp_session.desired_min_tx, vpp_session.required_min_rx,
2568                vpp_session.detect_mult, key.conf_key_id,
2569                vpp_session.bfd_key_id)
2570         self.cli_verify_no_response(cli_add_cmd)
2571         # 2nd add should fail
2572         self.cli_verify_response(
2573             cli_add_cmd,
2574             "bfd udp session add: `bfd_add_add_session' API call"
2575             " failed, rv=-101:Duplicate BFD object")
2576         verify_bfd_session_config(self, vpp_session)
2577         mod_session = VppBFDUDPSession(
2578             self, self.pg0, self.pg0.remote_ip4, sha1_key=key,
2579             bfd_key_id=vpp_session.bfd_key_id,
2580             required_min_rx=2 * vpp_session.required_min_rx,
2581             desired_min_tx=3 * vpp_session.desired_min_tx,
2582             detect_mult=4 * vpp_session.detect_mult)
2583         self.cli_verify_no_response(
2584             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2585             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2586             (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2587              mod_session.desired_min_tx,
2588              mod_session.required_min_rx, mod_session.detect_mult))
2589         verify_bfd_session_config(self, mod_session)
2590         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2591             "peer-addr %s" % (self.pg0.name,
2592                               self.pg0.local_ip4, self.pg0.remote_ip4)
2593         self.cli_verify_no_response(cli_del_cmd)
2594         # 2nd del is expected to fail
2595         self.cli_verify_response(
2596             cli_del_cmd,
2597             "bfd udp session del: `bfd_udp_del_session' API call"
2598             " failed, rv=-102:No such BFD object")
2599         self.assertFalse(vpp_session.query_vpp_config())
2600
2601     def test_add_mod_del_bfd_udp6_auth(self):
2602         """ create/modify/delete IPv6 BFD UDP session (authenticated) """
2603         key = self.factory.create_random_key(
2604             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2605         key.add_vpp_config()
2606         vpp_session = VppBFDUDPSession(
2607             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key)
2608         self.registry.register(vpp_session, self.logger)
2609         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2610             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2611             "detect-mult %s conf-key-id %s bfd-key-id %s" \
2612             % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2613                vpp_session.desired_min_tx, vpp_session.required_min_rx,
2614                vpp_session.detect_mult, key.conf_key_id,
2615                vpp_session.bfd_key_id)
2616         self.cli_verify_no_response(cli_add_cmd)
2617         # 2nd add should fail
2618         self.cli_verify_response(
2619             cli_add_cmd,
2620             "bfd udp session add: `bfd_add_add_session' API call"
2621             " failed, rv=-101:Duplicate BFD object")
2622         verify_bfd_session_config(self, vpp_session)
2623         mod_session = VppBFDUDPSession(
2624             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key,
2625             bfd_key_id=vpp_session.bfd_key_id,
2626             required_min_rx=2 * vpp_session.required_min_rx,
2627             desired_min_tx=3 * vpp_session.desired_min_tx,
2628             detect_mult=4 * vpp_session.detect_mult)
2629         self.cli_verify_no_response(
2630             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2631             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2632             (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2633              mod_session.desired_min_tx,
2634              mod_session.required_min_rx, mod_session.detect_mult))
2635         verify_bfd_session_config(self, mod_session)
2636         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2637             "peer-addr %s" % (self.pg0.name,
2638                               self.pg0.local_ip6, self.pg0.remote_ip6)
2639         self.cli_verify_no_response(cli_del_cmd)
2640         # 2nd del is expected to fail
2641         self.cli_verify_response(
2642             cli_del_cmd,
2643             "bfd udp session del: `bfd_udp_del_session' API call"
2644             " failed, rv=-102:No such BFD object")
2645         self.assertFalse(vpp_session.query_vpp_config())
2646
2647     def test_auth_on_off(self):
2648         """ turn authentication on and off """
2649         key = self.factory.create_random_key(
2650             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2651         key.add_vpp_config()
2652         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2653         auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2654                                         sha1_key=key)
2655         session.add_vpp_config()
2656         cli_activate = \
2657             "bfd udp session auth activate interface %s local-addr %s "\
2658             "peer-addr %s conf-key-id %s bfd-key-id %s"\
2659             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2660                key.conf_key_id, auth_session.bfd_key_id)
2661         self.cli_verify_no_response(cli_activate)
2662         verify_bfd_session_config(self, auth_session)
2663         self.cli_verify_no_response(cli_activate)
2664         verify_bfd_session_config(self, auth_session)
2665         cli_deactivate = \
2666             "bfd udp session auth deactivate interface %s local-addr %s "\
2667             "peer-addr %s "\
2668             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2669         self.cli_verify_no_response(cli_deactivate)
2670         verify_bfd_session_config(self, session)
2671         self.cli_verify_no_response(cli_deactivate)
2672         verify_bfd_session_config(self, session)
2673
2674     def test_auth_on_off_delayed(self):
2675         """ turn authentication on and off (delayed) """
2676         key = self.factory.create_random_key(
2677             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2678         key.add_vpp_config()
2679         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2680         auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2681                                         sha1_key=key)
2682         session.add_vpp_config()
2683         cli_activate = \
2684             "bfd udp session auth activate interface %s local-addr %s "\
2685             "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"\
2686             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2687                key.conf_key_id, auth_session.bfd_key_id)
2688         self.cli_verify_no_response(cli_activate)
2689         verify_bfd_session_config(self, auth_session)
2690         self.cli_verify_no_response(cli_activate)
2691         verify_bfd_session_config(self, auth_session)
2692         cli_deactivate = \
2693             "bfd udp session auth deactivate interface %s local-addr %s "\
2694             "peer-addr %s delayed yes"\
2695             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2696         self.cli_verify_no_response(cli_deactivate)
2697         verify_bfd_session_config(self, session)
2698         self.cli_verify_no_response(cli_deactivate)
2699         verify_bfd_session_config(self, session)
2700
2701     def test_admin_up_down(self):
2702         """ put session admin-up and admin-down """
2703         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2704         session.add_vpp_config()
2705         cli_down = \
2706             "bfd udp session set-flags admin down interface %s local-addr %s "\
2707             "peer-addr %s "\
2708             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2709         cli_up = \
2710             "bfd udp session set-flags admin up interface %s local-addr %s "\
2711             "peer-addr %s "\
2712             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2713         self.cli_verify_no_response(cli_down)
2714         verify_bfd_session_config(self, session, state=BFDState.admin_down)
2715         self.cli_verify_no_response(cli_up)
2716         verify_bfd_session_config(self, session, state=BFDState.down)
2717
2718     def test_set_del_udp_echo_source(self):
2719         """ set/del udp echo source """
2720         self.create_loopback_interfaces(1)
2721         self.loopback0 = self.lo_interfaces[0]
2722         self.loopback0.admin_up()
2723         self.cli_verify_response("show bfd echo-source",
2724                                  "UDP echo source is not set.")
2725         cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
2726         self.cli_verify_no_response(cli_set)
2727         self.cli_verify_response("show bfd echo-source",
2728                                  "UDP echo source is: %s\n"
2729                                  "IPv4 address usable as echo source: none\n"
2730                                  "IPv6 address usable as echo source: none" %
2731                                  self.loopback0.name)
2732         self.loopback0.config_ip4()
2733         unpacked = unpack("!L", self.loopback0.local_ip4n)
2734         echo_ip4 = inet_ntop(AF_INET, pack("!L", unpacked[0] ^ 1))
2735         self.cli_verify_response("show bfd echo-source",
2736                                  "UDP echo source is: %s\n"
2737                                  "IPv4 address usable as echo source: %s\n"
2738                                  "IPv6 address usable as echo source: none" %
2739                                  (self.loopback0.name, echo_ip4))
2740         unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
2741         echo_ip6 = inet_ntop(AF_INET6, pack("!LLLL", unpacked[0], unpacked[1],
2742                                             unpacked[2], unpacked[3] ^ 1))
2743         self.loopback0.config_ip6()
2744         self.cli_verify_response("show bfd echo-source",
2745                                  "UDP echo source is: %s\n"
2746                                  "IPv4 address usable as echo source: %s\n"
2747                                  "IPv6 address usable as echo source: %s" %
2748                                  (self.loopback0.name, echo_ip4, echo_ip6))
2749         cli_del = "bfd udp echo-source del"
2750         self.cli_verify_no_response(cli_del)
2751         self.cli_verify_response("show bfd echo-source",
2752                                  "UDP echo source is not set.")
2753
2754
if __name__ == "__main__":
    # Executed as a script: hand the tests in this module to unittest,
    # using the project's VppTestRunner as the runner.
    unittest.main(testRunner=VppTestRunner)