36356c7d4ad86ea02a5771459c0b3cf5fe19458a
[vpp.git] / test / test_bfd.py
1 #!/usr/bin/env python3
2 """ BFD tests """
3
4 from __future__ import division
5
6 import binascii
7 import hashlib
8 import time
9 import unittest
10 from random import randint, shuffle, getrandbits
11 from socket import AF_INET, AF_INET6, inet_ntop
12 from struct import pack, unpack
13
14 from six import moves
15 import scapy.compat
16 from scapy.layers.inet import UDP, IP
17 from scapy.layers.inet6 import IPv6
18 from scapy.layers.l2 import Ether, GRE
19 from scapy.packet import Raw
20
21 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
22     BFDDiagCode, BFDState, BFD_vpp_echo
23 from framework import VppTestCase, VppTestRunner, running_extended_tests
24 from util import ppp
25 from vpp_ip import DpoProto
26 from vpp_ip_route import VppIpRoute, VppRoutePath
27 from vpp_lo_interface import VppLoInterface
28 from vpp_papi_provider import UnexpectedApiReturnValueError, \
29     CliFailedCommandError
30 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
31 from vpp_gre_interface import VppGreInterface
32 from vpp_papi import VppEnum
33
# number of microseconds in one second
USEC_IN_SEC = 10 ** 6
35
36
class AuthKeyFactory(object):
    """Produce BFD auth keys whose conf key IDs never repeat per factory"""

    def __init__(self):
        # conf key IDs already handed out by this factory (used as a set)
        self._conf_key_ids = {}

    def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
        """ create a random key with unique conf key id

        :param test: test case the key object is bound to
        :param auth_type: authentication type stored in the key
        :returns: VppBFDAuthKey holding 1 to 20 random bytes of key material
        """
        # redraw until we hit an ID this factory has not issued yet
        while True:
            candidate_id = randint(0, 0xFFFFFFFF)
            if candidate_id not in self._conf_key_ids:
                break
        self._conf_key_ids[candidate_id] = 1
        key_length = randint(1, 20)
        key_material = bytearray(randint(0, 255) for _ in range(key_length))
        return VppBFDAuthKey(test=test,
                             auth_type=auth_type,
                             conf_key_id=candidate_id,
                             key=scapy.compat.raw(key_material))
53
54
class BFDAPITestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) - API

    Covers adding, modifying and removing BFD UDP sessions, SHA1
    authentication keys and the UDP echo source through the API.
    """

    pg0 = None
    pg1 = None

    @classmethod
    def setUpClass(cls):
        """ enable bfd debug logging and configure IPv4/IPv6 on two pgs """
        super(BFDAPITestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces(range(2))
            for intf in cls.pg_interfaces:
                intf.config_ip4()
                intf.config_ip6()
                intf.resolve_arp()

        except Exception:
            # undo the parent class setup before propagating the failure
            super(BFDAPITestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """ delegate class teardown to the parent class """
        super(BFDAPITestCase, cls).tearDownClass()

    def setUp(self):
        """ give every test its own auth key factory """
        super(BFDAPITestCase, self).setUp()
        self.factory = AuthKeyFactory()

    def test_add_bfd(self):
        """ create a BFD session """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        # add and remove twice to prove the session can be re-created
        for _ in range(2):
            session.add_vpp_config()
            self.logger.debug("Session state is %s", session.state)
            session.remove_vpp_config()

    def test_double_add(self):
        """ create the same BFD session twice (negative case) """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()

        # the second add of an identical session must be refused by the API
        with self.vapi.assert_negative_api_retval():
            session.add_vpp_config()

        session.remove_vpp_config()

    def test_add_bfd6(self):
        """ create IPv6 BFD session """
        session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
        # add and remove twice to prove the session can be re-created
        for _ in range(2):
            session.add_vpp_config()
            self.logger.debug("Session state is %s", session.state)
            session.remove_vpp_config()

    def test_mod_bfd(self):
        """ modify BFD session parameters """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   desired_min_tx=50000,
                                   required_min_rx=10000,
                                   detect_mult=1)

        def assert_dump_matches_session():
            # compare what the dump reports with the test-side session
            dumped = session.get_bfd_udp_session_dump_entry()
            self.assert_equal(session.desired_min_tx,
                              dumped.desired_min_tx,
                              "desired min transmit interval")
            self.assert_equal(session.required_min_rx,
                              dumped.required_min_rx,
                              "required min receive interval")
            self.assert_equal(session.detect_mult, dumped.detect_mult,
                              "detect mult")

        session.add_vpp_config()
        assert_dump_matches_session()
        # double every parameter and check the change is reflected
        session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
                                  required_min_rx=session.required_min_rx * 2,
                                  detect_mult=session.detect_mult * 2)
        assert_dump_matches_session()

    def test_add_sha1_keys(self):
        """ add SHA1 keys """
        key_count = 10
        keys = [self.factory.create_random_key(self)
                for _ in range(key_count)]

        def assert_all_keys(configured):
            # every key must (or must not) be known to vpp
            check = self.assertTrue if configured else self.assertFalse
            for k in keys:
                check(k.query_vpp_config())

        assert_all_keys(False)
        for k in keys:
            k.add_vpp_config()
        assert_all_keys(True)
        # remove randomly
        removal_order = list(range(key_count))
        shuffle(removal_order)
        removed = set()
        for victim in removal_order:
            keys[victim].remove_vpp_config()
            removed.add(victim)
            # after each removal only the not-yet-removed keys may remain
            for idx, k in enumerate(keys):
                if idx in removed:
                    self.assertFalse(k.query_vpp_config())
                else:
                    self.assertTrue(k.query_vpp_config())
        # should be removed now
        assert_all_keys(False)
        # add back and remove again
        for k in keys:
            k.add_vpp_config()
        assert_all_keys(True)
        for k in keys:
            k.remove_vpp_config()
        assert_all_keys(False)

    def test_add_bfd_sha1(self):
        """ create a BFD session (SHA1) """
        auth_key = self.factory.create_random_key(self)
        auth_key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=auth_key)
        # add and remove twice to prove the session can be re-created
        for _ in range(2):
            session.add_vpp_config()
            self.logger.debug("Session state is %s", session.state)
            session.remove_vpp_config()

    def test_double_add_sha1(self):
        """ create the same BFD session twice (negative case) (SHA1) """
        auth_key = self.factory.create_random_key(self)
        auth_key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=auth_key)
        session.add_vpp_config()
        # adding the identical session again is expected to raise
        with self.assertRaises(Exception):
            session.add_vpp_config()

    def test_add_auth_nonexistent_key(self):
        """ create BFD session using non-existent SHA1 (negative case) """
        # the key object is created but never added to vpp
        session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4,
            sha1_key=self.factory.create_random_key(self))
        with self.assertRaises(Exception):
            session.add_vpp_config()

    def test_shared_sha1_key(self):
        """ share single SHA1 key between multiple BFD sessions """
        shared_key = self.factory.create_random_key(self)
        shared_key.add_vpp_config()
        sessions = [
            VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                             sha1_key=shared_key),
            VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
                             sha1_key=shared_key, af=AF_INET6),
            VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
                             sha1_key=shared_key),
            VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
                             sha1_key=shared_key, af=AF_INET6)]
        for session in sessions:
            session.add_vpp_config()
        total = len(sessions)
        # the use count must drop by one with every removed session
        for already_removed, session in enumerate(sessions):
            entry = shared_key.get_bfd_auth_keys_dump_entry()
            self.assert_equal(entry.use_count, total - already_removed,
                              "Use count for shared key")
            session.remove_vpp_config()
        entry = shared_key.get_bfd_auth_keys_dump_entry()
        self.assert_equal(entry.use_count, 0,
                          "Use count for shared key")

    def test_activate_auth(self):
        """ activate SHA1 authentication """
        auth_key = self.factory.create_random_key(self)
        auth_key.add_vpp_config()
        # session starts unauthenticated, the key is switched on afterwards
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        session.activate_auth(auth_key)

    def test_deactivate_auth(self):
        """ deactivate SHA1 authentication """
        auth_key = self.factory.create_random_key(self)
        auth_key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        session.activate_auth(auth_key)
        session.deactivate_auth()

    def test_change_key(self):
        """ change SHA1 key """
        first_key = self.factory.create_random_key(self)
        second_key = self.factory.create_random_key(self)
        # make sure the two keys really differ in their conf key id
        while first_key.conf_key_id == second_key.conf_key_id:
            second_key = self.factory.create_random_key(self)
        first_key.add_vpp_config()
        second_key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=first_key)
        session.add_vpp_config()
        session.activate_auth(second_key)

    def test_set_del_udp_echo_source(self):
        """ set/del udp echo source """
        self.create_loopback_interfaces(1)
        self.loopback0 = self.lo_interfaces[0]
        lo = self.loopback0
        lo.admin_up()

        def assert_echo_source_unset():
            # nothing configured: no source and no usable addresses
            src = self.vapi.bfd_udp_get_echo_source()
            self.assertFalse(src.is_set)
            self.assertFalse(src.have_usable_ip4)
            self.assertFalse(src.have_usable_ip6)

        def fetch_configured_echo_source():
            # source must be set and point at the loopback interface
            src = self.vapi.bfd_udp_get_echo_source()
            self.assertTrue(src.is_set)
            self.assertEqual(src.sw_if_index, lo.sw_if_index)
            return src

        assert_echo_source_unset()

        # interface without any address: set, but nothing usable yet
        self.vapi.bfd_udp_set_echo_source(sw_if_index=lo.sw_if_index)
        src = fetch_configured_echo_source()
        self.assertFalse(src.have_usable_ip4)
        self.assertFalse(src.have_usable_ip6)

        # expected IPv4 echo address is the interface address with its
        # lowest bit flipped
        lo.config_ip4()
        (ip4_word,) = unpack("!L", lo.local_ip4n)
        echo_ip4 = pack("!L", ip4_word ^ 1)
        src = fetch_configured_echo_source()
        self.assertTrue(src.have_usable_ip4)
        self.assertEqual(src.ip4_addr.packed, echo_ip4)
        self.assertFalse(src.have_usable_ip6)

        # same rule for IPv6: flip the lowest bit of the last 32-bit word
        lo.config_ip6()
        word0, word1, word2, word3 = unpack("!LLLL", lo.local_ip6n)
        echo_ip6 = pack("!LLLL", word0, word1, word2, word3 ^ 1)
        src = fetch_configured_echo_source()
        self.assertTrue(src.have_usable_ip4)
        self.assertEqual(src.ip4_addr.packed, echo_ip4)
        self.assertTrue(src.have_usable_ip6)
        self.assertEqual(src.ip6_addr.packed, echo_ip6)

        self.vapi.bfd_udp_del_echo_source()
        assert_echo_source_unset()
312
313
class BFDTestSession(object):
    """ Test-framework side of a BFD session (the peer vpp talks to) """

    def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
                 bfd_key_id=None, our_seq_number=None,
                 tunnel_header=None, phy_interface=None):
        """ remember session parameters and pick random port/sequence

        :param test: test case used for logging, asserts and pg_start
        :param interface: interface whose IP addresses the packets use
        :param af: address family (AF_INET6 selects IPv6, otherwise IPv4)
        :param detect_mult: detect multiplier put into sent packets
        :param sha1_key: auth key; when set, packets carry SHA1 auth
        :param bfd_key_id: key ID placed in / expected from the auth section
        :param our_seq_number: initial auth sequence number (random if None)
        :param tunnel_header: optional header inserted right after Ethernet
        :param phy_interface: interface used for MACs and sending
                              (defaults to interface)
        """
        self.test = test
        self.af = af
        self.sha1_key = sha1_key
        self.bfd_key_id = bfd_key_id
        self.interface = interface
        self.phy_interface = phy_interface if phy_interface else interface
        self.udp_sport = randint(49152, 65535)
        self.our_seq_number = (randint(0, 40000000)
                               if our_seq_number is None
                               else our_seq_number)
        self.vpp_seq_number = None
        self.my_discriminator = 0
        self.desired_min_tx = 300000
        self.required_min_rx = 300000
        self.required_min_echo_rx = None
        self.detect_mult = detect_mult
        self.diag = BFDDiagCode.no_diagnostic
        self.your_discriminator = None
        self.state = BFDState.down
        self.auth_type = BFDAuthType.no_auth
        self.tunnel_header = tunnel_header

    def inc_seq_num(self):
        """ advance our sequence number by one, wrapping 0xFFFFFFFF to 0 """
        if self.our_seq_number != 0xFFFFFFFF:
            self.our_seq_number += 1
        else:
            self.our_seq_number = 0

    def update(self, my_discriminator=None, your_discriminator=None,
               desired_min_tx=None, required_min_rx=None,
               required_min_echo_rx=None, detect_mult=None,
               diag=None, state=None, auth_type=None):
        """ overwrite session parameters; arguments left as None are kept """
        requested = (
            ("my_discriminator", my_discriminator),
            ("your_discriminator", your_discriminator),
            ("required_min_rx", required_min_rx),
            ("required_min_echo_rx", required_min_echo_rx),
            ("desired_min_tx", desired_min_tx),
            ("detect_mult", detect_mult),
            ("diag", diag),
            ("state", state),
            ("auth_type", auth_type),
        )
        for attr_name, new_value in requested:
            if new_value is not None:
                setattr(self, attr_name, new_value)

    def fill_packet_fields(self, packet):
        """ copy every truthy session parameter into the packet's BFD layer """
        bfd = packet[BFD]
        # (session attribute, BFD layer field, debug log format);
        # auth_type is only non-zero when used by a negative test-case
        field_map = (
            ("my_discriminator", "my_discriminator",
             "BFD: setting packet.my_discriminator=%s"),
            ("your_discriminator", "your_discriminator",
             "BFD: setting packet.your_discriminator=%s"),
            ("required_min_rx", "required_min_rx_interval",
             "BFD: setting packet.required_min_rx_interval=%s"),
            ("required_min_echo_rx", "required_min_echo_rx_interval",
             "BFD: setting packet.required_min_echo_rx=%s"),
            ("desired_min_tx", "desired_min_tx_interval",
             "BFD: setting packet.desired_min_tx_interval=%s"),
            ("detect_mult", "detect_mult",
             "BFD: setting packet.detect_mult=%s"),
            ("diag", "diag",
             "BFD: setting packet.diag=%s"),
            ("state", "state",
             "BFD: setting packet.state=%s"),
            ("auth_type", "auth_type",
             "BFD: setting packet.auth_type=%s"),
        )
        for session_attr, packet_field, log_format in field_map:
            value = getattr(self, session_attr)
            if not value:
                # falsy values are left at whatever the BFD layer defaults to
                continue
            self.test.logger.debug(log_format, value)
            setattr(bfd, packet_field, value)

    def create_packet(self):
        """ build Ether/[tunnel]/IP(v6)/UDP/BFD from the session state

        :returns: the assembled packet, with the SHA1 hash filled in when
                  the session has an auth key
        """
        if self.sha1_key:
            bfd = BFD(flags="A")
            bfd.auth_type = self.sha1_key.auth_type
            bfd.auth_len = BFD.sha1_auth_len
            bfd.auth_key_id = self.bfd_key_id
            bfd.auth_seq_num = self.our_seq_number
            bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
        else:
            bfd = BFD()
        # the frame travels from the "remote" side towards vpp
        packet = Ether(src=self.phy_interface.remote_mac,
                       dst=self.phy_interface.local_mac)
        if self.tunnel_header:
            packet = packet / self.tunnel_header
        if self.af == AF_INET6:
            l3 = IPv6(src=self.interface.remote_ip6,
                      dst=self.interface.local_ip6,
                      hlim=255)
        else:
            l3 = IP(src=self.interface.remote_ip4,
                    dst=self.interface.local_ip4,
                    ttl=255)
        packet = (packet / l3 /
                  UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
                  bfd)
        self.test.logger.debug("BFD: Creating packet")
        self.fill_packet_fields(packet)
        if self.sha1_key:
            # hash input: first 32 bytes of the BFD layer followed by the
            # key, zero-padded to 20 bytes
            padding = b"\0" * (20 - len(self.sha1_key.key))
            hash_material = (scapy.compat.raw(packet[BFD])[:32] +
                             self.sha1_key.key + padding)
            sha1 = hashlib.sha1(hash_material)
            self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
                                   sha1.hexdigest())
            packet[BFD].auth_key_hash = sha1.digest()
        return packet

    def send_packet(self, packet=None, interface=None):
        """ send a packet and start the packet generator

        :param packet: packet to send (created from session state if None)
        :param interface: interface to send on (phy_interface if None)
        """
        if packet is None:
            packet = self.create_packet()
        if interface is None:
            interface = self.phy_interface
        self.test.logger.debug(ppp("Sending packet:", packet))
        interface.add_stream(packet)
        self.test.pg_start()

    def verify_sha1_auth(self, packet):
        """ check auth section, sequence number progress and SHA1 hash """
        bfd = packet[BFD]
        self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
        self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
                               BFDAuthType)
        self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
        self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
        if self.vpp_seq_number is None:
            # first authenticated packet: just remember where vpp starts
            self.vpp_seq_number = bfd.auth_seq_num
            self.test.logger.debug("Received initial sequence number: %s" %
                                   self.vpp_seq_number)
        else:
            recvd_seq_num = bfd.auth_seq_num
            self.test.logger.debug("Received followup sequence number: %s" %
                                   recvd_seq_num)
            previous = self.vpp_seq_number
            meticulous = (self.sha1_key.auth_type ==
                          BFDAuthType.meticulous_keyed_sha1)
            if previous < 0xffffffff:
                if meticulous:
                    # meticulous mode: every packet increments by one
                    self.test.assert_equal(recvd_seq_num,
                                           previous + 1,
                                           "BFD sequence number")
                else:
                    # otherwise the number may stay or grow by one
                    self.test.assert_in_range(recvd_seq_num,
                                              previous,
                                              previous + 1,
                                              "BFD sequence number")
            elif meticulous:
                # previous value was the maximum, so the counter wraps
                self.test.assert_equal(recvd_seq_num, 0,
                                       "BFD sequence number")
            else:
                self.test.assertIn(recvd_seq_num, (previous, 0),
                                   "BFD sequence number not one of (%s, 0)" %
                                   previous)
            self.vpp_seq_number = recvd_seq_num
        # last 20 bytes represent the hash - so replace them with the key,
        # pad the result with zeros and hash the result
        padding = b"\0" * (20 - len(self.sha1_key.key))
        hash_material = bfd.original[:-20] + self.sha1_key.key + padding
        expected_hash = hashlib.sha1(hash_material).hexdigest()
        self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
                               expected_hash.encode(), "Auth key hash")

    def verify_bfd(self, packet):
        """ check version, your-discriminator and (if keyed) the auth """
        bfd = packet[BFD]
        self.test.assert_equal(bfd.version, 1, "BFD version")
        self.test.assert_equal(bfd.your_discriminator,
                               self.my_discriminator,
                               "BFD - your discriminator")
        if self.sha1_key:
            self.verify_sha1_auth(packet)
523
524
def bfd_session_up(test):
    """ Bring BFD session up

    Waits for a packet from vpp, answers with Init, expects the Up event
    and then confirms with an Up packet. Also records (and sanity checks)
    the offset between local time and the capture timestamp.
    """
    test.logger.info("BFD: Waiting for slow hello")
    hello = wait_for_bfd_packet(test, 2,
                                is_tunnel=test.vpp_session.is_tunnel)
    previous_offset = getattr(test, 'vpp_clock_offset', None)
    test.vpp_clock_offset = time.time() - float(hello.time)
    test.logger.debug("BFD: Calculated vpp clock offset: %s",
                      test.vpp_clock_offset)
    if previous_offset:
        # an earlier measurement exists - the two must roughly agree
        test.assertAlmostEqual(
            previous_offset, test.vpp_clock_offset, delta=0.5,
            msg="vpp clock offset not stable (new: %s, old: %s)" %
            (test.vpp_clock_offset, previous_offset))

    peer = test.test_session

    def advance_seq_if_meticulous():
        # meticulous keyed SHA1: bump our sequence number before each send
        key = peer.sha1_key
        if key and key.auth_type == BFDAuthType.meticulous_keyed_sha1:
            peer.inc_seq_num()

    test.logger.info("BFD: Sending Init")
    peer.update(my_discriminator=randint(0, 40000000),
                your_discriminator=hello[BFD].my_discriminator,
                state=BFDState.init)
    advance_seq_if_meticulous()
    peer.send_packet()
    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, event, expected_state=BFDState.up)
    test.logger.info("BFD: Session is Up")
    peer.update(state=BFDState.up)
    advance_seq_if_meticulous()
    peer.send_packet()
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
558
559
def bfd_session_down(test):
    """ Bring BFD session down

    Requires the vpp session to be Up, sends a Down packet from the test
    side and expects vpp to report (and end up in) the Down state.
    """
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
    peer = test.test_session
    peer.update(state=BFDState.down)
    key = peer.sha1_key
    # meticulous keyed SHA1: bump our sequence number before sending
    if key and key.auth_type == BFDAuthType.meticulous_keyed_sha1:
        peer.inc_seq_num()
    peer.send_packet()
    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, event, expected_state=BFDState.down)
    test.logger.info("BFD: Session is Down")
    test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
573
574
def verify_bfd_session_config(test, session, state=None):
    """ Verify that the session dumped by vpp matches the given session.

    :param test: test case providing assertIsNotNone/assert_equal
    :param session: session object providing get_bfd_udp_session_dump_entry
                    and the expected parameter values
    :param state: expected session state; the state is not checked when
                  this is None
    """
    dump = session.get_bfd_udp_session_dump_entry()
    test.assertIsNotNone(dump)
    # since dump is not none, we have verified that sw_if_index and addresses
    # are valid (in get_bfd_udp_session_dump_entry)
    # compare against None explicitly: a state whose numeric value is 0
    # is a legitimate expectation and must not be skipped as "falsy"
    if state is not None:
        test.assert_equal(dump.state, state, "session state")
    test.assert_equal(dump.required_min_rx, session.required_min_rx,
                      "required min rx interval")
    test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
                      "desired min tx interval")
    test.assert_equal(dump.detect_mult, session.detect_mult,
                      "detect multiplier")
    if session.sha1_key is None:
        test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
    else:
        # authenticated session: both key identifiers must match as well
        test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
        test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
                          "bfd key id")
        test.assert_equal(dump.conf_key_id,
                          session.sha1_key.conf_key_id,
                          "config key id")
597
598
def verify_ip(test, packet):
    """ Check hop limit/TTL and addresses of the packet's IP layer.

    The address family of test.vpp_session decides whether the IPv6 or
    the IPv4 layer is inspected; the packet must go from the interface's
    local address to its remote address with hop limit/TTL 255.
    """
    if test.vpp_session.af == AF_INET6:
        l3 = packet[IPv6]
        intf = test.vpp_session.interface
        expected_src, expected_dst = intf.local_ip6, intf.remote_ip6
        test.assert_equal(l3.hlim, 255, "IPv6 hop limit")
    else:
        l3 = packet[IP]
        intf = test.vpp_session.interface
        expected_src, expected_dst = intf.local_ip4, intf.remote_ip4
        test.assert_equal(l3.ttl, 255, "IPv4 TTL")
    test.assert_equal(l3.src, expected_src, "IP source address")
    test.assert_equal(l3.dst, expected_dst, "IP destination address")
613
614
def verify_udp(test, packet):
    """ Check the UDP ports: BFD destination port, source port in range. """
    udp_layer = packet[UDP]
    test.assert_equal(udp_layer.dport, BFD.udp_dport,
                      "UDP destination port")
    test.assert_in_range(udp_layer.sport,
                         BFD.udp_sport_min,
                         BFD.udp_sport_max,
                         "UDP source port")
621
622
def verify_event(test, event, expected_state):
    """ Verify correctness of event values.

    :param test: test case owning vpp_session and providing logger/asserts
    :param event: session event to check (callers pass the result of
                  waiting for "bfd_udp_session_details")
    :param expected_state: BFD state the event must report
    """
    e = event
    test.logger.debug("BFD: Event: %s" % moves.reprlib.repr(e))
    test.assert_equal(e.sw_if_index,
                      test.vpp_session.interface.sw_if_index,
                      "BFD interface index")

    # labels are address-family neutral: this helper is used for IPv4
    # sessions as well, so calling the addresses "IPv6" was misleading
    test.assert_equal(str(e.local_addr), test.vpp_session.local_addr,
                      "Local IP address")
    test.assert_equal(str(e.peer_addr), test.vpp_session.peer_addr,
                      "Peer IP address")
    test.assert_equal(e.state, expected_state, BFDState)
636
637
def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None, is_tunnel=False):
    """ wait for BFD packet and verify its correctness

    :param test: test case providing pg0, logger, asserts and test_session
    :param timeout: how long to wait
    :param pcap_time_min: ignore packets with pcap timestamp lower than this
    :param is_tunnel: when True, the outer IP header is stripped and the
                      verification runs on its payload

    :returns: the received packet (its IP payload when is_tunnel is set)
    :raises CaptureTimeoutError: when the deadline passes before an
                                 acceptable packet has been picked
    :raises Exception: when the packet has no BFD layer or the BFD layer
                       carries an unexpected payload
    """
    test.logger.info("BFD: Waiting for BFD packet")
    deadline = time.time() + timeout
    counter = 0
    while True:
        counter += 1
        # sanity check
        test.assert_in_range(counter, 0, 100, "number of packets ignored")
        time_left = deadline - time.time()
        if time_left < 0:
            raise CaptureTimeoutError("Packet did not arrive within timeout")
        p = test.pg0.wait_for_packet(timeout=time_left)
        test.logger.debug(ppp("BFD: Got packet:", p))
        if pcap_time_min is not None and p.time < pcap_time_min:
            test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
                                  "pcap time min %s):" %
                                  (p.time, pcap_time_min), p))
        else:
            break
    if is_tunnel:
        # strip an IP layer and move to the next
        p = p[IP].payload

    # getlayer() yields None for a missing layer, whereas indexing with
    # p[BFD] raises - so the None check below is actually reachable
    bfd = p.getlayer(BFD)
    if bfd is None:
        raise Exception(ppp("Unexpected or invalid BFD packet:", p))
    if bfd.payload:
        raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
    verify_ip(test, p)
    verify_udp(test, p)
    test.test_session.verify_bfd(p)
    return p
677
678
679 class BFD4TestCase(VppTestCase):
680     """Bidirectional Forwarding Detection (BFD)"""
681
682     pg0 = None
683     vpp_clock_offset = None
684     vpp_session = None
685     test_session = None
686
687     @classmethod
688     def setUpClass(cls):
689         super(BFD4TestCase, cls).setUpClass()
690         cls.vapi.cli("set log class bfd level debug")
691         try:
692             cls.create_pg_interfaces([0])
693             cls.create_loopback_interfaces(1)
694             cls.loopback0 = cls.lo_interfaces[0]
695             cls.loopback0.config_ip4()
696             cls.loopback0.admin_up()
697             cls.pg0.config_ip4()
698             cls.pg0.configure_ipv4_neighbors()
699             cls.pg0.admin_up()
700             cls.pg0.resolve_arp()
701
702         except Exception:
703             super(BFD4TestCase, cls).tearDownClass()
704             raise
705
706     @classmethod
707     def tearDownClass(cls):
708         super(BFD4TestCase, cls).tearDownClass()
709
710     def setUp(self):
711         super(BFD4TestCase, self).setUp()
712         self.factory = AuthKeyFactory()
713         self.vapi.want_bfd_events()
714         self.pg0.enable_capture()
715         try:
716             self.vpp_session = VppBFDUDPSession(self, self.pg0,
717                                                 self.pg0.remote_ip4)
718             self.vpp_session.add_vpp_config()
719             self.vpp_session.admin_up()
720             self.test_session = BFDTestSession(self, self.pg0, AF_INET)
721         except BaseException:
722             self.vapi.want_bfd_events(enable_disable=0)
723             raise
724
725     def tearDown(self):
726         if not self.vpp_dead:
727             self.vapi.want_bfd_events(enable_disable=0)
728         self.vapi.collect_events()  # clear the event queue
729         super(BFD4TestCase, self).tearDown()
730
731     def test_session_up(self):
732         """ bring BFD session up """
733         bfd_session_up(self)
734
735     def test_session_up_by_ip(self):
736         """ bring BFD session up - first frame looked up by address pair """
737         self.logger.info("BFD: Sending Slow control frame")
738         self.test_session.update(my_discriminator=randint(0, 40000000))
739         self.test_session.send_packet()
740         self.pg0.enable_capture()
741         p = self.pg0.wait_for_packet(1)
742         self.assert_equal(p[BFD].your_discriminator,
743                           self.test_session.my_discriminator,
744                           "BFD - your discriminator")
745         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
746         self.test_session.update(your_discriminator=p[BFD].my_discriminator,
747                                  state=BFDState.up)
748         self.logger.info("BFD: Waiting for event")
749         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
750         verify_event(self, e, expected_state=BFDState.init)
751         self.logger.info("BFD: Sending Up")
752         self.test_session.send_packet()
753         self.logger.info("BFD: Waiting for event")
754         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
755         verify_event(self, e, expected_state=BFDState.up)
756         self.logger.info("BFD: Session is Up")
757         self.test_session.update(state=BFDState.up)
758         self.test_session.send_packet()
759         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
760
761     def test_session_down(self):
762         """ bring BFD session down """
763         bfd_session_up(self)
764         bfd_session_down(self)
765
766     def test_hold_up(self):
767         """ hold BFD session up """
768         bfd_session_up(self)
769         for dummy in range(self.test_session.detect_mult * 2):
770             wait_for_bfd_packet(self)
771             self.test_session.send_packet()
772         self.assert_equal(len(self.vapi.collect_events()), 0,
773                           "number of bfd events")
774
775     def test_slow_timer(self):
776         """ verify slow periodic control frames while session down """
777         packet_count = 3
778         self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
779         prev_packet = wait_for_bfd_packet(self, 2)
780         for dummy in range(packet_count):
781             next_packet = wait_for_bfd_packet(self, 2)
782             time_diff = next_packet.time - prev_packet.time
783             # spec says the range should be <0.75, 1>, allow extra 0.05 margin
784             # to work around timing issues
785             self.assert_in_range(
786                 time_diff, 0.70, 1.05, "time between slow packets")
787             prev_packet = next_packet
788
789     def test_zero_remote_min_rx(self):
790         """ no packets when zero remote required min rx interval """
791         bfd_session_up(self)
792         self.test_session.update(required_min_rx=0)
793         self.test_session.send_packet()
794         for dummy in range(self.test_session.detect_mult):
795             self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
796                        "sleep before transmitting bfd packet")
797             self.test_session.send_packet()
798             try:
799                 p = wait_for_bfd_packet(self, timeout=0)
800                 self.logger.error(ppp("Received unexpected packet:", p))
801             except CaptureTimeoutError:
802                 pass
803         self.assert_equal(
804             len(self.vapi.collect_events()), 0, "number of bfd events")
805         self.test_session.update(required_min_rx=300000)
806         for dummy in range(3):
807             self.test_session.send_packet()
808             wait_for_bfd_packet(
809                 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
810         self.assert_equal(
811             len(self.vapi.collect_events()), 0, "number of bfd events")
812
813     def test_conn_down(self):
814         """ verify session goes down after inactivity """
815         bfd_session_up(self)
816         detection_time = self.test_session.detect_mult *\
817             self.vpp_session.required_min_rx / USEC_IN_SEC
818         self.sleep(detection_time, "waiting for BFD session time-out")
819         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
820         verify_event(self, e, expected_state=BFDState.down)
821
822     def test_large_required_min_rx(self):
823         """ large remote required min rx interval """
824         bfd_session_up(self)
825         p = wait_for_bfd_packet(self)
826         interval = 3000000
827         self.test_session.update(required_min_rx=interval)
828         self.test_session.send_packet()
829         time_mark = time.time()
830         count = 0
831         # busy wait here, trying to collect a packet or event, vpp is not
832         # allowed to send packets and the session will timeout first - so the
833         # Up->Down event must arrive before any packets do
834         while time.time() < time_mark + interval / USEC_IN_SEC:
835             try:
836                 p = wait_for_bfd_packet(self, timeout=0)
837                 # if vpp managed to send a packet before we did the session
838                 # session update, then that's fine, ignore it
839                 if p.time < time_mark - self.vpp_clock_offset:
840                     continue
841                 self.logger.error(ppp("Received unexpected packet:", p))
842                 count += 1
843             except CaptureTimeoutError:
844                 pass
845             events = self.vapi.collect_events()
846             if len(events) > 0:
847                 verify_event(self, events[0], BFDState.down)
848                 break
849         self.assert_equal(count, 0, "number of packets received")
850
851     def test_immediate_remote_min_rx_reduction(self):
852         """ immediately honor remote required min rx reduction """
853         self.vpp_session.remove_vpp_config()
854         self.vpp_session = VppBFDUDPSession(
855             self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
856         self.pg0.enable_capture()
857         self.vpp_session.add_vpp_config()
858         self.test_session.update(desired_min_tx=1000000,
859                                  required_min_rx=1000000)
860         bfd_session_up(self)
861         reference_packet = wait_for_bfd_packet(self)
862         time_mark = time.time()
863         interval = 300000
864         self.test_session.update(required_min_rx=interval)
865         self.test_session.send_packet()
866         extra_time = time.time() - time_mark
867         p = wait_for_bfd_packet(self)
868         # first packet is allowed to be late by time we spent doing the update
869         # calculated in extra_time
870         self.assert_in_range(p.time - reference_packet.time,
871                              .95 * 0.75 * interval / USEC_IN_SEC,
872                              1.05 * interval / USEC_IN_SEC + extra_time,
873                              "time between BFD packets")
874         reference_packet = p
875         for dummy in range(3):
876             p = wait_for_bfd_packet(self)
877             diff = p.time - reference_packet.time
878             self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
879                                  1.05 * interval / USEC_IN_SEC,
880                                  "time between BFD packets")
881             reference_packet = p
882
883     def test_modify_req_min_rx_double(self):
884         """ modify session - double required min rx """
885         bfd_session_up(self)
886         p = wait_for_bfd_packet(self)
887         self.test_session.update(desired_min_tx=10000,
888                                  required_min_rx=10000)
889         self.test_session.send_packet()
890         # double required min rx
891         self.vpp_session.modify_parameters(
892             required_min_rx=2 * self.vpp_session.required_min_rx)
893         p = wait_for_bfd_packet(
894             self, pcap_time_min=time.time() - self.vpp_clock_offset)
895         # poll bit needs to be set
896         self.assertIn("P", p.sprintf("%BFD.flags%"),
897                       "Poll bit not set in BFD packet")
898         # finish poll sequence with final packet
899         final = self.test_session.create_packet()
900         final[BFD].flags = "F"
901         timeout = self.test_session.detect_mult * \
902             max(self.test_session.desired_min_tx,
903                 self.vpp_session.required_min_rx) / USEC_IN_SEC
904         self.test_session.send_packet(final)
905         time_mark = time.time()
906         e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
907         verify_event(self, e, expected_state=BFDState.down)
908         time_to_event = time.time() - time_mark
909         self.assert_in_range(time_to_event, .9 * timeout,
910                              1.1 * timeout, "session timeout")
911
912     def test_modify_req_min_rx_halve(self):
913         """ modify session - halve required min rx """
914         self.vpp_session.modify_parameters(
915             required_min_rx=2 * self.vpp_session.required_min_rx)
916         bfd_session_up(self)
917         p = wait_for_bfd_packet(self)
918         self.test_session.update(desired_min_tx=10000,
919                                  required_min_rx=10000)
920         self.test_session.send_packet()
921         p = wait_for_bfd_packet(
922             self, pcap_time_min=time.time() - self.vpp_clock_offset)
923         # halve required min rx
924         old_required_min_rx = self.vpp_session.required_min_rx
925         self.vpp_session.modify_parameters(
926             required_min_rx=self.vpp_session.required_min_rx // 2)
927         # now we wait 0.8*3*old-req-min-rx and the session should still be up
928         self.sleep(0.8 * self.vpp_session.detect_mult *
929                    old_required_min_rx / USEC_IN_SEC,
930                    "wait before finishing poll sequence")
931         self.assert_equal(len(self.vapi.collect_events()), 0,
932                           "number of bfd events")
933         p = wait_for_bfd_packet(self)
934         # poll bit needs to be set
935         self.assertIn("P", p.sprintf("%BFD.flags%"),
936                       "Poll bit not set in BFD packet")
937         # finish poll sequence with final packet
938         final = self.test_session.create_packet()
939         final[BFD].flags = "F"
940         self.test_session.send_packet(final)
941         # now the session should time out under new conditions
942         detection_time = self.test_session.detect_mult *\
943             self.vpp_session.required_min_rx / USEC_IN_SEC
944         before = time.time()
945         e = self.vapi.wait_for_event(
946             2 * detection_time, "bfd_udp_session_details")
947         after = time.time()
948         self.assert_in_range(after - before,
949                              0.9 * detection_time,
950                              1.1 * detection_time,
951                              "time before bfd session goes down")
952         verify_event(self, e, expected_state=BFDState.down)
953
954     def test_modify_detect_mult(self):
955         """ modify detect multiplier """
956         bfd_session_up(self)
957         p = wait_for_bfd_packet(self)
958         self.vpp_session.modify_parameters(detect_mult=1)
959         p = wait_for_bfd_packet(
960             self, pcap_time_min=time.time() - self.vpp_clock_offset)
961         self.assert_equal(self.vpp_session.detect_mult,
962                           p[BFD].detect_mult,
963                           "detect mult")
964         # poll bit must not be set
965         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
966                          "Poll bit not set in BFD packet")
967         self.vpp_session.modify_parameters(detect_mult=10)
968         p = wait_for_bfd_packet(
969             self, pcap_time_min=time.time() - self.vpp_clock_offset)
970         self.assert_equal(self.vpp_session.detect_mult,
971                           p[BFD].detect_mult,
972                           "detect mult")
973         # poll bit must not be set
974         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
975                          "Poll bit not set in BFD packet")
976
    def test_queued_poll(self):
        """ test poll sequence queueing

        Changes VPP's required min rx twice in a row; the second change is
        made while the poll sequence triggered by the first one has not been
        answered yet. The test delays its Final reply for at least half a
        second, then checks that a second poll sequence is seen afterwards
        and that the Poll bit is cleared once that one is answered too.
        """
        bfd_session_up(self)
        p = wait_for_bfd_packet(self)
        # 1st parameter change - the next packet is expected to carry the
        # Poll bit (asserted below)
        self.vpp_session.modify_parameters(
            required_min_rx=2 * self.vpp_session.required_min_rx)
        p = wait_for_bfd_packet(self)
        poll_sequence_start = time.time()
        # minimum time (seconds) the 1st poll sequence is left unanswered
        poll_sequence_length_min = 0.5
        send_final_after = time.time() + poll_sequence_length_min
        # poll bit needs to be set
        self.assertIn("P", p.sprintf("%BFD.flags%"),
                      "Poll bit not set in BFD packet")
        self.assert_equal(p[BFD].required_min_rx_interval,
                          self.vpp_session.required_min_rx,
                          "BFD required min rx interval")
        # 2nd parameter change, made while the 1st poll is still pending
        self.vpp_session.modify_parameters(
            required_min_rx=2 * self.vpp_session.required_min_rx)
        # 2nd poll sequence should be queued now
        # don't send the reply back yet, wait for some time to emulate
        # longer round-trip time
        packet_count = 0
        while time.time() < send_final_after:
            # keep the session alive with ordinary (non-Final) packets
            self.test_session.send_packet()
            p = wait_for_bfd_packet(self)
            self.assert_equal(len(self.vapi.collect_events()), 0,
                              "number of bfd events")
            self.assert_equal(p[BFD].required_min_rx_interval,
                              self.vpp_session.required_min_rx,
                              "BFD required min rx interval")
            packet_count += 1
            # poll bit must be set
            self.assertIn("P", p.sprintf("%BFD.flags%"),
                          "Poll bit not set in BFD packet")
        final = self.test_session.create_packet()
        final[BFD].flags = "F"
        self.test_session.send_packet(final)
        # finish 1st with final
        poll_sequence_length = time.time() - poll_sequence_start
        # vpp must wait for some time before starting new poll sequence
        poll_no_2_started = False
        # look at up to twice as many packets as were seen during the 1st
        # poll sequence
        for dummy in range(2 * packet_count):
            p = wait_for_bfd_packet(self)
            self.assert_equal(len(self.vapi.collect_events()), 0,
                              "number of bfd events")
            if "P" in p.sprintf("%BFD.flags%"):
                poll_no_2_started = True
                # NOTE(review): poll_sequence_start + poll_sequence_length is
                # the clock reading taken right after the 1st Final was sent,
                # so this can only trigger if time.time() goes backwards -
                # confirm the intended threshold
                if time.time() < poll_sequence_start + poll_sequence_length:
                    raise Exception("VPP started 2nd poll sequence too soon")
                final = self.test_session.create_packet()
                final[BFD].flags = "F"
                self.test_session.send_packet(final)
                break
            else:
                self.test_session.send_packet()
        self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
        # finish 2nd with final
        # NOTE(review): a Final was already sent inside the loop above when
        # the 2nd poll was detected, so this is a second Final - confirm
        final = self.test_session.create_packet()
        final[BFD].flags = "F"
        self.test_session.send_packet(final)
        p = wait_for_bfd_packet(self)
        # poll bit must not be set
        self.assertNotIn("P", p.sprintf("%BFD.flags%"),
                         "Poll bit set in BFD packet")
1041
1042     def test_poll_response(self):
1043         """ test correct response to control frame with poll bit set """
1044         bfd_session_up(self)
1045         poll = self.test_session.create_packet()
1046         poll[BFD].flags = "P"
1047         self.test_session.send_packet(poll)
1048         final = wait_for_bfd_packet(
1049             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1050         self.assertIn("F", final.sprintf("%BFD.flags%"))
1051
1052     def test_no_periodic_if_remote_demand(self):
1053         """ no periodic frames outside poll sequence if remote demand set """
1054         bfd_session_up(self)
1055         demand = self.test_session.create_packet()
1056         demand[BFD].flags = "D"
1057         self.test_session.send_packet(demand)
1058         transmit_time = 0.9 \
1059             * max(self.vpp_session.required_min_rx,
1060                   self.test_session.desired_min_tx) \
1061             / USEC_IN_SEC
1062         count = 0
1063         for dummy in range(self.test_session.detect_mult * 2):
1064             self.sleep(transmit_time)
1065             self.test_session.send_packet(demand)
1066             try:
1067                 p = wait_for_bfd_packet(self, timeout=0)
1068                 self.logger.error(ppp("Received unexpected packet:", p))
1069                 count += 1
1070             except CaptureTimeoutError:
1071                 pass
1072         events = self.vapi.collect_events()
1073         for e in events:
1074             self.logger.error("Received unexpected event: %s", e)
1075         self.assert_equal(count, 0, "number of packets received")
1076         self.assert_equal(len(events), 0, "number of events received")
1077
1078     def test_echo_looped_back(self):
1079         """ echo packets looped back """
1080         # don't need a session in this case..
1081         self.vpp_session.remove_vpp_config()
1082         self.pg0.enable_capture()
1083         echo_packet_count = 10
1084         # random source port low enough to increment a few times..
1085         udp_sport_tx = randint(1, 50000)
1086         udp_sport_rx = udp_sport_tx
1087         echo_packet = (Ether(src=self.pg0.remote_mac,
1088                              dst=self.pg0.local_mac) /
1089                        IP(src=self.pg0.remote_ip4,
1090                           dst=self.pg0.remote_ip4) /
1091                        UDP(dport=BFD.udp_dport_echo) /
1092                        Raw("this should be looped back"))
1093         for dummy in range(echo_packet_count):
1094             self.sleep(.01, "delay between echo packets")
1095             echo_packet[UDP].sport = udp_sport_tx
1096             udp_sport_tx += 1
1097             self.logger.debug(ppp("Sending packet:", echo_packet))
1098             self.pg0.add_stream(echo_packet)
1099             self.pg_start()
1100         for dummy in range(echo_packet_count):
1101             p = self.pg0.wait_for_packet(1)
1102             self.logger.debug(ppp("Got packet:", p))
1103             ether = p[Ether]
1104             self.assert_equal(self.pg0.remote_mac,
1105                               ether.dst, "Destination MAC")
1106             self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1107             ip = p[IP]
1108             self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1109             self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1110             udp = p[UDP]
1111             self.assert_equal(udp.dport, BFD.udp_dport_echo,
1112                               "UDP destination port")
1113             self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1114             udp_sport_rx += 1
1115             # need to compare the hex payload here, otherwise BFD_vpp_echo
1116             # gets in way
1117             self.assertEqual(scapy.compat.raw(p[UDP].payload),
1118                              scapy.compat.raw(echo_packet[UDP].payload),
1119                              "Received packet is not the echo packet sent")
1120         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1121                           "ECHO packet identifier for test purposes)")
1122
    def test_echo(self):
        """ echo function

        The test side advertises a non-zero required min echo rx. While no
        echo source is configured in VPP, the control packets must keep
        advertising VPP's configured required min rx. Once loopback0 is set
        as the echo source, echo packets are expected on pg0; each one is
        checked and injected back into VPP, and control packets seen after
        the first echo must advertise a required min rx of at least 1
        second.
        """
        bfd_session_up(self)
        self.test_session.update(required_min_echo_rx=150000)
        self.test_session.send_packet()
        detection_time = self.test_session.detect_mult *\
            self.vpp_session.required_min_rx / USEC_IN_SEC
        # echo shouldn't work without echo source set
        for dummy in range(10):
            sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
            self.sleep(sleep, "delay before sending bfd packet")
            self.test_session.send_packet()
        # only consider a packet captured from now on
        p = wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.assert_equal(p[BFD].required_min_rx_interval,
                          self.vpp_session.required_min_rx,
                          "BFD required min rx interval")
        self.test_session.send_packet()
        self.vapi.bfd_udp_set_echo_source(
            sw_if_index=self.loopback0.sw_if_index)
        echo_seen = False
        # should be turned on - loopback echo packets
        for dummy in range(3):
            # process captured packets for 75% of the detection time, then
            # send a control packet (after the inner loop)
            loop_until = time.time() + 0.75 * detection_time
            while time.time() < loop_until:
                p = self.pg0.wait_for_packet(1)
                self.logger.debug(ppp("Got packet:", p))
                if p[UDP].dport == BFD.udp_dport_echo:
                    self.assert_equal(
                        p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
                    self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
                                        "BFD ECHO src IP equal to loopback IP")
                    self.logger.debug(ppp("Looping back packet:", p))
                    self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
                                      "ECHO packet destination MAC address")
                    # re-address the frame to VPP and inject it back
                    p[Ether].dst = self.pg0.local_mac
                    self.pg0.add_stream(p)
                    self.pg_start()
                    echo_seen = True
                elif p.haslayer(BFD):
                    if echo_seen:
                        # required min rx must be at least 1000000 once echo
                        # packets have been seen
                        self.assertGreaterEqual(
                            p[BFD].required_min_rx_interval,
                            1000000)
                    if "P" in p.sprintf("%BFD.flags%"):
                        # answer a poll with Final
                        final = self.test_session.create_packet()
                        final[BFD].flags = "F"
                        self.test_session.send_packet(final)
                else:
                    raise Exception(ppp("Received unknown packet:", p))

                self.assert_equal(len(self.vapi.collect_events()), 0,
                                  "number of bfd events")
            self.test_session.send_packet()
        self.assertTrue(echo_seen, "No echo packets received")
1178
1179     def test_echo_fail(self):
1180         """ session goes down if echo function fails """
1181         bfd_session_up(self)
1182         self.test_session.update(required_min_echo_rx=150000)
1183         self.test_session.send_packet()
1184         detection_time = self.test_session.detect_mult *\
1185             self.vpp_session.required_min_rx / USEC_IN_SEC
1186         self.vapi.bfd_udp_set_echo_source(
1187             sw_if_index=self.loopback0.sw_if_index)
1188         # echo function should be used now, but we will drop the echo packets
1189         verified_diag = False
1190         for dummy in range(3):
1191             loop_until = time.time() + 0.75 * detection_time
1192             while time.time() < loop_until:
1193                 p = self.pg0.wait_for_packet(1)
1194                 self.logger.debug(ppp("Got packet:", p))
1195                 if p[UDP].dport == BFD.udp_dport_echo:
1196                     # dropped
1197                     pass
1198                 elif p.haslayer(BFD):
1199                     if "P" in p.sprintf("%BFD.flags%"):
1200                         self.assertGreaterEqual(
1201                             p[BFD].required_min_rx_interval,
1202                             1000000)
1203                         final = self.test_session.create_packet()
1204                         final[BFD].flags = "F"
1205                         self.test_session.send_packet(final)
1206                     if p[BFD].state == BFDState.down:
1207                         self.assert_equal(p[BFD].diag,
1208                                           BFDDiagCode.echo_function_failed,
1209                                           BFDDiagCode)
1210                         verified_diag = True
1211                 else:
1212                     raise Exception(ppp("Received unknown packet:", p))
1213             self.test_session.send_packet()
1214         events = self.vapi.collect_events()
1215         self.assert_equal(len(events), 1, "number of bfd events")
1216         self.assert_equal(events[0].state, BFDState.down, BFDState)
1217         self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1218
1219     def test_echo_stop(self):
1220         """ echo function stops if peer sets required min echo rx zero """
1221         bfd_session_up(self)
1222         self.test_session.update(required_min_echo_rx=150000)
1223         self.test_session.send_packet()
1224         self.vapi.bfd_udp_set_echo_source(
1225             sw_if_index=self.loopback0.sw_if_index)
1226         # wait for first echo packet
1227         while True:
1228             p = self.pg0.wait_for_packet(1)
1229             self.logger.debug(ppp("Got packet:", p))
1230             if p[UDP].dport == BFD.udp_dport_echo:
1231                 self.logger.debug(ppp("Looping back packet:", p))
1232                 p[Ether].dst = self.pg0.local_mac
1233                 self.pg0.add_stream(p)
1234                 self.pg_start()
1235                 break
1236             elif p.haslayer(BFD):
1237                 # ignore BFD
1238                 pass
1239             else:
1240                 raise Exception(ppp("Received unknown packet:", p))
1241         self.test_session.update(required_min_echo_rx=0)
1242         self.test_session.send_packet()
1243         # echo packets shouldn't arrive anymore
1244         for dummy in range(5):
1245             wait_for_bfd_packet(
1246                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1247             self.test_session.send_packet()
1248             events = self.vapi.collect_events()
1249             self.assert_equal(len(events), 0, "number of bfd events")
1250
1251     def test_echo_source_removed(self):
1252         """ echo function stops if echo source is removed """
1253         bfd_session_up(self)
1254         self.test_session.update(required_min_echo_rx=150000)
1255         self.test_session.send_packet()
1256         self.vapi.bfd_udp_set_echo_source(
1257             sw_if_index=self.loopback0.sw_if_index)
1258         # wait for first echo packet
1259         while True:
1260             p = self.pg0.wait_for_packet(1)
1261             self.logger.debug(ppp("Got packet:", p))
1262             if p[UDP].dport == BFD.udp_dport_echo:
1263                 self.logger.debug(ppp("Looping back packet:", p))
1264                 p[Ether].dst = self.pg0.local_mac
1265                 self.pg0.add_stream(p)
1266                 self.pg_start()
1267                 break
1268             elif p.haslayer(BFD):
1269                 # ignore BFD
1270                 pass
1271             else:
1272                 raise Exception(ppp("Received unknown packet:", p))
1273         self.vapi.bfd_udp_del_echo_source()
1274         self.test_session.send_packet()
1275         # echo packets shouldn't arrive anymore
1276         for dummy in range(5):
1277             wait_for_bfd_packet(
1278                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1279             self.test_session.send_packet()
1280             events = self.vapi.collect_events()
1281             self.assert_equal(len(events), 0, "number of bfd events")
1282
1283     def test_stale_echo(self):
1284         """ stale echo packets don't keep a session up """
1285         bfd_session_up(self)
1286         self.test_session.update(required_min_echo_rx=150000)
1287         self.vapi.bfd_udp_set_echo_source(
1288             sw_if_index=self.loopback0.sw_if_index)
1289         self.test_session.send_packet()
1290         # should be turned on - loopback echo packets
1291         echo_packet = None
1292         timeout_at = None
1293         timeout_ok = False
1294         for dummy in range(10 * self.vpp_session.detect_mult):
1295             p = self.pg0.wait_for_packet(1)
1296             if p[UDP].dport == BFD.udp_dport_echo:
1297                 if echo_packet is None:
1298                     self.logger.debug(ppp("Got first echo packet:", p))
1299                     echo_packet = p
1300                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1301                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1302                 else:
1303                     self.logger.debug(ppp("Got followup echo packet:", p))
1304                 self.logger.debug(ppp("Looping back first echo packet:", p))
1305                 echo_packet[Ether].dst = self.pg0.local_mac
1306                 self.pg0.add_stream(echo_packet)
1307                 self.pg_start()
1308             elif p.haslayer(BFD):
1309                 self.logger.debug(ppp("Got packet:", p))
1310                 if "P" in p.sprintf("%BFD.flags%"):
1311                     final = self.test_session.create_packet()
1312                     final[BFD].flags = "F"
1313                     self.test_session.send_packet(final)
1314                 if p[BFD].state == BFDState.down:
1315                     self.assertIsNotNone(
1316                         timeout_at,
1317                         "Session went down before first echo packet received")
1318                     now = time.time()
1319                     self.assertGreaterEqual(
1320                         now, timeout_at,
1321                         "Session timeout at %s, but is expected at %s" %
1322                         (now, timeout_at))
1323                     self.assert_equal(p[BFD].diag,
1324                                       BFDDiagCode.echo_function_failed,
1325                                       BFDDiagCode)
1326                     events = self.vapi.collect_events()
1327                     self.assert_equal(len(events), 1, "number of bfd events")
1328                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1329                     timeout_ok = True
1330                     break
1331             else:
1332                 raise Exception(ppp("Received unknown packet:", p))
1333             self.test_session.send_packet()
1334         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1335
1336     def test_invalid_echo_checksum(self):
1337         """ echo packets with invalid checksum don't keep a session up """
1338         bfd_session_up(self)
1339         self.test_session.update(required_min_echo_rx=150000)
1340         self.vapi.bfd_udp_set_echo_source(
1341             sw_if_index=self.loopback0.sw_if_index)
1342         self.test_session.send_packet()
1343         # should be turned on - loopback echo packets
1344         timeout_at = None
1345         timeout_ok = False
1346         for dummy in range(10 * self.vpp_session.detect_mult):
1347             p = self.pg0.wait_for_packet(1)
1348             if p[UDP].dport == BFD.udp_dport_echo:
1349                 self.logger.debug(ppp("Got echo packet:", p))
1350                 if timeout_at is None:
1351                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1352                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1353                 p[BFD_vpp_echo].checksum = getrandbits(64)
1354                 p[Ether].dst = self.pg0.local_mac
1355                 self.logger.debug(ppp("Looping back modified echo packet:", p))
1356                 self.pg0.add_stream(p)
1357                 self.pg_start()
1358             elif p.haslayer(BFD):
1359                 self.logger.debug(ppp("Got packet:", p))
1360                 if "P" in p.sprintf("%BFD.flags%"):
1361                     final = self.test_session.create_packet()
1362                     final[BFD].flags = "F"
1363                     self.test_session.send_packet(final)
1364                 if p[BFD].state == BFDState.down:
1365                     self.assertIsNotNone(
1366                         timeout_at,
1367                         "Session went down before first echo packet received")
1368                     now = time.time()
1369                     self.assertGreaterEqual(
1370                         now, timeout_at,
1371                         "Session timeout at %s, but is expected at %s" %
1372                         (now, timeout_at))
1373                     self.assert_equal(p[BFD].diag,
1374                                       BFDDiagCode.echo_function_failed,
1375                                       BFDDiagCode)
1376                     events = self.vapi.collect_events()
1377                     self.assert_equal(len(events), 1, "number of bfd events")
1378                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1379                     timeout_ok = True
1380                     break
1381             else:
1382                 raise Exception(ppp("Received unknown packet:", p))
1383             self.test_session.send_packet()
1384         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1385
1386     def test_admin_up_down(self):
1387         """ put session admin-up and admin-down """
1388         bfd_session_up(self)
1389         self.vpp_session.admin_down()
1390         self.pg0.enable_capture()
1391         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1392         verify_event(self, e, expected_state=BFDState.admin_down)
1393         for dummy in range(2):
1394             p = wait_for_bfd_packet(self)
1395             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1396         # try to bring session up - shouldn't be possible
1397         self.test_session.update(state=BFDState.init)
1398         self.test_session.send_packet()
1399         for dummy in range(2):
1400             p = wait_for_bfd_packet(self)
1401             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1402         self.vpp_session.admin_up()
1403         self.test_session.update(state=BFDState.down)
1404         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1405         verify_event(self, e, expected_state=BFDState.down)
1406         p = wait_for_bfd_packet(
1407             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1408         self.assert_equal(p[BFD].state, BFDState.down, BFDState)
1409         self.test_session.send_packet()
1410         p = wait_for_bfd_packet(
1411             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1412         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1413         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1414         verify_event(self, e, expected_state=BFDState.init)
1415         self.test_session.update(state=BFDState.up)
1416         self.test_session.send_packet()
1417         p = wait_for_bfd_packet(
1418             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1419         self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1420         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1421         verify_event(self, e, expected_state=BFDState.up)
1422
1423     def test_config_change_remote_demand(self):
1424         """ configuration change while peer in demand mode """
1425         bfd_session_up(self)
1426         demand = self.test_session.create_packet()
1427         demand[BFD].flags = "D"
1428         self.test_session.send_packet(demand)
1429         self.vpp_session.modify_parameters(
1430             required_min_rx=2 * self.vpp_session.required_min_rx)
1431         p = wait_for_bfd_packet(
1432             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1433         # poll bit must be set
1434         self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1435         # terminate poll sequence
1436         final = self.test_session.create_packet()
1437         final[BFD].flags = "D+F"
1438         self.test_session.send_packet(final)
1439         # vpp should be quiet now again
1440         transmit_time = 0.9 \
1441             * max(self.vpp_session.required_min_rx,
1442                   self.test_session.desired_min_tx) \
1443             / USEC_IN_SEC
1444         count = 0
1445         for dummy in range(self.test_session.detect_mult * 2):
1446             self.sleep(transmit_time)
1447             self.test_session.send_packet(demand)
1448             try:
1449                 p = wait_for_bfd_packet(self, timeout=0)
1450                 self.logger.error(ppp("Received unexpected packet:", p))
1451                 count += 1
1452             except CaptureTimeoutError:
1453                 pass
1454         events = self.vapi.collect_events()
1455         for e in events:
1456             self.logger.error("Received unexpected event: %s", e)
1457         self.assert_equal(count, 0, "number of packets received")
1458         self.assert_equal(len(events), 0, "number of events received")
1459
1460     def test_intf_deleted(self):
1461         """ interface with bfd session deleted """
1462         intf = VppLoInterface(self)
1463         intf.config_ip4()
1464         intf.admin_up()
1465         sw_if_index = intf.sw_if_index
1466         vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip4)
1467         vpp_session.add_vpp_config()
1468         vpp_session.admin_up()
1469         intf.remove_vpp_config()
1470         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1471         self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1472         self.assertFalse(vpp_session.query_vpp_config())
1473
1474
class BFD6TestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (IPv6) """

    # class-level defaults; vpp_session and test_session are (re)assigned
    # for every test in setUp()
    pg0 = None
    vpp_clock_offset = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """ enable BFD debug logging and configure pg0 and a loopback
        interface for IPv6; the class is torn down again if that fails """
        super(BFD6TestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip6()
            cls.pg0.configure_ipv6_neighbors()
            cls.pg0.admin_up()
            cls.pg0.resolve_ndp()
            cls.create_loopback_interfaces(1)
            # loopback0 is used as the echo source in test_echo
            cls.loopback0 = cls.lo_interfaces[0]
            cls.loopback0.config_ip6()
            cls.loopback0.admin_up()

        except Exception:
            super(BFD6TestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """ delegate class teardown to the parent class """
        super(BFD6TestCase, cls).tearDownClass()

    def setUp(self):
        """ register for BFD events and create an admin-up IPv6 VPP session
        plus the matching test-side session on pg0 """
        super(BFD6TestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()
        try:
            self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                                self.pg0.remote_ip6,
                                                af=AF_INET6)
            self.vpp_session.add_vpp_config()
            self.vpp_session.admin_up()
            self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
            self.logger.debug(self.vapi.cli("show adj nbr"))
        except BaseException:
            # undo the event registration before propagating the failure
            self.vapi.want_bfd_events(enable_disable=0)
            raise

    def tearDown(self):
        """ unregister from BFD events (unless VPP died) and drain the
        event queue """
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFD6TestCase, self).tearDown()

    def test_session_up(self):
        """ bring BFD session up """
        bfd_session_up(self)

    def test_session_up_by_ip(self):
        """ bring BFD session up - first frame looked up by address pair """
        self.logger.info("BFD: Sending Slow control frame")
        # the first packet carries only our (random) discriminator
        self.test_session.update(my_discriminator=randint(0, 40000000))
        self.test_session.send_packet()
        self.pg0.enable_capture()
        p = self.pg0.wait_for_packet(1)
        # VPP must echo our discriminator back and move to init
        self.assert_equal(p[BFD].your_discriminator,
                          self.test_session.my_discriminator,
                          "BFD - your discriminator")
        self.assert_equal(p[BFD].state, BFDState.init, BFDState)
        self.test_session.update(your_discriminator=p[BFD].my_discriminator,
                                 state=BFDState.up)
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        verify_event(self, e, expected_state=BFDState.init)
        self.logger.info("BFD: Sending Up")
        self.test_session.send_packet()
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        verify_event(self, e, expected_state=BFDState.up)
        self.logger.info("BFD: Session is Up")
        self.test_session.update(state=BFDState.up)
        self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_hold_up(self):
        """ hold BFD session up """
        bfd_session_up(self)
        # answer every packet from VPP for two detection multipliers' worth
        # of packets; no state change may be reported meanwhile
        for dummy in range(self.test_session.detect_mult * 2):
            wait_for_bfd_packet(self)
            self.test_session.send_packet()
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_echo_looped_back(self):
        """ echo packets looped back """
        # don't need a session in this case..
        self.vpp_session.remove_vpp_config()
        self.pg0.enable_capture()
        echo_packet_count = 10
        # random source port low enough to increment a few times..
        udp_sport_tx = randint(1, 50000)
        udp_sport_rx = udp_sport_tx
        # source and destination IP are both the remote address
        echo_packet = (Ether(src=self.pg0.remote_mac,
                             dst=self.pg0.local_mac) /
                       IPv6(src=self.pg0.remote_ip6,
                            dst=self.pg0.remote_ip6) /
                       UDP(dport=BFD.udp_dport_echo) /
                       Raw("this should be looped back"))
        for dummy in range(echo_packet_count):
            self.sleep(.01, "delay between echo packets")
            # the source port doubles as a per-packet identifier
            echo_packet[UDP].sport = udp_sport_tx
            udp_sport_tx += 1
            self.logger.debug(ppp("Sending packet:", echo_packet))
            self.pg0.add_stream(echo_packet)
            self.pg_start()
        for dummy in range(echo_packet_count):
            p = self.pg0.wait_for_packet(1)
            self.logger.debug(ppp("Got packet:", p))
            # actual value first, expected value second - same argument
            # order as every other assert_equal call in this class
            ether = p[Ether]
            self.assert_equal(ether.dst, self.pg0.remote_mac,
                              "Destination MAC")
            self.assert_equal(ether.src, self.pg0.local_mac, "Source MAC")
            ip = p[IPv6]
            self.assert_equal(ip.dst, self.pg0.remote_ip6, "Destination IP")
            self.assert_equal(ip.src, self.pg0.remote_ip6, "Source IP")
            udp = p[UDP]
            self.assert_equal(udp.dport, BFD.udp_dport_echo,
                              "UDP destination port")
            self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
            udp_sport_rx += 1
            # need to compare the hex payload here, otherwise BFD_vpp_echo
            # gets in way
            self.assertEqual(scapy.compat.raw(p[UDP].payload),
                             scapy.compat.raw(echo_packet[UDP].payload),
                             "Received packet is not the echo packet sent")
        # both counters advanced once per packet, so they must match again
        self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
                          "ECHO packet identifier for test purposes)")

    def test_echo(self):
        """ echo function """
        bfd_session_up(self)
        self.test_session.update(required_min_echo_rx=150000)
        self.test_session.send_packet()
        detection_time = self.test_session.detect_mult *\
            self.vpp_session.required_min_rx / USEC_IN_SEC
        # echo shouldn't work without echo source set
        for dummy in range(10):
            sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
            self.sleep(sleep, "delay before sending bfd packet")
            self.test_session.send_packet()
        p = wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        # VPP still advertises its configured required min rx interval
        self.assert_equal(p[BFD].required_min_rx_interval,
                          self.vpp_session.required_min_rx,
                          "BFD required min rx interval")
        self.test_session.send_packet()
        self.vapi.bfd_udp_set_echo_source(
            sw_if_index=self.loopback0.sw_if_index)
        echo_seen = False
        # should be turned on - loopback echo packets
        for dummy in range(3):
            loop_until = time.time() + 0.75 * detection_time
            while time.time() < loop_until:
                p = self.pg0.wait_for_packet(1)
                self.logger.debug(ppp("Got packet:", p))
                if p[UDP].dport == BFD.udp_dport_echo:
                    self.assert_equal(
                        p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
                    self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
                                        "BFD ECHO src IP equal to loopback IP")
                    self.logger.debug(ppp("Looping back packet:", p))
                    self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
                                      "ECHO packet destination MAC address")
                    # hand the echo packet back to VPP
                    p[Ether].dst = self.pg0.local_mac
                    self.pg0.add_stream(p)
                    self.pg_start()
                    echo_seen = True
                elif p.haslayer(BFD):
                    if echo_seen:
                        # once echo is running, control packets must
                        # advertise at least 1000000 as required min rx
                        self.assertGreaterEqual(
                            p[BFD].required_min_rx_interval,
                            1000000)
                    if "P" in p.sprintf("%BFD.flags%"):
                        # answer a poll with the final flag
                        final = self.test_session.create_packet()
                        final[BFD].flags = "F"
                        self.test_session.send_packet(final)
                else:
                    raise Exception(ppp("Received unknown packet:", p))

                self.assert_equal(len(self.vapi.collect_events()), 0,
                                  "number of bfd events")
            self.test_session.send_packet()
        self.assertTrue(echo_seen, "No echo packets received")

    def test_intf_deleted(self):
        """ interface with bfd session deleted """
        intf = VppLoInterface(self)
        intf.config_ip6()
        intf.admin_up()
        # remember the index before the interface is removed
        sw_if_index = intf.sw_if_index
        vpp_session = VppBFDUDPSession(
            self, intf, intf.remote_ip6, af=AF_INET6)
        vpp_session.add_vpp_config()
        vpp_session.admin_up()
        intf.remove_vpp_config()
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
        self.assertFalse(vpp_session.query_vpp_config())
1686
1687
class BFDFIBTestCase(VppTestCase):
    """ BFD-FIB interactions (IPv6) """

    # both sessions are created inside the test itself
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """ delegate class setup to the parent class """
        super(BFDFIBTestCase, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        """ delegate class teardown to the parent class """
        super(BFDFIBTestCase, cls).tearDownClass()

    def setUp(self):
        """ create pg0, register for BFD events and configure IPv6 on all
        pg interfaces """
        super(BFDFIBTestCase, self).setUp()
        self.create_pg_interfaces(range(1))

        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

        for pg_if in self.pg_interfaces:
            pg_if.admin_up()
            pg_if.config_ip6()
            pg_if.configure_ipv6_neighbors()

    def tearDown(self):
        """ unregister from BFD events unless VPP died """
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=False)

        super(BFDFIBTestCase, self).tearDown()

    @staticmethod
    def pkt_is_not_data_traffic(p):
        """ not data traffic implies BFD or the usual IPv6 ND/RA"""
        return bool(p.haslayer(BFD) or is_ipv6_misc(p))

    def test_session_with_fib(self):
        """ BFD-FIB interactions """

        def data_packet(dst):
            """ build one data packet from pg0's remote side towards dst """
            return (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
                    IPv6(src="3001::1", dst=dst) /
                    UDP(sport=1234, dport=1234) /
                    Raw(b'\xa5' * 100))

        def expect_forwarded(packets):
            """ send packets and expect each of them back on pg0, in order,
            ignoring anything that is not data traffic """
            self.pg0.add_stream(packets)
            self.pg_start()
            for sent in packets:
                received = self.pg0.wait_for_packet(
                    1,
                    filter_out_fn=self.pkt_is_not_data_traffic)
                self.assertEqual(received[IPv6].dst,
                                 sent[IPv6].dst)

        def expect_dropped(packets):
            """ send packets and expect no data traffic to show up on pg0 """
            self.pg0.add_stream(packets)
            self.pg_start()
            with self.assertRaises(CaptureTimeoutError):
                self.pg0.wait_for_packet(1, self.pkt_is_not_data_traffic)

        # packets to match against both of the routes
        p = [data_packet("2001::1"), data_packet("2002::1")]

        # A recursive and a non-recursive route via a next-hop that
        # will have a BFD session
        routes = [
            VppIpRoute(self, prefix, 64,
                       [VppRoutePath(self.pg0.remote_ip6, out_sw_if_index)])
            for prefix, out_sw_if_index in (
                ("2001::", self.pg0.sw_if_index),
                ("2002::", 0xffffffff))]
        for route in routes:
            route.add_vpp_config()

        # bring the session up now the routes are present
        session = VppBFDUDPSession(self,
                                   self.pg0,
                                   self.pg0.remote_ip6,
                                   af=AF_INET6)
        self.vpp_session = session
        session.add_vpp_config()
        session.admin_up()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET6)

        # session is up - traffic passes
        bfd_session_up(self)
        expect_forwarded(p)

        # session is down - traffic is dropped
        bfd_session_down(self)
        expect_dropped(p)

        # session is up again - traffic passes
        bfd_session_up(self)
        expect_forwarded(p)
1791
1792
class BFDTunTestCase(VppTestCase):
    """ BFD over GRE tunnel """

    # both sessions are created inside the test itself
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """ delegate class setup to the parent class """
        super(BFDTunTestCase, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        """ delegate class teardown to the parent class """
        super(BFDTunTestCase, cls).tearDownClass()

    def setUp(self):
        """ create pg0, register for BFD events and configure IPv4 on all
        pg interfaces """
        super(BFDTunTestCase, self).setUp()
        self.create_pg_interfaces(range(1))

        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

        for pg_if in self.pg_interfaces:
            pg_if.admin_up()
            pg_if.config_ip4()
            pg_if.resolve_arp()

    def tearDown(self):
        """ unregister from BFD events unless VPP died """
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)

        super(BFDTunTestCase, self).tearDown()

    @staticmethod
    def pkt_is_not_data_traffic(p):
        """ not data traffic implies BFD or the usual IPv6 ND/RA"""
        return bool(p.haslayer(BFD) or is_ipv6_misc(p))

    def test_bfd_o_gre(self):
        """ BFD-o-GRE  """

        # A GRE interface over which to run a BFD session
        tunnel = VppGreInterface(self,
                                 self.pg0.local_ip4,
                                 self.pg0.remote_ip4)
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()

        # VPP side of the BFD session, running on the GRE interface
        session = VppBFDUDPSession(self,
                                   tunnel,
                                   tunnel.remote_ip4,
                                   is_tunnel=True)
        self.vpp_session = session
        session.add_vpp_config()
        session.admin_up()

        # test side: BFD packets are wrapped in IP/GRE and exchanged via pg0
        outer_header = (IP(src=self.pg0.remote_ip4,
                           dst=self.pg0.local_ip4) /
                        GRE())
        self.test_session = BFDTestSession(
            self, tunnel, AF_INET,
            tunnel_header=outer_header,
            phy_interface=self.pg0)

        # data packet addressed to the GRE interface's remote address
        data_packets = [
            (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(src=self.pg0.remote_ip4, dst=tunnel.remote_ip4) /
             UDP(sport=1234, dport=1234) /
             Raw(b'\xa5' * 100))]

        # session is up - traffic passes
        bfd_session_up(self)

        self.send_and_expect(self.pg0, data_packets, self.pg0)

        # bring session down
        bfd_session_down(self)
1871
1872
1873 class BFDSHA1TestCase(VppTestCase):
1874     """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1875
1876     pg0 = None
1877     vpp_clock_offset = None
1878     vpp_session = None
1879     test_session = None
1880
1881     @classmethod
1882     def setUpClass(cls):
1883         super(BFDSHA1TestCase, cls).setUpClass()
1884         cls.vapi.cli("set log class bfd level debug")
1885         try:
1886             cls.create_pg_interfaces([0])
1887             cls.pg0.config_ip4()
1888             cls.pg0.admin_up()
1889             cls.pg0.resolve_arp()
1890
1891         except Exception:
1892             super(BFDSHA1TestCase, cls).tearDownClass()
1893             raise
1894
1895     @classmethod
1896     def tearDownClass(cls):
1897         super(BFDSHA1TestCase, cls).tearDownClass()
1898
1899     def setUp(self):
1900         super(BFDSHA1TestCase, self).setUp()
1901         self.factory = AuthKeyFactory()
1902         self.vapi.want_bfd_events()
1903         self.pg0.enable_capture()
1904
1905     def tearDown(self):
1906         if not self.vpp_dead:
1907             self.vapi.want_bfd_events(enable_disable=False)
1908         self.vapi.collect_events()  # clear the event queue
1909         super(BFDSHA1TestCase, self).tearDown()
1910
1911     def test_session_up(self):
1912         """ bring BFD session up """
1913         key = self.factory.create_random_key(self)
1914         key.add_vpp_config()
1915         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1916                                             self.pg0.remote_ip4,
1917                                             sha1_key=key)
1918         self.vpp_session.add_vpp_config()
1919         self.vpp_session.admin_up()
1920         self.test_session = BFDTestSession(
1921             self, self.pg0, AF_INET, sha1_key=key,
1922             bfd_key_id=self.vpp_session.bfd_key_id)
1923         bfd_session_up(self)
1924
1925     def test_hold_up(self):
1926         """ hold BFD session up """
1927         key = self.factory.create_random_key(self)
1928         key.add_vpp_config()
1929         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1930                                             self.pg0.remote_ip4,
1931                                             sha1_key=key)
1932         self.vpp_session.add_vpp_config()
1933         self.vpp_session.admin_up()
1934         self.test_session = BFDTestSession(
1935             self, self.pg0, AF_INET, sha1_key=key,
1936             bfd_key_id=self.vpp_session.bfd_key_id)
1937         bfd_session_up(self)
1938         for dummy in range(self.test_session.detect_mult * 2):
1939             wait_for_bfd_packet(self)
1940             self.test_session.send_packet()
1941         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1942
1943     def test_hold_up_meticulous(self):
1944         """ hold BFD session up - meticulous auth """
1945         key = self.factory.create_random_key(
1946             self, BFDAuthType.meticulous_keyed_sha1)
1947         key.add_vpp_config()
1948         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1949                                             self.pg0.remote_ip4, sha1_key=key)
1950         self.vpp_session.add_vpp_config()
1951         self.vpp_session.admin_up()
1952         # specify sequence number so that it wraps
1953         self.test_session = BFDTestSession(
1954             self, self.pg0, AF_INET, sha1_key=key,
1955             bfd_key_id=self.vpp_session.bfd_key_id,
1956             our_seq_number=0xFFFFFFFF - 4)
1957         bfd_session_up(self)
1958         for dummy in range(30):
1959             wait_for_bfd_packet(self)
1960             self.test_session.inc_seq_num()
1961             self.test_session.send_packet()
1962         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1963
1964     def test_send_bad_seq_number(self):
1965         """ session is not kept alive by msgs with bad sequence numbers"""
1966         key = self.factory.create_random_key(
1967             self, BFDAuthType.meticulous_keyed_sha1)
1968         key.add_vpp_config()
1969         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1970                                             self.pg0.remote_ip4, sha1_key=key)
1971         self.vpp_session.add_vpp_config()
1972         self.test_session = BFDTestSession(
1973             self, self.pg0, AF_INET, sha1_key=key,
1974             bfd_key_id=self.vpp_session.bfd_key_id)
1975         bfd_session_up(self)
1976         detection_time = self.test_session.detect_mult *\
1977             self.vpp_session.required_min_rx / USEC_IN_SEC
1978         send_until = time.time() + 2 * detection_time
1979         while time.time() < send_until:
1980             self.test_session.send_packet()
1981             self.sleep(0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
1982                        "time between bfd packets")
1983         e = self.vapi.collect_events()
1984         # session should be down now, because the sequence numbers weren't
1985         # updated
1986         self.assert_equal(len(e), 1, "number of bfd events")
1987         verify_event(self, e[0], expected_state=BFDState.down)
1988
1989     def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
1990                                        legitimate_test_session,
1991                                        rogue_test_session,
1992                                        rogue_bfd_values=None):
1993         """ execute a rogue session interaction scenario
1994
1995         1. create vpp session, add config
1996         2. bring the legitimate session up
1997         3. copy the bfd values from legitimate session to rogue session
1998         4. apply rogue_bfd_values to rogue session
1999         5. set rogue session state to down
2000         6. send message to take the session down from the rogue session
2001         7. assert that the legitimate session is unaffected
2002         """
2003
2004         self.vpp_session = vpp_bfd_udp_session
2005         self.vpp_session.add_vpp_config()
2006         self.test_session = legitimate_test_session
2007         # bring vpp session up
2008         bfd_session_up(self)
2009         # send packet from rogue session
2010         rogue_test_session.update(
2011             my_discriminator=self.test_session.my_discriminator,
2012             your_discriminator=self.test_session.your_discriminator,
2013             desired_min_tx=self.test_session.desired_min_tx,
2014             required_min_rx=self.test_session.required_min_rx,
2015             detect_mult=self.test_session.detect_mult,
2016             diag=self.test_session.diag,
2017             state=self.test_session.state,
2018             auth_type=self.test_session.auth_type)
2019         if rogue_bfd_values:
2020             rogue_test_session.update(**rogue_bfd_values)
2021         rogue_test_session.update(state=BFDState.down)
2022         rogue_test_session.send_packet()
2023         wait_for_bfd_packet(self)
2024         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2025
2026     def test_mismatch_auth(self):
2027         """ session is not brought down by unauthenticated msg """
2028         key = self.factory.create_random_key(self)
2029         key.add_vpp_config()
2030         vpp_session = VppBFDUDPSession(
2031             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2032         legitimate_test_session = BFDTestSession(
2033             self, self.pg0, AF_INET, sha1_key=key,
2034             bfd_key_id=vpp_session.bfd_key_id)
2035         rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
2036         self.execute_rogue_session_scenario(vpp_session,
2037                                             legitimate_test_session,
2038                                             rogue_test_session)
2039
2040     def test_mismatch_bfd_key_id(self):
2041         """ session is not brought down by msg with non-existent key-id """
2042         key = self.factory.create_random_key(self)
2043         key.add_vpp_config()
2044         vpp_session = VppBFDUDPSession(
2045             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2046         # pick a different random bfd key id
2047         x = randint(0, 255)
2048         while x == vpp_session.bfd_key_id:
2049             x = randint(0, 255)
2050         legitimate_test_session = BFDTestSession(
2051             self, self.pg0, AF_INET, sha1_key=key,
2052             bfd_key_id=vpp_session.bfd_key_id)
2053         rogue_test_session = BFDTestSession(
2054             self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
2055         self.execute_rogue_session_scenario(vpp_session,
2056                                             legitimate_test_session,
2057                                             rogue_test_session)
2058
2059     def test_mismatched_auth_type(self):
2060         """ session is not brought down by msg with wrong auth type """
2061         key = self.factory.create_random_key(self)
2062         key.add_vpp_config()
2063         vpp_session = VppBFDUDPSession(
2064             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2065         legitimate_test_session = BFDTestSession(
2066             self, self.pg0, AF_INET, sha1_key=key,
2067             bfd_key_id=vpp_session.bfd_key_id)
2068         rogue_test_session = BFDTestSession(
2069             self, self.pg0, AF_INET, sha1_key=key,
2070             bfd_key_id=vpp_session.bfd_key_id)
2071         self.execute_rogue_session_scenario(
2072             vpp_session, legitimate_test_session, rogue_test_session,
2073             {'auth_type': BFDAuthType.keyed_md5})
2074
2075     def test_restart(self):
2076         """ simulate remote peer restart and resynchronization """
2077         key = self.factory.create_random_key(
2078             self, BFDAuthType.meticulous_keyed_sha1)
2079         key.add_vpp_config()
2080         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2081                                             self.pg0.remote_ip4, sha1_key=key)
2082         self.vpp_session.add_vpp_config()
2083         self.test_session = BFDTestSession(
2084             self, self.pg0, AF_INET, sha1_key=key,
2085             bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
2086         bfd_session_up(self)
2087         # don't send any packets for 2*detection_time
2088         detection_time = self.test_session.detect_mult *\
2089             self.vpp_session.required_min_rx / USEC_IN_SEC
2090         self.sleep(2 * detection_time, "simulating peer restart")
2091         events = self.vapi.collect_events()
2092         self.assert_equal(len(events), 1, "number of bfd events")
2093         verify_event(self, events[0], expected_state=BFDState.down)
2094         self.test_session.update(state=BFDState.down)
2095         # reset sequence number
2096         self.test_session.our_seq_number = 0
2097         self.test_session.vpp_seq_number = None
2098         # now throw away any pending packets
2099         self.pg0.enable_capture()
2100         bfd_session_up(self)
2101
2102
class BFDAuthOnOffTestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (changing auth) """

    # pg0 is created once per class in setUpClass(); the two session
    # attributes are assigned by each test before bfd_session_up() is called
    pg0 = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """ enable bfd debug logging and set up pg0 with IPv4 """
        super(BFDAuthOnOffTestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip4()
            cls.pg0.admin_up()
            cls.pg0.resolve_arp()

        except Exception:
            # run the class level cleanup explicitly before re-raising
            super(BFDAuthOnOffTestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """ delegate class level cleanup to the parent class """
        super(BFDAuthOnOffTestCase, cls).tearDownClass()

    def setUp(self):
        """ create a key factory, subscribe to bfd events, start capture """
        super(BFDAuthOnOffTestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

    def tearDown(self):
        """ unsubscribe from bfd events (if vpp is alive), drain events """
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=False)
        self.vapi.collect_events()  # clear the event queue
        super(BFDAuthOnOffTestCase, self).tearDown()

    def _keepalive_rounds(self, expect_up=True, inc_seq_num=False):
        """ exchange 2 * detect_mult packets with vpp

        Each round waits for one bfd packet from vpp and then sends one
        packet from the test session.

        :param expect_up: if true, assert that every packet received from
            vpp carries state up
        :param inc_seq_num: if true, call test_session.inc_seq_num() before
            each transmitted packet
        """
        for _ in range(self.test_session.detect_mult * 2):
            p = wait_for_bfd_packet(self)
            if expect_up:
                self.assert_equal(p[BFD].state, BFDState.up, BFDState)
            if inc_seq_num:
                self.test_session.inc_seq_num()
            self.test_session.send_packet()

    def _verify_session_undisturbed(self):
        """ assert the vpp session is up and no bfd event was generated """
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")

    def test_auth_on_immediate(self):
        """ turn auth on without disturbing session state (immediate) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        self._keepalive_rounds()
        # switch both sides to authenticated operation at once
        self.vpp_session.activate_auth(key)
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key
        self._keepalive_rounds()
        self._verify_session_undisturbed()

    def test_auth_off_immediate(self):
        """ turn auth off without disturbing session state (immediate) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._keepalive_rounds(inc_seq_num=True)
        # drop authentication on both sides at once
        self.vpp_session.deactivate_auth()
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self._keepalive_rounds(inc_seq_num=True)
        self._verify_session_undisturbed()

    def test_auth_change_key_immediate(self):
        """ change auth key without disturbing session state (immediate) """
        key1 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2 = self.factory.create_random_key(self)
        key2.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key1)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key1,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._keepalive_rounds()
        # move both sides from key1 to key2 at once
        self.vpp_session.activate_auth(key2)
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key2
        self._keepalive_rounds()
        self._verify_session_undisturbed()

    def test_auth_on_delayed(self):
        """ turn auth on without disturbing session state (delayed) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        # this first exchange does not check the state of received packets
        self._keepalive_rounds(expect_up=False)
        self.vpp_session.activate_auth(key, delayed=True)
        # the test session keeps sending unauthenticated packets for now
        self._keepalive_rounds()
        # now switch the test session to authenticated packets as well
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key
        self.test_session.send_packet()
        self._keepalive_rounds()
        self._verify_session_undisturbed()

    def test_auth_off_delayed(self):
        """ turn auth off without disturbing session state (delayed) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._keepalive_rounds()
        self.vpp_session.deactivate_auth(delayed=True)
        # the test session keeps sending authenticated packets for now
        self._keepalive_rounds()
        # now switch the test session to unauthenticated packets as well
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self.test_session.send_packet()
        self._keepalive_rounds()
        self._verify_session_undisturbed()

    def test_auth_change_key_delayed(self):
        """ change auth key without disturbing session state (delayed) """
        key1 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2 = self.factory.create_random_key(self)
        key2.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key1)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key1,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._keepalive_rounds()
        self.vpp_session.activate_auth(key2, delayed=True)
        # the test session keeps using key1 for now
        self._keepalive_rounds()
        # now switch the test session to key2 as well
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key2
        self.test_session.send_packet()
        self._keepalive_rounds()
        self._verify_session_undisturbed()
2313
2314
2315 class BFDCLITestCase(VppTestCase):
2316     """Bidirectional Forwarding Detection (BFD) (CLI) """
2317     pg0 = None
2318
    @classmethod
    def setUpClass(cls):
        """ enable bfd debug logging and set up pg0 with IPv4 and IPv6

        If any interface setup step raises, the parent tearDownClass() is
        invoked before the exception is re-raised.
        """
        super(BFDCLITestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces((0,))
            cls.pg0.config_ip4()
            cls.pg0.config_ip6()
            cls.pg0.resolve_arp()
            cls.pg0.resolve_ndp()

        except Exception:
            # run the class level cleanup explicitly before re-raising
            super(BFDCLITestCase, cls).tearDownClass()
            raise
2333
    @classmethod
    def tearDownClass(cls):
        """ delegate class level cleanup to the parent class """
        super(BFDCLITestCase, cls).tearDownClass()
2337
    def setUp(self):
        """ create a fresh auth key factory and start capturing on pg0 """
        super(BFDCLITestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.pg0.enable_capture()
2342
    def tearDown(self):
        """ unsubscribe from bfd events, drain the event queue, clean up

        An UnexpectedApiReturnValueError raised by the unsubscribe call is
        ignored on purpose.
        """
        try:
            self.vapi.want_bfd_events(enable_disable=False)
        except UnexpectedApiReturnValueError:
            # some tests aren't subscribed, so this is not an issue
            pass
        self.vapi.collect_events()  # clear the event queue
        super(BFDCLITestCase, self).tearDown()
2351
2352     def cli_verify_no_response(self, cli):
2353         """ execute a CLI, asserting that the response is empty """
2354         self.assert_equal(self.vapi.cli(cli),
2355                           "",
2356                           "CLI command response")
2357
2358     def cli_verify_response(self, cli, expected):
2359         """ execute a CLI, asserting that the response matches expectation """
2360         try:
2361             reply = self.vapi.cli(cli)
2362         except CliFailedCommandError as cli_error:
2363             reply = str(cli_error)
2364         self.assert_equal(reply.strip(),
2365                           expected,
2366                           "CLI command response")
2367
2368     def test_show(self):
2369         """ show commands """
2370         k1 = self.factory.create_random_key(self)
2371         k1.add_vpp_config()
2372         k2 = self.factory.create_random_key(
2373             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2374         k2.add_vpp_config()
2375         s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2376         s1.add_vpp_config()
2377         s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2378                               sha1_key=k2)
2379         s2.add_vpp_config()
2380         self.logger.info(self.vapi.ppcli("show bfd keys"))
2381         self.logger.info(self.vapi.ppcli("show bfd sessions"))
2382         self.logger.info(self.vapi.ppcli("show bfd"))
2383
2384     def test_set_del_sha1_key(self):
2385         """ set/delete SHA1 auth key """
2386         k = self.factory.create_random_key(self)
2387         self.registry.register(k, self.logger)
2388         self.cli_verify_no_response(
2389             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2390             (k.conf_key_id,
2391                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2392         self.assertTrue(k.query_vpp_config())
2393         self.vpp_session = VppBFDUDPSession(
2394             self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
2395         self.vpp_session.add_vpp_config()
2396         self.test_session = \
2397             BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
2398                            bfd_key_id=self.vpp_session.bfd_key_id)
2399         self.vapi.want_bfd_events()
2400         bfd_session_up(self)
2401         bfd_session_down(self)
2402         # try to replace the secret for the key - should fail because the key
2403         # is in-use
2404         k2 = self.factory.create_random_key(self)
2405         self.cli_verify_response(
2406             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2407             (k.conf_key_id,
2408                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2409             "bfd key set: `bfd_auth_set_key' API call failed, "
2410             "rv=-103:BFD object in use")
2411         # manipulating the session using old secret should still work
2412         bfd_session_up(self)
2413         bfd_session_down(self)
2414         self.vpp_session.remove_vpp_config()
2415         self.cli_verify_no_response(
2416             "bfd key del conf-key-id %s" % k.conf_key_id)
2417         self.assertFalse(k.query_vpp_config())
2418
2419     def test_set_del_meticulous_sha1_key(self):
2420         """ set/delete meticulous SHA1 auth key """
2421         k = self.factory.create_random_key(
2422             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2423         self.registry.register(k, self.logger)
2424         self.cli_verify_no_response(
2425             "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
2426             (k.conf_key_id,
2427                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2428         self.assertTrue(k.query_vpp_config())
2429         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2430                                             self.pg0.remote_ip6, af=AF_INET6,
2431                                             sha1_key=k)
2432         self.vpp_session.add_vpp_config()
2433         self.vpp_session.admin_up()
2434         self.test_session = \
2435             BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
2436                            bfd_key_id=self.vpp_session.bfd_key_id)
2437         self.vapi.want_bfd_events()
2438         bfd_session_up(self)
2439         bfd_session_down(self)
2440         # try to replace the secret for the key - should fail because the key
2441         # is in-use
2442         k2 = self.factory.create_random_key(self)
2443         self.cli_verify_response(
2444             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2445             (k.conf_key_id,
2446                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2447             "bfd key set: `bfd_auth_set_key' API call failed, "
2448             "rv=-103:BFD object in use")
2449         # manipulating the session using old secret should still work
2450         bfd_session_up(self)
2451         bfd_session_down(self)
2452         self.vpp_session.remove_vpp_config()
2453         self.cli_verify_no_response(
2454             "bfd key del conf-key-id %s" % k.conf_key_id)
2455         self.assertFalse(k.query_vpp_config())
2456
2457     def test_add_mod_del_bfd_udp(self):
2458         """ create/modify/delete IPv4 BFD UDP session """
2459         vpp_session = VppBFDUDPSession(
2460             self, self.pg0, self.pg0.remote_ip4)
2461         self.registry.register(vpp_session, self.logger)
2462         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2463             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2464             "detect-mult %s" % (self.pg0.name, self.pg0.local_ip4,
2465                                 self.pg0.remote_ip4,
2466                                 vpp_session.desired_min_tx,
2467                                 vpp_session.required_min_rx,
2468                                 vpp_session.detect_mult)
2469         self.cli_verify_no_response(cli_add_cmd)
2470         # 2nd add should fail
2471         self.cli_verify_response(
2472             cli_add_cmd,
2473             "bfd udp session add: `bfd_add_add_session' API call"
2474             " failed, rv=-101:Duplicate BFD object")
2475         verify_bfd_session_config(self, vpp_session)
2476         mod_session = VppBFDUDPSession(
2477             self, self.pg0, self.pg0.remote_ip4,
2478             required_min_rx=2 * vpp_session.required_min_rx,
2479             desired_min_tx=3 * vpp_session.desired_min_tx,
2480             detect_mult=4 * vpp_session.detect_mult)
2481         self.cli_verify_no_response(
2482             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2483             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2484             (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2485              mod_session.desired_min_tx, mod_session.required_min_rx,
2486              mod_session.detect_mult))
2487         verify_bfd_session_config(self, mod_session)
2488         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2489             "peer-addr %s" % (self.pg0.name,
2490                               self.pg0.local_ip4, self.pg0.remote_ip4)
2491         self.cli_verify_no_response(cli_del_cmd)
2492         # 2nd del is expected to fail
2493         self.cli_verify_response(
2494             cli_del_cmd, "bfd udp session del: `bfd_udp_del_session' API call"
2495             " failed, rv=-102:No such BFD object")
2496         self.assertFalse(vpp_session.query_vpp_config())
2497
2498     def test_add_mod_del_bfd_udp6(self):
2499         """ create/modify/delete IPv6 BFD UDP session """
2500         vpp_session = VppBFDUDPSession(
2501             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
2502         self.registry.register(vpp_session, self.logger)
2503         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2504             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2505             "detect-mult %s" % (self.pg0.name, self.pg0.local_ip6,
2506                                 self.pg0.remote_ip6,
2507                                 vpp_session.desired_min_tx,
2508                                 vpp_session.required_min_rx,
2509                                 vpp_session.detect_mult)
2510         self.cli_verify_no_response(cli_add_cmd)
2511         # 2nd add should fail
2512         self.cli_verify_response(
2513             cli_add_cmd,
2514             "bfd udp session add: `bfd_add_add_session' API call"
2515             " failed, rv=-101:Duplicate BFD object")
2516         verify_bfd_session_config(self, vpp_session)
2517         mod_session = VppBFDUDPSession(
2518             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2519             required_min_rx=2 * vpp_session.required_min_rx,
2520             desired_min_tx=3 * vpp_session.desired_min_tx,
2521             detect_mult=4 * vpp_session.detect_mult)
2522         self.cli_verify_no_response(
2523             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2524             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2525             (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2526              mod_session.desired_min_tx,
2527              mod_session.required_min_rx, mod_session.detect_mult))
2528         verify_bfd_session_config(self, mod_session)
2529         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2530             "peer-addr %s" % (self.pg0.name,
2531                               self.pg0.local_ip6, self.pg0.remote_ip6)
2532         self.cli_verify_no_response(cli_del_cmd)
2533         # 2nd del is expected to fail
2534         self.cli_verify_response(
2535             cli_del_cmd,
2536             "bfd udp session del: `bfd_udp_del_session' API call"
2537             " failed, rv=-102:No such BFD object")
2538         self.assertFalse(vpp_session.query_vpp_config())
2539
2540     def test_add_mod_del_bfd_udp_auth(self):
2541         """ create/modify/delete IPv4 BFD UDP session (authenticated) """
2542         key = self.factory.create_random_key(self)
2543         key.add_vpp_config()
2544         vpp_session = VppBFDUDPSession(
2545             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2546         self.registry.register(vpp_session, self.logger)
2547         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2548             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2549             "detect-mult %s conf-key-id %s bfd-key-id %s"\
2550             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2551                vpp_session.desired_min_tx, vpp_session.required_min_rx,
2552                vpp_session.detect_mult, key.conf_key_id,
2553                vpp_session.bfd_key_id)
2554         self.cli_verify_no_response(cli_add_cmd)
2555         # 2nd add should fail
2556         self.cli_verify_response(
2557             cli_add_cmd,
2558             "bfd udp session add: `bfd_add_add_session' API call"
2559             " failed, rv=-101:Duplicate BFD object")
2560         verify_bfd_session_config(self, vpp_session)
2561         mod_session = VppBFDUDPSession(
2562             self, self.pg0, self.pg0.remote_ip4, sha1_key=key,
2563             bfd_key_id=vpp_session.bfd_key_id,
2564             required_min_rx=2 * vpp_session.required_min_rx,
2565             desired_min_tx=3 * vpp_session.desired_min_tx,
2566             detect_mult=4 * vpp_session.detect_mult)
2567         self.cli_verify_no_response(
2568             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2569             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2570             (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2571              mod_session.desired_min_tx,
2572              mod_session.required_min_rx, mod_session.detect_mult))
2573         verify_bfd_session_config(self, mod_session)
2574         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2575             "peer-addr %s" % (self.pg0.name,
2576                               self.pg0.local_ip4, self.pg0.remote_ip4)
2577         self.cli_verify_no_response(cli_del_cmd)
2578         # 2nd del is expected to fail
2579         self.cli_verify_response(
2580             cli_del_cmd,
2581             "bfd udp session del: `bfd_udp_del_session' API call"
2582             " failed, rv=-102:No such BFD object")
2583         self.assertFalse(vpp_session.query_vpp_config())
2584
2585     def test_add_mod_del_bfd_udp6_auth(self):
2586         """ create/modify/delete IPv6 BFD UDP session (authenticated) """
2587         key = self.factory.create_random_key(
2588             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2589         key.add_vpp_config()
2590         vpp_session = VppBFDUDPSession(
2591             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key)
2592         self.registry.register(vpp_session, self.logger)
2593         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2594             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2595             "detect-mult %s conf-key-id %s bfd-key-id %s" \
2596             % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2597                vpp_session.desired_min_tx, vpp_session.required_min_rx,
2598                vpp_session.detect_mult, key.conf_key_id,
2599                vpp_session.bfd_key_id)
2600         self.cli_verify_no_response(cli_add_cmd)
2601         # 2nd add should fail
2602         self.cli_verify_response(
2603             cli_add_cmd,
2604             "bfd udp session add: `bfd_add_add_session' API call"
2605             " failed, rv=-101:Duplicate BFD object")
2606         verify_bfd_session_config(self, vpp_session)
2607         mod_session = VppBFDUDPSession(
2608             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key,
2609             bfd_key_id=vpp_session.bfd_key_id,
2610             required_min_rx=2 * vpp_session.required_min_rx,
2611             desired_min_tx=3 * vpp_session.desired_min_tx,
2612             detect_mult=4 * vpp_session.detect_mult)
2613         self.cli_verify_no_response(
2614             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2615             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2616             (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2617              mod_session.desired_min_tx,
2618              mod_session.required_min_rx, mod_session.detect_mult))
2619         verify_bfd_session_config(self, mod_session)
2620         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2621             "peer-addr %s" % (self.pg0.name,
2622                               self.pg0.local_ip6, self.pg0.remote_ip6)
2623         self.cli_verify_no_response(cli_del_cmd)
2624         # 2nd del is expected to fail
2625         self.cli_verify_response(
2626             cli_del_cmd,
2627             "bfd udp session del: `bfd_udp_del_session' API call"
2628             " failed, rv=-102:No such BFD object")
2629         self.assertFalse(vpp_session.query_vpp_config())
2630
2631     def test_auth_on_off(self):
2632         """ turn authentication on and off """
2633         key = self.factory.create_random_key(
2634             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2635         key.add_vpp_config()
2636         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2637         auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2638                                         sha1_key=key)
2639         session.add_vpp_config()
2640         cli_activate = \
2641             "bfd udp session auth activate interface %s local-addr %s "\
2642             "peer-addr %s conf-key-id %s bfd-key-id %s"\
2643             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2644                key.conf_key_id, auth_session.bfd_key_id)
2645         self.cli_verify_no_response(cli_activate)
2646         verify_bfd_session_config(self, auth_session)
2647         self.cli_verify_no_response(cli_activate)
2648         verify_bfd_session_config(self, auth_session)
2649         cli_deactivate = \
2650             "bfd udp session auth deactivate interface %s local-addr %s "\
2651             "peer-addr %s "\
2652             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2653         self.cli_verify_no_response(cli_deactivate)
2654         verify_bfd_session_config(self, session)
2655         self.cli_verify_no_response(cli_deactivate)
2656         verify_bfd_session_config(self, session)
2657
2658     def test_auth_on_off_delayed(self):
2659         """ turn authentication on and off (delayed) """
2660         key = self.factory.create_random_key(
2661             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2662         key.add_vpp_config()
2663         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2664         auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2665                                         sha1_key=key)
2666         session.add_vpp_config()
2667         cli_activate = \
2668             "bfd udp session auth activate interface %s local-addr %s "\
2669             "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"\
2670             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2671                key.conf_key_id, auth_session.bfd_key_id)
2672         self.cli_verify_no_response(cli_activate)
2673         verify_bfd_session_config(self, auth_session)
2674         self.cli_verify_no_response(cli_activate)
2675         verify_bfd_session_config(self, auth_session)
2676         cli_deactivate = \
2677             "bfd udp session auth deactivate interface %s local-addr %s "\
2678             "peer-addr %s delayed yes"\
2679             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2680         self.cli_verify_no_response(cli_deactivate)
2681         verify_bfd_session_config(self, session)
2682         self.cli_verify_no_response(cli_deactivate)
2683         verify_bfd_session_config(self, session)
2684
2685     def test_admin_up_down(self):
2686         """ put session admin-up and admin-down """
2687         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2688         session.add_vpp_config()
2689         cli_down = \
2690             "bfd udp session set-flags admin down interface %s local-addr %s "\
2691             "peer-addr %s "\
2692             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2693         cli_up = \
2694             "bfd udp session set-flags admin up interface %s local-addr %s "\
2695             "peer-addr %s "\
2696             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2697         self.cli_verify_no_response(cli_down)
2698         verify_bfd_session_config(self, session, state=BFDState.admin_down)
2699         self.cli_verify_no_response(cli_up)
2700         verify_bfd_session_config(self, session, state=BFDState.down)
2701
2702     def test_set_del_udp_echo_source(self):
2703         """ set/del udp echo source """
2704         self.create_loopback_interfaces(1)
2705         self.loopback0 = self.lo_interfaces[0]
2706         self.loopback0.admin_up()
2707         self.cli_verify_response("show bfd echo-source",
2708                                  "UDP echo source is not set.")
2709         cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
2710         self.cli_verify_no_response(cli_set)
2711         self.cli_verify_response("show bfd echo-source",
2712                                  "UDP echo source is: %s\n"
2713                                  "IPv4 address usable as echo source: none\n"
2714                                  "IPv6 address usable as echo source: none" %
2715                                  self.loopback0.name)
2716         self.loopback0.config_ip4()
2717         unpacked = unpack("!L", self.loopback0.local_ip4n)
2718         echo_ip4 = inet_ntop(AF_INET, pack("!L", unpacked[0] ^ 1))
2719         self.cli_verify_response("show bfd echo-source",
2720                                  "UDP echo source is: %s\n"
2721                                  "IPv4 address usable as echo source: %s\n"
2722                                  "IPv6 address usable as echo source: none" %
2723                                  (self.loopback0.name, echo_ip4))
2724         unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
2725         echo_ip6 = inet_ntop(AF_INET6, pack("!LLLL", unpacked[0], unpacked[1],
2726                                             unpacked[2], unpacked[3] ^ 1))
2727         self.loopback0.config_ip6()
2728         self.cli_verify_response("show bfd echo-source",
2729                                  "UDP echo source is: %s\n"
2730                                  "IPv4 address usable as echo source: %s\n"
2731                                  "IPv6 address usable as echo source: %s" %
2732                                  (self.loopback0.name, echo_ip4, echo_ip6))
2733         cli_del = "bfd udp echo-source del"
2734         self.cli_verify_no_response(cli_del)
2735         self.cli_verify_response("show bfd echo-source",
2736                                  "UDP echo source is not set.")
2737
2738
# Run the BFD tests through the VPP test runner when executed as a script.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)