tests: move bfd over gre to extended tests
[vpp.git] / test / test_bfd.py
1 #!/usr/bin/env python3
2 """ BFD tests """
3
4 from __future__ import division
5
6 import binascii
7 import hashlib
8 import time
9 import unittest
10 from random import randint, shuffle, getrandbits
11 from socket import AF_INET, AF_INET6, inet_ntop
12 from struct import pack, unpack
13
14 from six import moves
15 import scapy.compat
16 from scapy.layers.inet import UDP, IP
17 from scapy.layers.inet6 import IPv6
18 from scapy.layers.l2 import Ether, GRE
19 from scapy.packet import Raw
20
21 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
22     BFDDiagCode, BFDState, BFD_vpp_echo
23 from framework import VppTestCase, VppTestRunner, running_extended_tests
24 from util import ppp
25 from vpp_ip import DpoProto
26 from vpp_ip_route import VppIpRoute, VppRoutePath
27 from vpp_lo_interface import VppLoInterface
28 from vpp_papi_provider import UnexpectedApiReturnValueError, \
29     CliFailedCommandError
30 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
31 from vpp_gre_interface import VppGreInterface
32 from vpp_papi import VppEnum
33
34 USEC_IN_SEC = 1000000
35
36
class AuthKeyFactory(object):
    """Factory class for creating auth keys with unique conf key ID"""

    def __init__(self):
        # conf key IDs already handed out by this factory instance
        self._conf_key_ids = set()

    def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
        """ create a random key with unique conf key id

        :param test: test case object passed through to VppBFDAuthKey
        :param auth_type: authentication type of the created key
        :returns: VppBFDAuthKey holding 1-20 random key bytes and a conf
                  key ID this factory has not returned before
        """
        conf_key_id = randint(0, 0xFFFFFFFF)
        # redraw until the ID does not clash with a previously issued one
        while conf_key_id in self._conf_key_ids:
            conf_key_id = randint(0, 0xFFFFFFFF)
        self._conf_key_ids.add(conf_key_id)
        key = scapy.compat.raw(
            bytearray([randint(0, 255) for _ in range(randint(1, 20))]))
        return VppBFDAuthKey(test=test, auth_type=auth_type,
                             conf_key_id=conf_key_id, key=key)
53
54
class BFDAPITestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) - API"""

    pg0 = None
    pg1 = None

    @classmethod
    def setUpClass(cls):
        super(BFDAPITestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces(range(2))
            for i in cls.pg_interfaces:
                i.config_ip4()
                i.config_ip6()
                i.resolve_arp()

        except Exception:
            # undo the parent class setup before propagating the failure
            super(BFDAPITestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        super(BFDAPITestCase, cls).tearDownClass()

    def setUp(self):
        super(BFDAPITestCase, self).setUp()
        # a fresh factory per test; it guarantees distinct conf key IDs
        # among the keys it creates
        self.factory = AuthKeyFactory()

    def _cycle_session_config(self, session):
        """ add the session to VPP and remove it again, two times in a row

        :param session: session object to add/remove
        """
        for _ in range(2):
            session.add_vpp_config()
            self.logger.debug("Session state is %s", session.state)
            session.remove_vpp_config()

    def _verify_session_dump(self, session):
        """ compare timing parameters of the session with its VPP dump

        :param session: session object whose dump entry is checked
        """
        s = session.get_bfd_udp_session_dump_entry()
        self.assert_equal(session.desired_min_tx,
                          s.desired_min_tx,
                          "desired min transmit interval")
        self.assert_equal(session.required_min_rx,
                          s.required_min_rx,
                          "required min receive interval")
        self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")

    def _verify_keys_configured(self, keys, configured):
        """ assert that query_vpp_config() of every key matches `configured`

        :param keys: keys to query
        :param configured: True if all keys are expected to be present,
                           False if all are expected to be absent
        """
        check = self.assertTrue if configured else self.assertFalse
        for key in keys:
            check(key.query_vpp_config())

    def _verify_echo_source(self, sw_if_index=None, ip4=None, ip6=None):
        """ fetch the UDP echo source from VPP and verify its contents

        :param sw_if_index: expected interface index, None if the echo
                            source is expected to be unset
        :param ip4: expected packed IPv4 echo address, None if no usable
                    IPv4 address is expected
        :param ip6: expected packed IPv6 echo address, None if no usable
                    IPv6 address is expected
        """
        echo_source = self.vapi.bfd_udp_get_echo_source()
        if sw_if_index is None:
            self.assertFalse(echo_source.is_set)
        else:
            self.assertTrue(echo_source.is_set)
            self.assertEqual(echo_source.sw_if_index, sw_if_index)
        if ip4 is None:
            self.assertFalse(echo_source.have_usable_ip4)
        else:
            self.assertTrue(echo_source.have_usable_ip4)
            self.assertEqual(echo_source.ip4_addr.packed, ip4)
        if ip6 is None:
            self.assertFalse(echo_source.have_usable_ip6)
        else:
            self.assertTrue(echo_source.have_usable_ip6)
            self.assertEqual(echo_source.ip6_addr.packed, ip6)

    def test_add_bfd(self):
        """ create a BFD session """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        self._cycle_session_config(session)

    def test_double_add(self):
        """ create the same BFD session twice (negative case) """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()

        with self.vapi.assert_negative_api_retval():
            session.add_vpp_config()

        session.remove_vpp_config()

    def test_add_bfd6(self):
        """ create IPv6 BFD session """
        session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
        self._cycle_session_config(session)

    def test_mod_bfd(self):
        """ modify BFD session parameters """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   desired_min_tx=50000,
                                   required_min_rx=10000,
                                   detect_mult=1)
        session.add_vpp_config()
        self._verify_session_dump(session)
        session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
                                  required_min_rx=session.required_min_rx * 2,
                                  detect_mult=session.detect_mult * 2)
        self._verify_session_dump(session)

    def test_add_sha1_keys(self):
        """ add SHA1 keys """
        key_count = 10
        keys = [self.factory.create_random_key(self)
                for _ in range(key_count)]
        self._verify_keys_configured(keys, False)
        for key in keys:
            key.add_vpp_config()
        self._verify_keys_configured(keys, True)
        # remove randomly
        indexes = list(range(key_count))
        shuffle(indexes)
        removed = set()
        for i in indexes:
            keys[i].remove_vpp_config()
            removed.add(i)
            # after each removal only the removed keys may be gone
            for j, key in enumerate(keys):
                if j in removed:
                    self.assertFalse(key.query_vpp_config())
                else:
                    self.assertTrue(key.query_vpp_config())
        # should be removed now
        self._verify_keys_configured(keys, False)
        # add back and remove again
        for key in keys:
            key.add_vpp_config()
        self._verify_keys_configured(keys, True)
        for key in keys:
            key.remove_vpp_config()
        self._verify_keys_configured(keys, False)

    def test_add_bfd_sha1(self):
        """ create a BFD session (SHA1) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=key)
        self._cycle_session_config(session)

    def test_double_add_sha1(self):
        """ create the same BFD session twice (negative case) (SHA1) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=key)
        session.add_vpp_config()
        with self.assertRaises(Exception):
            session.add_vpp_config()

    def test_add_auth_nonexistent_key(self):
        """ create BFD session using non-existent SHA1 (negative case) """
        # the key is created locally but never added to VPP
        session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4,
            sha1_key=self.factory.create_random_key(self))
        with self.assertRaises(Exception):
            session.add_vpp_config()

    def test_shared_sha1_key(self):
        """ share single SHA1 key between multiple BFD sessions """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        sessions = [
            VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                             sha1_key=key),
            VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
                             sha1_key=key, af=AF_INET6),
            VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
                             sha1_key=key),
            VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
                             sha1_key=key, af=AF_INET6)]
        for s in sessions:
            s.add_vpp_config()
        # the use count must drop by one with every removed session
        for removed, s in enumerate(sessions):
            e = key.get_bfd_auth_keys_dump_entry()
            self.assert_equal(e.use_count, len(sessions) - removed,
                              "Use count for shared key")
            s.remove_vpp_config()
        e = key.get_bfd_auth_keys_dump_entry()
        self.assert_equal(e.use_count, 0, "Use count for shared key")

    def test_activate_auth(self):
        """ activate SHA1 authentication """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        session.activate_auth(key)

    def test_deactivate_auth(self):
        """ deactivate SHA1 authentication """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        session.activate_auth(key)
        session.deactivate_auth()

    def test_change_key(self):
        """ change SHA1 key """
        # both keys come from the same factory, which never hands out the
        # same conf key ID twice - no extra uniqueness check is needed
        key1 = self.factory.create_random_key(self)
        key2 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=key1)
        session.add_vpp_config()
        session.activate_auth(key2)

    def test_set_del_udp_echo_source(self):
        """ set/del udp echo source """
        self.create_loopback_interfaces(1)
        self.loopback0 = self.lo_interfaces[0]
        self.loopback0.admin_up()
        # nothing configured yet
        self._verify_echo_source()

        # echo source set, but the loopback has no addresses so far
        self.vapi.bfd_udp_set_echo_source(
            sw_if_index=self.loopback0.sw_if_index)
        self._verify_echo_source(sw_if_index=self.loopback0.sw_if_index)

        # expected IPv4 echo address: interface address with the lowest
        # bit flipped
        self.loopback0.config_ip4()
        unpacked = unpack("!L", self.loopback0.local_ip4n)
        echo_ip4 = pack("!L", unpacked[0] ^ 1)
        self._verify_echo_source(sw_if_index=self.loopback0.sw_if_index,
                                 ip4=echo_ip4)

        # expected IPv6 echo address: lowest bit of the last 32-bit word
        # flipped
        self.loopback0.config_ip6()
        unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
        echo_ip6 = pack("!LLLL", unpacked[0], unpacked[1], unpacked[2],
                        unpacked[3] ^ 1)
        self._verify_echo_source(sw_if_index=self.loopback0.sw_if_index,
                                 ip4=echo_ip4, ip6=echo_ip6)

        # back to the unset state after deletion
        self.vapi.bfd_udp_del_echo_source()
        self._verify_echo_source()
312
313
class BFDTestSession(object):
    """ BFD session as seen from test framework side """

    def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
                 bfd_key_id=None, our_seq_number=None,
                 tunnel_header=None, phy_interface=None):
        """ store session parameters and initialise the local BFD state

        :param test: test case providing logger, asserts and pg_start()
        :param interface: interface whose addresses are used in IP headers
        :param af: address family, AF_INET6 selects IPv6, anything else
                   IPv4
        :param detect_mult: detect multiplier put into created packets
        :param sha1_key: authentication key, None for no authentication
        :param bfd_key_id: key ID put into the authentication section
        :param our_seq_number: initial sequence number, random if None
        :param tunnel_header: optional layer(s) inserted right after the
                              ethernet header
        :param phy_interface: interface used for MAC addresses and for
                              sending, defaults to `interface`
        """
        self.test = test
        self.af = af
        self.tunnel_header = tunnel_header
        self.sha1_key = sha1_key
        self.bfd_key_id = bfd_key_id
        self.interface = interface
        self.phy_interface = phy_interface if phy_interface else interface
        self.udp_sport = randint(49152, 65535)
        self.our_seq_number = (randint(0, 40000000)
                               if our_seq_number is None
                               else our_seq_number)
        # last sequence number seen from VPP, unknown until first packet
        self.vpp_seq_number = None
        self.my_discriminator = 0
        self.your_discriminator = None
        self.desired_min_tx = 300000
        self.required_min_rx = 300000
        self.required_min_echo_rx = None
        self.detect_mult = detect_mult
        self.diag = BFDDiagCode.no_diagnostic
        self.state = BFDState.down
        self.auth_type = BFDAuthType.no_auth

    def inc_seq_num(self):
        """ increment sequence number, wrapping if needed """
        at_maximum = self.our_seq_number == 0xFFFFFFFF
        self.our_seq_number = 0 if at_maximum else self.our_seq_number + 1

    def update(self, my_discriminator=None, your_discriminator=None,
               desired_min_tx=None, required_min_rx=None,
               required_min_echo_rx=None, detect_mult=None,
               diag=None, state=None, auth_type=None):
        """ update BFD parameters associated with session

        Only arguments which are not None overwrite the stored value.
        """
        requested = (
            ("my_discriminator", my_discriminator),
            ("your_discriminator", your_discriminator),
            ("required_min_rx", required_min_rx),
            ("required_min_echo_rx", required_min_echo_rx),
            ("desired_min_tx", desired_min_tx),
            ("detect_mult", detect_mult),
            ("diag", diag),
            ("state", state),
            ("auth_type", auth_type),
        )
        for attribute, value in requested:
            if value is not None:
                setattr(self, attribute, value)

    def fill_packet_fields(self, packet):
        """ set packet fields with known values in packet

        Only session values which are truthy are copied into the BFD
        layer; everything else keeps whatever the layer already holds.
        """
        bfd = packet[BFD]
        # (session attribute, BFD layer field, debug message)
        # auth_type is set here for the benefit of a negative test-case
        mapping = (
            ("my_discriminator", "my_discriminator",
             "BFD: setting packet.my_discriminator=%s"),
            ("your_discriminator", "your_discriminator",
             "BFD: setting packet.your_discriminator=%s"),
            ("required_min_rx", "required_min_rx_interval",
             "BFD: setting packet.required_min_rx_interval=%s"),
            ("required_min_echo_rx", "required_min_echo_rx_interval",
             "BFD: setting packet.required_min_echo_rx=%s"),
            ("desired_min_tx", "desired_min_tx_interval",
             "BFD: setting packet.desired_min_tx_interval=%s"),
            ("detect_mult", "detect_mult",
             "BFD: setting packet.detect_mult=%s"),
            ("diag", "diag", "BFD: setting packet.diag=%s"),
            ("state", "state", "BFD: setting packet.state=%s"),
            ("auth_type", "auth_type", "BFD: setting packet.auth_type=%s"),
        )
        for attribute, field, message in mapping:
            value = getattr(self, attribute)
            if not value:
                continue
            self.test.logger.debug(message, value)
            setattr(bfd, field, value)

    def create_packet(self):
        """ create a BFD packet, reflecting the current state of session """
        if self.sha1_key:
            # authenticated packet - reserve the SHA1 auth section
            bfd = BFD(flags="A")
            bfd.auth_type = self.sha1_key.auth_type
            bfd.auth_len = BFD.sha1_auth_len
            bfd.auth_key_id = self.bfd_key_id
            bfd.auth_seq_num = self.our_seq_number
            bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
        else:
            bfd = BFD()
        if self.af == AF_INET6:
            ip_layer = IPv6(src=self.interface.remote_ip6,
                            dst=self.interface.local_ip6,
                            hlim=255)
        else:
            ip_layer = IP(src=self.interface.remote_ip4,
                          dst=self.interface.local_ip4,
                          ttl=255)
        packet = Ether(src=self.phy_interface.remote_mac,
                       dst=self.phy_interface.local_mac)
        if self.tunnel_header:
            packet = packet / self.tunnel_header
        packet = (packet /
                  ip_layer /
                  UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
                  bfd)
        self.test.logger.debug("BFD: Creating packet")
        self.fill_packet_fields(packet)
        if self.sha1_key:
            # hash covers the first 32 bytes of the BFD layer followed by
            # the key padded with zeros to 20 bytes
            secret = self.sha1_key.key
            padding = b"\0" * (20 - len(secret))
            hash_material = scapy.compat.raw(packet[BFD])[:32] + \
                secret + padding
            sha1 = hashlib.sha1(hash_material)
            self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
                                   sha1.hexdigest())
            packet[BFD].auth_key_hash = sha1.digest()
        return packet

    def send_packet(self, packet=None, interface=None):
        """ send packet on interface, creating the packet if needed """
        to_send = self.create_packet() if packet is None else packet
        out_interface = self.phy_interface if interface is None \
            else interface
        self.test.logger.debug(ppp("Sending packet:", to_send))
        out_interface.add_stream(to_send)
        self.test.pg_start()

    def verify_sha1_auth(self, packet):
        """ Verify correctness of authentication in BFD layer. """
        bfd = packet[BFD]
        self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
        self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
                               BFDAuthType)
        self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
        self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
        recvd_seq_num = bfd.auth_seq_num
        if self.vpp_seq_number is None:
            # first authenticated packet - nothing to compare against yet
            self.vpp_seq_number = recvd_seq_num
            self.test.logger.debug("Received initial sequence number: %s" %
                                   self.vpp_seq_number)
        else:
            self.test.logger.debug("Received followup sequence number: %s" %
                                   recvd_seq_num)
            previous = self.vpp_seq_number
            meticulous = self.sha1_key.auth_type == \
                BFDAuthType.meticulous_keyed_sha1
            wraps = not (previous < 0xffffffff)
            # meticulous mode requires an increment with every packet,
            # otherwise the number may also stay the same
            if meticulous and not wraps:
                self.test.assert_equal(recvd_seq_num, previous + 1,
                                       "BFD sequence number")
            elif meticulous:
                self.test.assert_equal(recvd_seq_num, 0,
                                       "BFD sequence number")
            elif not wraps:
                self.test.assert_in_range(recvd_seq_num,
                                          previous,
                                          previous + 1,
                                          "BFD sequence number")
            else:
                self.test.assertIn(recvd_seq_num, (previous, 0),
                                   "BFD sequence number not one of "
                                   "(%s, 0)" % previous)
            self.vpp_seq_number = recvd_seq_num
        # last 20 bytes represent the hash - so replace them with the key,
        # pad the result with zeros and hash the result
        secret = self.sha1_key.key
        hash_material = bfd.original[:-20] + secret + \
            b"\0" * (20 - len(secret))
        expected_hash = hashlib.sha1(hash_material).hexdigest()
        self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
                               expected_hash.encode(), "Auth key hash")

    def verify_bfd(self, packet):
        """ Verify correctness of BFD layer. """
        bfd = packet[BFD]
        self.test.assert_equal(bfd.version, 1, "BFD version")
        self.test.assert_equal(bfd.your_discriminator,
                               self.my_discriminator,
                               "BFD - your discriminator")
        # authentication is only checked for sessions created with a key
        if self.sha1_key:
            self.verify_sha1_auth(packet)
523
524
def bfd_session_up(test):
    """ Bring BFD session up

    Waits for a packet from VPP, answers with Init, expects the Up event
    and finally confirms with an Up packet. As a side effect
    test.vpp_clock_offset is (re)calculated from the timestamp of the
    received packet.

    :param test: test case holding vpp_session and test_session
    """
    test.logger.info("BFD: Waiting for slow hello")
    p = wait_for_bfd_packet(test, 2, is_tunnel=test.vpp_session.is_tunnel)
    old_offset = getattr(test, 'vpp_clock_offset', None)
    test.vpp_clock_offset = time.time() - float(p.time)
    test.logger.debug("BFD: Calculated vpp clock offset: %s",
                      test.vpp_clock_offset)
    # compare against None explicitly, so that a previous offset of 0.0
    # is still checked for stability
    if old_offset is not None:
        test.assertAlmostEqual(
            old_offset, test.vpp_clock_offset, delta=0.5,
            msg="vpp clock offset not stable (new: %s, old: %s)" %
            (test.vpp_clock_offset, old_offset))
    test.logger.info("BFD: Sending Init")
    test.test_session.update(my_discriminator=randint(0, 40000000),
                             your_discriminator=p[BFD].my_discriminator,
                             state=BFDState.init)
    # with meticulous authentication the sequence number is advanced
    # before every packet sent
    if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
            BFDAuthType.meticulous_keyed_sha1:
        test.test_session.inc_seq_num()
    test.test_session.send_packet()
    test.logger.info("BFD: Waiting for event")
    e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, e, expected_state=BFDState.up)
    test.logger.info("BFD: Session is Up")
    test.test_session.update(state=BFDState.up)
    if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
            BFDAuthType.meticulous_keyed_sha1:
        test.test_session.inc_seq_num()
    test.test_session.send_packet()
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
558
559
def bfd_session_down(test):
    """ Bring BFD session down

    Expects the VPP session to be Up, sends a Down packet from the test
    side and verifies that VPP reports the transition to Down.

    :param test: test case holding vpp_session and test_session
    """
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
    session = test.test_session
    session.update(state=BFDState.down)
    key = session.sha1_key
    # meticulous authentication needs a new sequence number per packet
    if key and key.auth_type == BFDAuthType.meticulous_keyed_sha1:
        session.inc_seq_num()
    session.send_packet()
    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, event, expected_state=BFDState.down)
    test.logger.info("BFD: Session is Down")
    test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
573
574
def verify_bfd_session_config(test, session, state=None):
    """ Verify that the VPP dump of a BFD session matches the session object

    :param test: test case providing the assert helpers
    :param session: session object compared against its VPP dump entry
    :param state: expected session state, the state is not checked if None
    """
    dump = session.get_bfd_udp_session_dump_entry()
    test.assertIsNotNone(dump)
    # since dump is not none, we have verified that sw_if_index and addresses
    # are valid (in get_bfd_udp_session_dump_entry)
    # explicit None check - an expected state with numeric value 0 is falsy
    # and would otherwise be skipped silently (presumably
    # BFDState.admin_down, which RFC 5880 encodes as 0 - confirm in bfd.py)
    if state is not None:
        test.assert_equal(dump.state, state, "session state")
    test.assert_equal(dump.required_min_rx, session.required_min_rx,
                      "required min rx interval")
    test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
                      "desired min tx interval")
    test.assert_equal(dump.detect_mult, session.detect_mult,
                      "detect multiplier")
    if session.sha1_key is None:
        test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
    else:
        test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
        test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
                          "bfd key id")
        test.assert_equal(dump.conf_key_id,
                          session.sha1_key.conf_key_id,
                          "config key id")
597
598
def verify_ip(test, packet):
    """ Verify correctness of IP layer.

    Checks hop limit/TTL of 255 and that the packet goes from the local
    to the remote address of the VPP session's interface.

    :param test: test case holding vpp_session
    :param packet: packet to verify
    """
    vpp_session = test.vpp_session
    if vpp_session.af == AF_INET6:
        header = packet[IPv6]
        expected_src = vpp_session.interface.local_ip6
        expected_dst = vpp_session.interface.remote_ip6
        test.assert_equal(header.hlim, 255, "IPv6 hop limit")
    else:
        header = packet[IP]
        expected_src = vpp_session.interface.local_ip4
        expected_dst = vpp_session.interface.remote_ip4
        test.assert_equal(header.ttl, 255, "IPv4 TTL")
    test.assert_equal(header.src, expected_src, "IP source address")
    test.assert_equal(header.dst, expected_dst, "IP destination address")
613
614
def verify_udp(test, packet):
    """ Verify correctness of UDP layer.

    The destination port must be the BFD port and the source port must
    lie within the range given by BFD.udp_sport_min/udp_sport_max.

    :param test: test case providing the assert helpers
    :param packet: packet to verify
    """
    header = packet[UDP]
    test.assert_equal(header.dport, BFD.udp_dport, "UDP destination port")
    lowest, highest = BFD.udp_sport_min, BFD.udp_sport_max
    test.assert_in_range(header.sport, lowest, highest, "UDP source port")
621
622
def verify_event(test, event, expected_state):
    """ Verify correctness of event values.

    :param test: test case holding vpp_session
    :param event: event to verify
    :param expected_state: session state the event is expected to carry
    """
    e = event
    test.logger.debug("BFD: Event: %s", moves.reprlib.repr(e))
    test.assert_equal(e.sw_if_index,
                      test.vpp_session.interface.sw_if_index,
                      "BFD interface index")

    # the addresses are compared as strings regardless of address family,
    # so the messages must not claim a particular IP version
    test.assert_equal(str(e.local_addr), test.vpp_session.local_addr,
                      "Local IP address")
    test.assert_equal(str(e.peer_addr), test.vpp_session.peer_addr,
                      "Peer IP address")
    test.assert_equal(e.state, expected_state, BFDState)
636
637
def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None, is_tunnel=False):
    """ wait for BFD packet and verify its correctness

    :param test: test case holding pg0, vpp_session and test_session
    :param timeout: how long to wait
    :param pcap_time_min: ignore packets with pcap timestamp lower than this
    :param is_tunnel: if True, the outer IP header is stripped and the
                      verification is done on its payload

    :returns: the received packet (payload of the outer IP header if
              is_tunnel is set)
    :raises CaptureTimeoutError: if the deadline passes while packets are
                                 being ignored
    :raises Exception: if the packet has no BFD layer or the BFD layer
                       carries an unexpected payload
    """
    test.logger.info("BFD: Waiting for BFD packet")
    deadline = time.time() + timeout
    counter = 0
    while True:
        counter += 1
        # sanity check
        test.assert_in_range(counter, 0, 100, "number of packets ignored")
        time_left = deadline - time.time()
        if time_left < 0:
            raise CaptureTimeoutError("Packet did not arrive within timeout")
        p = test.pg0.wait_for_packet(timeout=time_left)
        test.logger.debug(ppp("BFD: Got packet:", p))
        if pcap_time_min is not None and p.time < pcap_time_min:
            test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
                                  "pcap time min %s):" %
                                  (p.time, pcap_time_min), p))
        else:
            break
    if is_tunnel:
        # strip an IP layer and move to the next
        p = p[IP].payload

    # getlayer() returns None for a missing layer, whereas indexing the
    # packet raises - so this form keeps the descriptive error reachable
    bfd = p.getlayer(BFD)
    if bfd is None:
        raise Exception(ppp("Unexpected or invalid BFD packet:", p))
    if bfd.payload:
        raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
    verify_ip(test, p)
    verify_udp(test, p)
    test.test_session.verify_bfd(p)
    return p
677
678
679 class BFD4TestCase(VppTestCase):
680     """Bidirectional Forwarding Detection (BFD)"""
681
682     pg0 = None
683     vpp_clock_offset = None
684     vpp_session = None
685     test_session = None
686
687     @classmethod
688     def setUpClass(cls):
689         super(BFD4TestCase, cls).setUpClass()
690         cls.vapi.cli("set log class bfd level debug")
691         try:
692             cls.create_pg_interfaces([0])
693             cls.create_loopback_interfaces(1)
694             cls.loopback0 = cls.lo_interfaces[0]
695             cls.loopback0.config_ip4()
696             cls.loopback0.admin_up()
697             cls.pg0.config_ip4()
698             cls.pg0.configure_ipv4_neighbors()
699             cls.pg0.admin_up()
700             cls.pg0.resolve_arp()
701
702         except Exception:
703             super(BFD4TestCase, cls).tearDownClass()
704             raise
705
706     @classmethod
707     def tearDownClass(cls):
708         super(BFD4TestCase, cls).tearDownClass()
709
710     def setUp(self):
711         super(BFD4TestCase, self).setUp()
712         self.factory = AuthKeyFactory()
713         self.vapi.want_bfd_events()
714         self.pg0.enable_capture()
715         try:
716             self.vpp_session = VppBFDUDPSession(self, self.pg0,
717                                                 self.pg0.remote_ip4)
718             self.vpp_session.add_vpp_config()
719             self.vpp_session.admin_up()
720             self.test_session = BFDTestSession(self, self.pg0, AF_INET)
721         except BaseException:
722             self.vapi.want_bfd_events(enable_disable=0)
723             raise
724
725     def tearDown(self):
726         if not self.vpp_dead:
727             self.vapi.want_bfd_events(enable_disable=0)
728         self.vapi.collect_events()  # clear the event queue
729         super(BFD4TestCase, self).tearDown()
730
    def test_session_up(self):
        """ bring BFD session up """
        # the whole scenario lives in the module-level bfd_session_up helper
        bfd_session_up(self)
734
735     def test_session_up_by_ip(self):
736         """ bring BFD session up - first frame looked up by address pair """
737         self.logger.info("BFD: Sending Slow control frame")
738         self.test_session.update(my_discriminator=randint(0, 40000000))
739         self.test_session.send_packet()
740         self.pg0.enable_capture()
741         p = self.pg0.wait_for_packet(1)
742         self.assert_equal(p[BFD].your_discriminator,
743                           self.test_session.my_discriminator,
744                           "BFD - your discriminator")
745         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
746         self.test_session.update(your_discriminator=p[BFD].my_discriminator,
747                                  state=BFDState.up)
748         self.logger.info("BFD: Waiting for event")
749         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
750         verify_event(self, e, expected_state=BFDState.init)
751         self.logger.info("BFD: Sending Up")
752         self.test_session.send_packet()
753         self.logger.info("BFD: Waiting for event")
754         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
755         verify_event(self, e, expected_state=BFDState.up)
756         self.logger.info("BFD: Session is Up")
757         self.test_session.update(state=BFDState.up)
758         self.test_session.send_packet()
759         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
760
    def test_session_down(self):
        """ bring BFD session down """
        # the session is brought up first, then taken down again; both
        # steps are implemented by module-level helpers
        bfd_session_up(self)
        bfd_session_down(self)
765
766     def test_hold_up(self):
767         """ hold BFD session up """
768         bfd_session_up(self)
769         for dummy in range(self.test_session.detect_mult * 2):
770             wait_for_bfd_packet(self)
771             self.test_session.send_packet()
772         self.assert_equal(len(self.vapi.collect_events()), 0,
773                           "number of bfd events")
774
775     def test_slow_timer(self):
776         """ verify slow periodic control frames while session down """
777         packet_count = 3
778         self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
779         prev_packet = wait_for_bfd_packet(self, 2)
780         for dummy in range(packet_count):
781             next_packet = wait_for_bfd_packet(self, 2)
782             time_diff = next_packet.time - prev_packet.time
783             # spec says the range should be <0.75, 1>, allow extra 0.05 margin
784             # to work around timing issues
785             self.assert_in_range(
786                 time_diff, 0.70, 1.05, "time between slow packets")
787             prev_packet = next_packet
788
789     def test_zero_remote_min_rx(self):
790         """ no packets when zero remote required min rx interval """
791         bfd_session_up(self)
792         self.test_session.update(required_min_rx=0)
793         self.test_session.send_packet()
794         for dummy in range(self.test_session.detect_mult):
795             self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
796                        "sleep before transmitting bfd packet")
797             self.test_session.send_packet()
798             try:
799                 p = wait_for_bfd_packet(self, timeout=0)
800                 self.logger.error(ppp("Received unexpected packet:", p))
801             except CaptureTimeoutError:
802                 pass
803         self.assert_equal(
804             len(self.vapi.collect_events()), 0, "number of bfd events")
805         self.test_session.update(required_min_rx=300000)
806         for dummy in range(3):
807             self.test_session.send_packet()
808             wait_for_bfd_packet(
809                 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
810         self.assert_equal(
811             len(self.vapi.collect_events()), 0, "number of bfd events")
812
813     def test_conn_down(self):
814         """ verify session goes down after inactivity """
815         bfd_session_up(self)
816         detection_time = self.test_session.detect_mult *\
817             self.vpp_session.required_min_rx / USEC_IN_SEC
818         self.sleep(detection_time, "waiting for BFD session time-out")
819         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
820         verify_event(self, e, expected_state=BFDState.down)
821
822     def test_large_required_min_rx(self):
823         """ large remote required min rx interval """
824         bfd_session_up(self)
825         p = wait_for_bfd_packet(self)
826         interval = 3000000
827         self.test_session.update(required_min_rx=interval)
828         self.test_session.send_packet()
829         time_mark = time.time()
830         count = 0
831         # busy wait here, trying to collect a packet or event, vpp is not
832         # allowed to send packets and the session will timeout first - so the
833         # Up->Down event must arrive before any packets do
834         while time.time() < time_mark + interval / USEC_IN_SEC:
835             try:
836                 p = wait_for_bfd_packet(self, timeout=0)
837                 # if vpp managed to send a packet before we did the session
838                 # session update, then that's fine, ignore it
839                 if p.time < time_mark - self.vpp_clock_offset:
840                     continue
841                 self.logger.error(ppp("Received unexpected packet:", p))
842                 count += 1
843             except CaptureTimeoutError:
844                 pass
845             events = self.vapi.collect_events()
846             if len(events) > 0:
847                 verify_event(self, events[0], BFDState.down)
848                 break
849         self.assert_equal(count, 0, "number of packets received")
850
851     def test_immediate_remote_min_rx_reduction(self):
852         """ immediately honor remote required min rx reduction """
853         self.vpp_session.remove_vpp_config()
854         self.vpp_session = VppBFDUDPSession(
855             self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
856         self.pg0.enable_capture()
857         self.vpp_session.add_vpp_config()
858         self.test_session.update(desired_min_tx=1000000,
859                                  required_min_rx=1000000)
860         bfd_session_up(self)
861         reference_packet = wait_for_bfd_packet(self)
862         time_mark = time.time()
863         interval = 300000
864         self.test_session.update(required_min_rx=interval)
865         self.test_session.send_packet()
866         extra_time = time.time() - time_mark
867         p = wait_for_bfd_packet(self)
868         # first packet is allowed to be late by time we spent doing the update
869         # calculated in extra_time
870         self.assert_in_range(p.time - reference_packet.time,
871                              .95 * 0.75 * interval / USEC_IN_SEC,
872                              1.05 * interval / USEC_IN_SEC + extra_time,
873                              "time between BFD packets")
874         reference_packet = p
875         for dummy in range(3):
876             p = wait_for_bfd_packet(self)
877             diff = p.time - reference_packet.time
878             self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
879                                  1.05 * interval / USEC_IN_SEC,
880                                  "time between BFD packets")
881             reference_packet = p
882
883     def test_modify_req_min_rx_double(self):
884         """ modify session - double required min rx """
885         bfd_session_up(self)
886         p = wait_for_bfd_packet(self)
887         self.test_session.update(desired_min_tx=10000,
888                                  required_min_rx=10000)
889         self.test_session.send_packet()
890         # double required min rx
891         self.vpp_session.modify_parameters(
892             required_min_rx=2 * self.vpp_session.required_min_rx)
893         p = wait_for_bfd_packet(
894             self, pcap_time_min=time.time() - self.vpp_clock_offset)
895         # poll bit needs to be set
896         self.assertIn("P", p.sprintf("%BFD.flags%"),
897                       "Poll bit not set in BFD packet")
898         # finish poll sequence with final packet
899         final = self.test_session.create_packet()
900         final[BFD].flags = "F"
901         timeout = self.test_session.detect_mult * \
902             max(self.test_session.desired_min_tx,
903                 self.vpp_session.required_min_rx) / USEC_IN_SEC
904         self.test_session.send_packet(final)
905         time_mark = time.time()
906         e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
907         verify_event(self, e, expected_state=BFDState.down)
908         time_to_event = time.time() - time_mark
909         self.assert_in_range(time_to_event, .9 * timeout,
910                              1.1 * timeout, "session timeout")
911
912     def test_modify_req_min_rx_halve(self):
913         """ modify session - halve required min rx """
914         self.vpp_session.modify_parameters(
915             required_min_rx=2 * self.vpp_session.required_min_rx)
916         bfd_session_up(self)
917         p = wait_for_bfd_packet(self)
918         self.test_session.update(desired_min_tx=10000,
919                                  required_min_rx=10000)
920         self.test_session.send_packet()
921         p = wait_for_bfd_packet(
922             self, pcap_time_min=time.time() - self.vpp_clock_offset)
923         # halve required min rx
924         old_required_min_rx = self.vpp_session.required_min_rx
925         self.vpp_session.modify_parameters(
926             required_min_rx=self.vpp_session.required_min_rx // 2)
927         # now we wait 0.8*3*old-req-min-rx and the session should still be up
928         self.sleep(0.8 * self.vpp_session.detect_mult *
929                    old_required_min_rx / USEC_IN_SEC,
930                    "wait before finishing poll sequence")
931         self.assert_equal(len(self.vapi.collect_events()), 0,
932                           "number of bfd events")
933         p = wait_for_bfd_packet(self)
934         # poll bit needs to be set
935         self.assertIn("P", p.sprintf("%BFD.flags%"),
936                       "Poll bit not set in BFD packet")
937         # finish poll sequence with final packet
938         final = self.test_session.create_packet()
939         final[BFD].flags = "F"
940         self.test_session.send_packet(final)
941         # now the session should time out under new conditions
942         detection_time = self.test_session.detect_mult *\
943             self.vpp_session.required_min_rx / USEC_IN_SEC
944         before = time.time()
945         e = self.vapi.wait_for_event(
946             2 * detection_time, "bfd_udp_session_details")
947         after = time.time()
948         self.assert_in_range(after - before,
949                              0.9 * detection_time,
950                              1.1 * detection_time,
951                              "time before bfd session goes down")
952         verify_event(self, e, expected_state=BFDState.down)
953
954     def test_modify_detect_mult(self):
955         """ modify detect multiplier """
956         bfd_session_up(self)
957         p = wait_for_bfd_packet(self)
958         self.vpp_session.modify_parameters(detect_mult=1)
959         p = wait_for_bfd_packet(
960             self, pcap_time_min=time.time() - self.vpp_clock_offset)
961         self.assert_equal(self.vpp_session.detect_mult,
962                           p[BFD].detect_mult,
963                           "detect mult")
964         # poll bit must not be set
965         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
966                          "Poll bit not set in BFD packet")
967         self.vpp_session.modify_parameters(detect_mult=10)
968         p = wait_for_bfd_packet(
969             self, pcap_time_min=time.time() - self.vpp_clock_offset)
970         self.assert_equal(self.vpp_session.detect_mult,
971                           p[BFD].detect_mult,
972                           "detect mult")
973         # poll bit must not be set
974         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
975                          "Poll bit not set in BFD packet")
976
977     def test_queued_poll(self):
978         """ test poll sequence queueing """
979         bfd_session_up(self)
980         p = wait_for_bfd_packet(self)
981         self.vpp_session.modify_parameters(
982             required_min_rx=2 * self.vpp_session.required_min_rx)
983         p = wait_for_bfd_packet(self)
984         poll_sequence_start = time.time()
985         poll_sequence_length_min = 0.5
986         send_final_after = time.time() + poll_sequence_length_min
987         # poll bit needs to be set
988         self.assertIn("P", p.sprintf("%BFD.flags%"),
989                       "Poll bit not set in BFD packet")
990         self.assert_equal(p[BFD].required_min_rx_interval,
991                           self.vpp_session.required_min_rx,
992                           "BFD required min rx interval")
993         self.vpp_session.modify_parameters(
994             required_min_rx=2 * self.vpp_session.required_min_rx)
995         # 2nd poll sequence should be queued now
996         # don't send the reply back yet, wait for some time to emulate
997         # longer round-trip time
998         packet_count = 0
999         while time.time() < send_final_after:
1000             self.test_session.send_packet()
1001             p = wait_for_bfd_packet(self)
1002             self.assert_equal(len(self.vapi.collect_events()), 0,
1003                               "number of bfd events")
1004             self.assert_equal(p[BFD].required_min_rx_interval,
1005                               self.vpp_session.required_min_rx,
1006                               "BFD required min rx interval")
1007             packet_count += 1
1008             # poll bit must be set
1009             self.assertIn("P", p.sprintf("%BFD.flags%"),
1010                           "Poll bit not set in BFD packet")
1011         final = self.test_session.create_packet()
1012         final[BFD].flags = "F"
1013         self.test_session.send_packet(final)
1014         # finish 1st with final
1015         poll_sequence_length = time.time() - poll_sequence_start
1016         # vpp must wait for some time before starting new poll sequence
1017         poll_no_2_started = False
1018         for dummy in range(2 * packet_count):
1019             p = wait_for_bfd_packet(self)
1020             self.assert_equal(len(self.vapi.collect_events()), 0,
1021                               "number of bfd events")
1022             if "P" in p.sprintf("%BFD.flags%"):
1023                 poll_no_2_started = True
1024                 if time.time() < poll_sequence_start + poll_sequence_length:
1025                     raise Exception("VPP started 2nd poll sequence too soon")
1026                 final = self.test_session.create_packet()
1027                 final[BFD].flags = "F"
1028                 self.test_session.send_packet(final)
1029                 break
1030             else:
1031                 self.test_session.send_packet()
1032         self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
1033         # finish 2nd with final
1034         final = self.test_session.create_packet()
1035         final[BFD].flags = "F"
1036         self.test_session.send_packet(final)
1037         p = wait_for_bfd_packet(self)
1038         # poll bit must not be set
1039         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
1040                          "Poll bit set in BFD packet")
1041
1042     def test_poll_response(self):
1043         """ test correct response to control frame with poll bit set """
1044         bfd_session_up(self)
1045         poll = self.test_session.create_packet()
1046         poll[BFD].flags = "P"
1047         self.test_session.send_packet(poll)
1048         final = wait_for_bfd_packet(
1049             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1050         self.assertIn("F", final.sprintf("%BFD.flags%"))
1051
1052     def test_no_periodic_if_remote_demand(self):
1053         """ no periodic frames outside poll sequence if remote demand set """
1054         bfd_session_up(self)
1055         demand = self.test_session.create_packet()
1056         demand[BFD].flags = "D"
1057         self.test_session.send_packet(demand)
1058         transmit_time = 0.9 \
1059             * max(self.vpp_session.required_min_rx,
1060                   self.test_session.desired_min_tx) \
1061             / USEC_IN_SEC
1062         count = 0
1063         for dummy in range(self.test_session.detect_mult * 2):
1064             self.sleep(transmit_time)
1065             self.test_session.send_packet(demand)
1066             try:
1067                 p = wait_for_bfd_packet(self, timeout=0)
1068                 self.logger.error(ppp("Received unexpected packet:", p))
1069                 count += 1
1070             except CaptureTimeoutError:
1071                 pass
1072         events = self.vapi.collect_events()
1073         for e in events:
1074             self.logger.error("Received unexpected event: %s", e)
1075         self.assert_equal(count, 0, "number of packets received")
1076         self.assert_equal(len(events), 0, "number of events received")
1077
1078     def test_echo_looped_back(self):
1079         """ echo packets looped back """
1080         # don't need a session in this case..
1081         self.vpp_session.remove_vpp_config()
1082         self.pg0.enable_capture()
1083         echo_packet_count = 10
1084         # random source port low enough to increment a few times..
1085         udp_sport_tx = randint(1, 50000)
1086         udp_sport_rx = udp_sport_tx
1087         echo_packet = (Ether(src=self.pg0.remote_mac,
1088                              dst=self.pg0.local_mac) /
1089                        IP(src=self.pg0.remote_ip4,
1090                           dst=self.pg0.remote_ip4) /
1091                        UDP(dport=BFD.udp_dport_echo) /
1092                        Raw("this should be looped back"))
1093         for dummy in range(echo_packet_count):
1094             self.sleep(.01, "delay between echo packets")
1095             echo_packet[UDP].sport = udp_sport_tx
1096             udp_sport_tx += 1
1097             self.logger.debug(ppp("Sending packet:", echo_packet))
1098             self.pg0.add_stream(echo_packet)
1099             self.pg_start()
1100         for dummy in range(echo_packet_count):
1101             p = self.pg0.wait_for_packet(1)
1102             self.logger.debug(ppp("Got packet:", p))
1103             ether = p[Ether]
1104             self.assert_equal(self.pg0.remote_mac,
1105                               ether.dst, "Destination MAC")
1106             self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1107             ip = p[IP]
1108             self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1109             self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1110             udp = p[UDP]
1111             self.assert_equal(udp.dport, BFD.udp_dport_echo,
1112                               "UDP destination port")
1113             self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1114             udp_sport_rx += 1
1115             # need to compare the hex payload here, otherwise BFD_vpp_echo
1116             # gets in way
1117             self.assertEqual(scapy.compat.raw(p[UDP].payload),
1118                              scapy.compat.raw(echo_packet[UDP].payload),
1119                              "Received packet is not the echo packet sent")
1120         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1121                           "ECHO packet identifier for test purposes)")
1122
1123     def test_echo(self):
1124         """ echo function """
1125         bfd_session_up(self)
1126         self.test_session.update(required_min_echo_rx=150000)
1127         self.test_session.send_packet()
1128         detection_time = self.test_session.detect_mult *\
1129             self.vpp_session.required_min_rx / USEC_IN_SEC
1130         # echo shouldn't work without echo source set
1131         for dummy in range(10):
1132             sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1133             self.sleep(sleep, "delay before sending bfd packet")
1134             self.test_session.send_packet()
1135         p = wait_for_bfd_packet(
1136             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1137         self.assert_equal(p[BFD].required_min_rx_interval,
1138                           self.vpp_session.required_min_rx,
1139                           "BFD required min rx interval")
1140         self.test_session.send_packet()
1141         self.vapi.bfd_udp_set_echo_source(
1142             sw_if_index=self.loopback0.sw_if_index)
1143         echo_seen = False
1144         # should be turned on - loopback echo packets
1145         for dummy in range(3):
1146             loop_until = time.time() + 0.75 * detection_time
1147             while time.time() < loop_until:
1148                 p = self.pg0.wait_for_packet(1)
1149                 self.logger.debug(ppp("Got packet:", p))
1150                 if p[UDP].dport == BFD.udp_dport_echo:
1151                     self.assert_equal(
1152                         p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
1153                     self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
1154                                         "BFD ECHO src IP equal to loopback IP")
1155                     self.logger.debug(ppp("Looping back packet:", p))
1156                     self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1157                                       "ECHO packet destination MAC address")
1158                     p[Ether].dst = self.pg0.local_mac
1159                     self.pg0.add_stream(p)
1160                     self.pg_start()
1161                     echo_seen = True
1162                 elif p.haslayer(BFD):
1163                     if echo_seen:
1164                         self.assertGreaterEqual(
1165                             p[BFD].required_min_rx_interval,
1166                             1000000)
1167                     if "P" in p.sprintf("%BFD.flags%"):
1168                         final = self.test_session.create_packet()
1169                         final[BFD].flags = "F"
1170                         self.test_session.send_packet(final)
1171                 else:
1172                     raise Exception(ppp("Received unknown packet:", p))
1173
1174                 self.assert_equal(len(self.vapi.collect_events()), 0,
1175                                   "number of bfd events")
1176             self.test_session.send_packet()
1177         self.assertTrue(echo_seen, "No echo packets received")
1178
1179     def test_echo_fail(self):
1180         """ session goes down if echo function fails """
1181         bfd_session_up(self)
1182         self.test_session.update(required_min_echo_rx=150000)
1183         self.test_session.send_packet()
1184         detection_time = self.test_session.detect_mult *\
1185             self.vpp_session.required_min_rx / USEC_IN_SEC
1186         self.vapi.bfd_udp_set_echo_source(
1187             sw_if_index=self.loopback0.sw_if_index)
1188         # echo function should be used now, but we will drop the echo packets
1189         verified_diag = False
1190         for dummy in range(3):
1191             loop_until = time.time() + 0.75 * detection_time
1192             while time.time() < loop_until:
1193                 p = self.pg0.wait_for_packet(1)
1194                 self.logger.debug(ppp("Got packet:", p))
1195                 if p[UDP].dport == BFD.udp_dport_echo:
1196                     # dropped
1197                     pass
1198                 elif p.haslayer(BFD):
1199                     if "P" in p.sprintf("%BFD.flags%"):
1200                         self.assertGreaterEqual(
1201                             p[BFD].required_min_rx_interval,
1202                             1000000)
1203                         final = self.test_session.create_packet()
1204                         final[BFD].flags = "F"
1205                         self.test_session.send_packet(final)
1206                     if p[BFD].state == BFDState.down:
1207                         self.assert_equal(p[BFD].diag,
1208                                           BFDDiagCode.echo_function_failed,
1209                                           BFDDiagCode)
1210                         verified_diag = True
1211                 else:
1212                     raise Exception(ppp("Received unknown packet:", p))
1213             self.test_session.send_packet()
1214         events = self.vapi.collect_events()
1215         self.assert_equal(len(events), 1, "number of bfd events")
1216         self.assert_equal(events[0].state, BFDState.down, BFDState)
1217         self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1218
1219     def test_echo_stop(self):
1220         """ echo function stops if peer sets required min echo rx zero """
1221         bfd_session_up(self)
1222         self.test_session.update(required_min_echo_rx=150000)
1223         self.test_session.send_packet()
1224         self.vapi.bfd_udp_set_echo_source(
1225             sw_if_index=self.loopback0.sw_if_index)
1226         # wait for first echo packet
1227         while True:
1228             p = self.pg0.wait_for_packet(1)
1229             self.logger.debug(ppp("Got packet:", p))
1230             if p[UDP].dport == BFD.udp_dport_echo:
1231                 self.logger.debug(ppp("Looping back packet:", p))
1232                 p[Ether].dst = self.pg0.local_mac
1233                 self.pg0.add_stream(p)
1234                 self.pg_start()
1235                 break
1236             elif p.haslayer(BFD):
1237                 # ignore BFD
1238                 pass
1239             else:
1240                 raise Exception(ppp("Received unknown packet:", p))
1241         self.test_session.update(required_min_echo_rx=0)
1242         self.test_session.send_packet()
1243         # echo packets shouldn't arrive anymore
1244         for dummy in range(5):
1245             wait_for_bfd_packet(
1246                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1247             self.test_session.send_packet()
1248             events = self.vapi.collect_events()
1249             self.assert_equal(len(events), 0, "number of bfd events")
1250
1251     def test_echo_source_removed(self):
1252         """ echo function stops if echo source is removed """
1253         bfd_session_up(self)
1254         self.test_session.update(required_min_echo_rx=150000)
1255         self.test_session.send_packet()
1256         self.vapi.bfd_udp_set_echo_source(
1257             sw_if_index=self.loopback0.sw_if_index)
1258         # wait for first echo packet
1259         while True:
1260             p = self.pg0.wait_for_packet(1)
1261             self.logger.debug(ppp("Got packet:", p))
1262             if p[UDP].dport == BFD.udp_dport_echo:
1263                 self.logger.debug(ppp("Looping back packet:", p))
1264                 p[Ether].dst = self.pg0.local_mac
1265                 self.pg0.add_stream(p)
1266                 self.pg_start()
1267                 break
1268             elif p.haslayer(BFD):
1269                 # ignore BFD
1270                 pass
1271             else:
1272                 raise Exception(ppp("Received unknown packet:", p))
1273         self.vapi.bfd_udp_del_echo_source()
1274         self.test_session.send_packet()
1275         # echo packets shouldn't arrive anymore
1276         for dummy in range(5):
1277             wait_for_bfd_packet(
1278                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1279             self.test_session.send_packet()
1280             events = self.vapi.collect_events()
1281             self.assert_equal(len(events), 0, "number of bfd events")
1282
1283     def test_stale_echo(self):
1284         """ stale echo packets don't keep a session up """
1285         bfd_session_up(self)
1286         self.test_session.update(required_min_echo_rx=150000)
1287         self.vapi.bfd_udp_set_echo_source(
1288             sw_if_index=self.loopback0.sw_if_index)
1289         self.test_session.send_packet()
1290         # should be turned on - loopback echo packets
1291         echo_packet = None
1292         timeout_at = None
1293         timeout_ok = False
1294         for dummy in range(10 * self.vpp_session.detect_mult):
1295             p = self.pg0.wait_for_packet(1)
1296             if p[UDP].dport == BFD.udp_dport_echo:
1297                 if echo_packet is None:
1298                     self.logger.debug(ppp("Got first echo packet:", p))
1299                     echo_packet = p
1300                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1301                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1302                 else:
1303                     self.logger.debug(ppp("Got followup echo packet:", p))
1304                 self.logger.debug(ppp("Looping back first echo packet:", p))
1305                 echo_packet[Ether].dst = self.pg0.local_mac
1306                 self.pg0.add_stream(echo_packet)
1307                 self.pg_start()
1308             elif p.haslayer(BFD):
1309                 self.logger.debug(ppp("Got packet:", p))
1310                 if "P" in p.sprintf("%BFD.flags%"):
1311                     final = self.test_session.create_packet()
1312                     final[BFD].flags = "F"
1313                     self.test_session.send_packet(final)
1314                 if p[BFD].state == BFDState.down:
1315                     self.assertIsNotNone(
1316                         timeout_at,
1317                         "Session went down before first echo packet received")
1318                     now = time.time()
1319                     self.assertGreaterEqual(
1320                         now, timeout_at,
1321                         "Session timeout at %s, but is expected at %s" %
1322                         (now, timeout_at))
1323                     self.assert_equal(p[BFD].diag,
1324                                       BFDDiagCode.echo_function_failed,
1325                                       BFDDiagCode)
1326                     events = self.vapi.collect_events()
1327                     self.assert_equal(len(events), 1, "number of bfd events")
1328                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1329                     timeout_ok = True
1330                     break
1331             else:
1332                 raise Exception(ppp("Received unknown packet:", p))
1333             self.test_session.send_packet()
1334         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1335
1336     def test_invalid_echo_checksum(self):
1337         """ echo packets with invalid checksum don't keep a session up """
1338         bfd_session_up(self)
1339         self.test_session.update(required_min_echo_rx=150000)
1340         self.vapi.bfd_udp_set_echo_source(
1341             sw_if_index=self.loopback0.sw_if_index)
1342         self.test_session.send_packet()
1343         # should be turned on - loopback echo packets
1344         timeout_at = None
1345         timeout_ok = False
1346         for dummy in range(10 * self.vpp_session.detect_mult):
1347             p = self.pg0.wait_for_packet(1)
1348             if p[UDP].dport == BFD.udp_dport_echo:
1349                 self.logger.debug(ppp("Got echo packet:", p))
1350                 if timeout_at is None:
1351                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1352                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1353                 p[BFD_vpp_echo].checksum = getrandbits(64)
1354                 p[Ether].dst = self.pg0.local_mac
1355                 self.logger.debug(ppp("Looping back modified echo packet:", p))
1356                 self.pg0.add_stream(p)
1357                 self.pg_start()
1358             elif p.haslayer(BFD):
1359                 self.logger.debug(ppp("Got packet:", p))
1360                 if "P" in p.sprintf("%BFD.flags%"):
1361                     final = self.test_session.create_packet()
1362                     final[BFD].flags = "F"
1363                     self.test_session.send_packet(final)
1364                 if p[BFD].state == BFDState.down:
1365                     self.assertIsNotNone(
1366                         timeout_at,
1367                         "Session went down before first echo packet received")
1368                     now = time.time()
1369                     self.assertGreaterEqual(
1370                         now, timeout_at,
1371                         "Session timeout at %s, but is expected at %s" %
1372                         (now, timeout_at))
1373                     self.assert_equal(p[BFD].diag,
1374                                       BFDDiagCode.echo_function_failed,
1375                                       BFDDiagCode)
1376                     events = self.vapi.collect_events()
1377                     self.assert_equal(len(events), 1, "number of bfd events")
1378                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1379                     timeout_ok = True
1380                     break
1381             else:
1382                 raise Exception(ppp("Received unknown packet:", p))
1383             self.test_session.send_packet()
1384         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1385
1386     def test_admin_up_down(self):
1387         """ put session admin-up and admin-down """
1388         bfd_session_up(self)
1389         self.vpp_session.admin_down()
1390         self.pg0.enable_capture()
1391         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1392         verify_event(self, e, expected_state=BFDState.admin_down)
1393         for dummy in range(2):
1394             p = wait_for_bfd_packet(self)
1395             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1396         # try to bring session up - shouldn't be possible
1397         self.test_session.update(state=BFDState.init)
1398         self.test_session.send_packet()
1399         for dummy in range(2):
1400             p = wait_for_bfd_packet(self)
1401             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1402         self.vpp_session.admin_up()
1403         self.test_session.update(state=BFDState.down)
1404         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1405         verify_event(self, e, expected_state=BFDState.down)
1406         p = wait_for_bfd_packet(
1407             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1408         self.assert_equal(p[BFD].state, BFDState.down, BFDState)
1409         self.test_session.send_packet()
1410         p = wait_for_bfd_packet(
1411             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1412         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1413         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1414         verify_event(self, e, expected_state=BFDState.init)
1415         self.test_session.update(state=BFDState.up)
1416         self.test_session.send_packet()
1417         p = wait_for_bfd_packet(
1418             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1419         self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1420         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1421         verify_event(self, e, expected_state=BFDState.up)
1422
1423     def test_config_change_remote_demand(self):
1424         """ configuration change while peer in demand mode """
1425         bfd_session_up(self)
1426         demand = self.test_session.create_packet()
1427         demand[BFD].flags = "D"
1428         self.test_session.send_packet(demand)
1429         self.vpp_session.modify_parameters(
1430             required_min_rx=2 * self.vpp_session.required_min_rx)
1431         p = wait_for_bfd_packet(
1432             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1433         # poll bit must be set
1434         self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1435         # terminate poll sequence
1436         final = self.test_session.create_packet()
1437         final[BFD].flags = "D+F"
1438         self.test_session.send_packet(final)
1439         # vpp should be quiet now again
1440         transmit_time = 0.9 \
1441             * max(self.vpp_session.required_min_rx,
1442                   self.test_session.desired_min_tx) \
1443             / USEC_IN_SEC
1444         count = 0
1445         for dummy in range(self.test_session.detect_mult * 2):
1446             self.sleep(transmit_time)
1447             self.test_session.send_packet(demand)
1448             try:
1449                 p = wait_for_bfd_packet(self, timeout=0)
1450                 self.logger.error(ppp("Received unexpected packet:", p))
1451                 count += 1
1452             except CaptureTimeoutError:
1453                 pass
1454         events = self.vapi.collect_events()
1455         for e in events:
1456             self.logger.error("Received unexpected event: %s", e)
1457         self.assert_equal(count, 0, "number of packets received")
1458         self.assert_equal(len(events), 0, "number of events received")
1459
1460     def test_intf_deleted(self):
1461         """ interface with bfd session deleted """
1462         intf = VppLoInterface(self)
1463         intf.config_ip4()
1464         intf.admin_up()
1465         sw_if_index = intf.sw_if_index
1466         vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip4)
1467         vpp_session.add_vpp_config()
1468         vpp_session.admin_up()
1469         intf.remove_vpp_config()
1470         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1471         self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1472         self.assertFalse(vpp_session.query_vpp_config())
1473
1474
class BFD6TestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (IPv6) """

    # class-level defaults; pg0 is created in setUpClass(), vpp_session and
    # test_session are created for every test in setUp()
    pg0 = None
    vpp_clock_offset = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """ create and configure pg0 and loopback0 with IPv6 addressing """
        super(BFD6TestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip6()
            cls.pg0.configure_ipv6_neighbors()
            cls.pg0.admin_up()
            cls.pg0.resolve_ndp()
            # the loopback is used as echo source by test_echo
            cls.create_loopback_interfaces(1)
            cls.loopback0 = cls.lo_interfaces[0]
            cls.loopback0.config_ip6()
            cls.loopback0.admin_up()

        except Exception:
            # undo the parent class setup before propagating the failure
            super(BFD6TestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """ delegate class teardown to the parent class """
        super(BFD6TestCase, cls).tearDownClass()

    def setUp(self):
        """ register for BFD events and create the vpp/test session pair """
        super(BFD6TestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()
        try:
            self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                                self.pg0.remote_ip6,
                                                af=AF_INET6)
            self.vpp_session.add_vpp_config()
            self.vpp_session.admin_up()
            self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
            self.logger.debug(self.vapi.cli("show adj nbr"))
        except BaseException:
            # unregister from events again if the session setup failed
            self.vapi.want_bfd_events(enable_disable=0)
            raise

    def tearDown(self):
        """ unregister from BFD events and drop any pending events """
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFD6TestCase, self).tearDown()

    def test_session_up(self):
        """ bring BFD session up """
        bfd_session_up(self)

    def test_session_up_by_ip(self):
        """ bring BFD session up - first frame looked up by address pair """
        self.logger.info("BFD: Sending Slow control frame")
        self.test_session.update(my_discriminator=randint(0, 40000000))
        self.test_session.send_packet()
        self.pg0.enable_capture()
        p = self.pg0.wait_for_packet(1)
        # vpp must echo our discriminator back and move to init state
        self.assert_equal(p[BFD].your_discriminator,
                          self.test_session.my_discriminator,
                          "BFD - your discriminator")
        self.assert_equal(p[BFD].state, BFDState.init, BFDState)
        self.test_session.update(your_discriminator=p[BFD].my_discriminator,
                                 state=BFDState.up)
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        verify_event(self, e, expected_state=BFDState.init)
        self.logger.info("BFD: Sending Up")
        self.test_session.send_packet()
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        verify_event(self, e, expected_state=BFDState.up)
        self.logger.info("BFD: Session is Up")
        self.test_session.update(state=BFDState.up)
        self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_hold_up(self):
        """ hold BFD session up """
        bfd_session_up(self)
        # answer every packet from vpp for two detection multipliers
        for dummy in range(self.test_session.detect_mult * 2):
            wait_for_bfd_packet(self)
            self.test_session.send_packet()
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_echo_looped_back(self):
        """ echo packets looped back """
        # don't need a session in this case..
        self.vpp_session.remove_vpp_config()
        self.pg0.enable_capture()
        echo_packet_count = 10
        # random source port low enough to increment a few times..
        udp_sport_tx = randint(1, 50000)
        udp_sport_rx = udp_sport_tx
        # source and destination IP are both the remote address
        echo_packet = (Ether(src=self.pg0.remote_mac,
                             dst=self.pg0.local_mac) /
                       IPv6(src=self.pg0.remote_ip6,
                            dst=self.pg0.remote_ip6) /
                       UDP(dport=BFD.udp_dport_echo) /
                       Raw("this should be looped back"))
        for dummy in range(echo_packet_count):
            self.sleep(.01, "delay between echo packets")
            # the source port doubles as a per-packet identifier
            echo_packet[UDP].sport = udp_sport_tx
            udp_sport_tx += 1
            self.logger.debug(ppp("Sending packet:", echo_packet))
            self.pg0.add_stream(echo_packet)
            self.pg_start()
        for dummy in range(echo_packet_count):
            p = self.pg0.wait_for_packet(1)
            self.logger.debug(ppp("Got packet:", p))
            ether = p[Ether]
            self.assert_equal(self.pg0.remote_mac,
                              ether.dst, "Destination MAC")
            self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
            ip = p[IPv6]
            self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
            self.assert_equal(self.pg0.remote_ip6, ip.src, "Source IP")
            udp = p[UDP]
            self.assert_equal(udp.dport, BFD.udp_dport_echo,
                              "UDP destination port")
            self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
            udp_sport_rx += 1
            # need to compare the hex payload here, otherwise BFD_vpp_echo
            # gets in way
            self.assertEqual(scapy.compat.raw(p[UDP].payload),
                             scapy.compat.raw(echo_packet[UDP].payload),
                             "Received packet is not the echo packet sent")
        # every packet sent must have been received back
        self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
                          "ECHO packet identifier for test purposes)")

    def test_echo(self):
        """ echo function """
        bfd_session_up(self)
        self.test_session.update(required_min_echo_rx=150000)
        self.test_session.send_packet()
        detection_time = self.test_session.detect_mult *\
            self.vpp_session.required_min_rx / USEC_IN_SEC
        # echo shouldn't work without echo source set
        for dummy in range(10):
            delay = self.vpp_session.required_min_rx / USEC_IN_SEC
            self.sleep(delay, "delay before sending bfd packet")
            self.test_session.send_packet()
        p = wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.assert_equal(p[BFD].required_min_rx_interval,
                          self.vpp_session.required_min_rx,
                          "BFD required min rx interval")
        self.test_session.send_packet()
        self.vapi.bfd_udp_set_echo_source(
            sw_if_index=self.loopback0.sw_if_index)
        echo_seen = False
        # should be turned on - loopback echo packets
        for dummy in range(3):
            loop_until = time.time() + 0.75 * detection_time
            while time.time() < loop_until:
                p = self.pg0.wait_for_packet(1)
                self.logger.debug(ppp("Got packet:", p))
                if p[UDP].dport == BFD.udp_dport_echo:
                    self.assert_equal(
                        p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
                    self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
                                        "BFD ECHO src IP equal to loopback IP")
                    self.logger.debug(ppp("Looping back packet:", p))
                    self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
                                      "ECHO packet destination MAC address")
                    # send the echo packet back to vpp
                    p[Ether].dst = self.pg0.local_mac
                    self.pg0.add_stream(p)
                    self.pg_start()
                    echo_seen = True
                elif p.haslayer(BFD):
                    if echo_seen:
                        # once echo is running, the advertised required
                        # min rx interval must be at least one second
                        self.assertGreaterEqual(
                            p[BFD].required_min_rx_interval,
                            1000000)
                    # answer a poll with a final
                    if "P" in p.sprintf("%BFD.flags%"):
                        final = self.test_session.create_packet()
                        final[BFD].flags = "F"
                        self.test_session.send_packet(final)
                else:
                    raise Exception(ppp("Received unknown packet:", p))

                # the session must stay up throughout
                self.assert_equal(len(self.vapi.collect_events()), 0,
                                  "number of bfd events")
            self.test_session.send_packet()
        self.assertTrue(echo_seen, "No echo packets received")

    def test_intf_deleted(self):
        """ interface with bfd session deleted """
        intf = VppLoInterface(self)
        intf.config_ip6()
        intf.admin_up()
        # remember the index - it is compared with the event received
        # after the interface is removed
        sw_if_index = intf.sw_if_index
        vpp_session = VppBFDUDPSession(
            self, intf, intf.remote_ip6, af=AF_INET6)
        vpp_session.add_vpp_config()
        vpp_session.admin_up()
        intf.remove_vpp_config()
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
        self.assertFalse(vpp_session.query_vpp_config())
1686
1687
class BFDFIBTestCase(VppTestCase):
    """ BFD-FIB interactions (IPv6) """

    # class-level defaults; both sessions are created by the test itself
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """ delegate class setup to the parent class """
        super(BFDFIBTestCase, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        """ delegate class teardown to the parent class """
        super(BFDFIBTestCase, cls).tearDownClass()

    def setUp(self):
        """ create pg0, register for BFD events, configure IPv6 """
        super(BFDFIBTestCase, self).setUp()
        self.create_pg_interfaces(range(1))

        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.configure_ipv6_neighbors()

    def tearDown(self):
        """ unregister from BFD events (unless vpp died) """
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=False)

        super(BFDFIBTestCase, self).tearDown()

    @staticmethod
    def pkt_is_not_data_traffic(p):
        """ not data traffic implies BFD or the usual IPv6 ND/RA"""
        return bool(p.haslayer(BFD) or is_ipv6_misc(p))

    def _expect_forwarded(self, packets):
        """ inject packets on pg0 and verify each one is forwarded back """
        self.pg0.add_stream(packets)
        self.pg_start()
        for sent in packets:
            received = self.pg0.wait_for_packet(
                1,
                filter_out_fn=self.pkt_is_not_data_traffic)
            self.assertEqual(received[IPv6].dst,
                             sent[IPv6].dst)

    def _expect_dropped(self, packets):
        """ inject packets on pg0 and verify no data traffic comes back """
        self.pg0.add_stream(packets)
        self.pg_start()
        with self.assertRaises(CaptureTimeoutError):
            self.pg0.wait_for_packet(1, self.pkt_is_not_data_traffic)

    def test_session_with_fib(self):
        """ BFD-FIB interactions """

        # one packet per route, differing only in the destination address
        data_packets = [
            (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IPv6(src="3001::1", dst=destination) /
             UDP(sport=1234, dport=1234) /
             Raw(b'\xa5' * 100))
            for destination in ("2001::1", "2002::1")]

        # A recursive and a non-recursive route via a next-hop that
        # will have a BFD session
        routes = [
            VppIpRoute(self, "2001::", 64,
                       [VppRoutePath(self.pg0.remote_ip6,
                                     self.pg0.sw_if_index)]),
            VppIpRoute(self, "2002::", 64,
                       [VppRoutePath(self.pg0.remote_ip6,
                                     0xffffffff)]),
        ]
        for route in routes:
            route.add_vpp_config()

        # create the session pair now the routes are present
        self.vpp_session = VppBFDUDPSession(self,
                                            self.pg0,
                                            self.pg0.remote_ip6,
                                            af=AF_INET6)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET6)

        # session is up - traffic passes
        bfd_session_up(self)
        self._expect_forwarded(data_packets)

        # session is down - traffic is dropped
        bfd_session_down(self)
        self._expect_dropped(data_packets)

        # session is up again - traffic passes
        bfd_session_up(self)
        self._expect_forwarded(data_packets)
1791
1792
@unittest.skipUnless(running_extended_tests, "part of extended tests")
class BFDTunTestCase(VppTestCase):
    """ BFD over GRE tunnel """

    # class-level defaults; both sessions are created by the test itself
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """ delegate class setup to the parent class """
        super(BFDTunTestCase, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        """ delegate class teardown to the parent class """
        super(BFDTunTestCase, cls).tearDownClass()

    def setUp(self):
        """ create pg0, register for BFD events, configure IPv4 """
        super(BFDTunTestCase, self).setUp()
        self.create_pg_interfaces(range(1))

        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip4()
            intf.resolve_arp()

    def tearDown(self):
        """ unregister from BFD events (unless vpp died) """
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)

        super(BFDTunTestCase, self).tearDown()

    @staticmethod
    def pkt_is_not_data_traffic(p):
        """ not data traffic implies BFD or the usual IPv6 ND/RA"""
        return bool(p.haslayer(BFD) or is_ipv6_misc(p))

    def test_bfd_o_gre(self):
        """ BFD-o-GRE  """

        # A GRE interface over which to run a BFD session
        tunnel = VppGreInterface(self,
                                 self.pg0.local_ip4,
                                 self.pg0.remote_ip4)
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()

        # vpp side of the session, bound to the tunnel interface
        self.vpp_session = VppBFDUDPSession(self,
                                            tunnel,
                                            tunnel.remote_ip4,
                                            is_tunnel=True)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()

        # test side of the session - an IP/GRE header between the pg0
        # addresses is passed as tunnel header, pg0 as physical interface
        outer_header = (IP(src=self.pg0.remote_ip4,
                           dst=self.pg0.local_ip4) /
                        GRE())
        self.test_session = BFDTestSession(
            self, tunnel, AF_INET,
            tunnel_header=outer_header,
            phy_interface=self.pg0)

        # a single data packet addressed to the tunnel's remote address
        data_packets = [
            (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(src=self.pg0.remote_ip4, dst=tunnel.remote_ip4) /
             UDP(sport=1234, dport=1234) /
             Raw(b'\xa5' * 100))]

        # session is up - traffic passes
        bfd_session_up(self)

        self.send_and_expect(self.pg0, data_packets, self.pg0)

        # bring session down
        bfd_session_down(self)
1872
1873
1874 class BFDSHA1TestCase(VppTestCase):
1875     """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1876
1877     pg0 = None
1878     vpp_clock_offset = None
1879     vpp_session = None
1880     test_session = None
1881
1882     @classmethod
1883     def setUpClass(cls):
1884         super(BFDSHA1TestCase, cls).setUpClass()
1885         cls.vapi.cli("set log class bfd level debug")
1886         try:
1887             cls.create_pg_interfaces([0])
1888             cls.pg0.config_ip4()
1889             cls.pg0.admin_up()
1890             cls.pg0.resolve_arp()
1891
1892         except Exception:
1893             super(BFDSHA1TestCase, cls).tearDownClass()
1894             raise
1895
1896     @classmethod
1897     def tearDownClass(cls):
1898         super(BFDSHA1TestCase, cls).tearDownClass()
1899
1900     def setUp(self):
1901         super(BFDSHA1TestCase, self).setUp()
1902         self.factory = AuthKeyFactory()
1903         self.vapi.want_bfd_events()
1904         self.pg0.enable_capture()
1905
1906     def tearDown(self):
1907         if not self.vpp_dead:
1908             self.vapi.want_bfd_events(enable_disable=False)
1909         self.vapi.collect_events()  # clear the event queue
1910         super(BFDSHA1TestCase, self).tearDown()
1911
1912     def test_session_up(self):
1913         """ bring BFD session up """
1914         key = self.factory.create_random_key(self)
1915         key.add_vpp_config()
1916         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1917                                             self.pg0.remote_ip4,
1918                                             sha1_key=key)
1919         self.vpp_session.add_vpp_config()
1920         self.vpp_session.admin_up()
1921         self.test_session = BFDTestSession(
1922             self, self.pg0, AF_INET, sha1_key=key,
1923             bfd_key_id=self.vpp_session.bfd_key_id)
1924         bfd_session_up(self)
1925
1926     def test_hold_up(self):
1927         """ hold BFD session up """
1928         key = self.factory.create_random_key(self)
1929         key.add_vpp_config()
1930         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1931                                             self.pg0.remote_ip4,
1932                                             sha1_key=key)
1933         self.vpp_session.add_vpp_config()
1934         self.vpp_session.admin_up()
1935         self.test_session = BFDTestSession(
1936             self, self.pg0, AF_INET, sha1_key=key,
1937             bfd_key_id=self.vpp_session.bfd_key_id)
1938         bfd_session_up(self)
1939         for dummy in range(self.test_session.detect_mult * 2):
1940             wait_for_bfd_packet(self)
1941             self.test_session.send_packet()
1942         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1943
1944     def test_hold_up_meticulous(self):
1945         """ hold BFD session up - meticulous auth """
1946         key = self.factory.create_random_key(
1947             self, BFDAuthType.meticulous_keyed_sha1)
1948         key.add_vpp_config()
1949         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1950                                             self.pg0.remote_ip4, sha1_key=key)
1951         self.vpp_session.add_vpp_config()
1952         self.vpp_session.admin_up()
1953         # specify sequence number so that it wraps
1954         self.test_session = BFDTestSession(
1955             self, self.pg0, AF_INET, sha1_key=key,
1956             bfd_key_id=self.vpp_session.bfd_key_id,
1957             our_seq_number=0xFFFFFFFF - 4)
1958         bfd_session_up(self)
1959         for dummy in range(30):
1960             wait_for_bfd_packet(self)
1961             self.test_session.inc_seq_num()
1962             self.test_session.send_packet()
1963         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1964
1965     def test_send_bad_seq_number(self):
1966         """ session is not kept alive by msgs with bad sequence numbers"""
1967         key = self.factory.create_random_key(
1968             self, BFDAuthType.meticulous_keyed_sha1)
1969         key.add_vpp_config()
1970         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1971                                             self.pg0.remote_ip4, sha1_key=key)
1972         self.vpp_session.add_vpp_config()
1973         self.test_session = BFDTestSession(
1974             self, self.pg0, AF_INET, sha1_key=key,
1975             bfd_key_id=self.vpp_session.bfd_key_id)
1976         bfd_session_up(self)
1977         detection_time = self.test_session.detect_mult *\
1978             self.vpp_session.required_min_rx / USEC_IN_SEC
1979         send_until = time.time() + 2 * detection_time
1980         while time.time() < send_until:
1981             self.test_session.send_packet()
1982             self.sleep(0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
1983                        "time between bfd packets")
1984         e = self.vapi.collect_events()
1985         # session should be down now, because the sequence numbers weren't
1986         # updated
1987         self.assert_equal(len(e), 1, "number of bfd events")
1988         verify_event(self, e[0], expected_state=BFDState.down)
1989
1990     def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
1991                                        legitimate_test_session,
1992                                        rogue_test_session,
1993                                        rogue_bfd_values=None):
1994         """ execute a rogue session interaction scenario
1995
1996         1. create vpp session, add config
1997         2. bring the legitimate session up
1998         3. copy the bfd values from legitimate session to rogue session
1999         4. apply rogue_bfd_values to rogue session
2000         5. set rogue session state to down
2001         6. send message to take the session down from the rogue session
2002         7. assert that the legitimate session is unaffected
2003         """
2004
2005         self.vpp_session = vpp_bfd_udp_session
2006         self.vpp_session.add_vpp_config()
2007         self.test_session = legitimate_test_session
2008         # bring vpp session up
2009         bfd_session_up(self)
2010         # send packet from rogue session
2011         rogue_test_session.update(
2012             my_discriminator=self.test_session.my_discriminator,
2013             your_discriminator=self.test_session.your_discriminator,
2014             desired_min_tx=self.test_session.desired_min_tx,
2015             required_min_rx=self.test_session.required_min_rx,
2016             detect_mult=self.test_session.detect_mult,
2017             diag=self.test_session.diag,
2018             state=self.test_session.state,
2019             auth_type=self.test_session.auth_type)
2020         if rogue_bfd_values:
2021             rogue_test_session.update(**rogue_bfd_values)
2022         rogue_test_session.update(state=BFDState.down)
2023         rogue_test_session.send_packet()
2024         wait_for_bfd_packet(self)
2025         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2026
2027     def test_mismatch_auth(self):
2028         """ session is not brought down by unauthenticated msg """
2029         key = self.factory.create_random_key(self)
2030         key.add_vpp_config()
2031         vpp_session = VppBFDUDPSession(
2032             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2033         legitimate_test_session = BFDTestSession(
2034             self, self.pg0, AF_INET, sha1_key=key,
2035             bfd_key_id=vpp_session.bfd_key_id)
2036         rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
2037         self.execute_rogue_session_scenario(vpp_session,
2038                                             legitimate_test_session,
2039                                             rogue_test_session)
2040
2041     def test_mismatch_bfd_key_id(self):
2042         """ session is not brought down by msg with non-existent key-id """
2043         key = self.factory.create_random_key(self)
2044         key.add_vpp_config()
2045         vpp_session = VppBFDUDPSession(
2046             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2047         # pick a different random bfd key id
2048         x = randint(0, 255)
2049         while x == vpp_session.bfd_key_id:
2050             x = randint(0, 255)
2051         legitimate_test_session = BFDTestSession(
2052             self, self.pg0, AF_INET, sha1_key=key,
2053             bfd_key_id=vpp_session.bfd_key_id)
2054         rogue_test_session = BFDTestSession(
2055             self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
2056         self.execute_rogue_session_scenario(vpp_session,
2057                                             legitimate_test_session,
2058                                             rogue_test_session)
2059
2060     def test_mismatched_auth_type(self):
2061         """ session is not brought down by msg with wrong auth type """
2062         key = self.factory.create_random_key(self)
2063         key.add_vpp_config()
2064         vpp_session = VppBFDUDPSession(
2065             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2066         legitimate_test_session = BFDTestSession(
2067             self, self.pg0, AF_INET, sha1_key=key,
2068             bfd_key_id=vpp_session.bfd_key_id)
2069         rogue_test_session = BFDTestSession(
2070             self, self.pg0, AF_INET, sha1_key=key,
2071             bfd_key_id=vpp_session.bfd_key_id)
2072         self.execute_rogue_session_scenario(
2073             vpp_session, legitimate_test_session, rogue_test_session,
2074             {'auth_type': BFDAuthType.keyed_md5})
2075
2076     def test_restart(self):
2077         """ simulate remote peer restart and resynchronization """
2078         key = self.factory.create_random_key(
2079             self, BFDAuthType.meticulous_keyed_sha1)
2080         key.add_vpp_config()
2081         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2082                                             self.pg0.remote_ip4, sha1_key=key)
2083         self.vpp_session.add_vpp_config()
2084         self.test_session = BFDTestSession(
2085             self, self.pg0, AF_INET, sha1_key=key,
2086             bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
2087         bfd_session_up(self)
2088         # don't send any packets for 2*detection_time
2089         detection_time = self.test_session.detect_mult *\
2090             self.vpp_session.required_min_rx / USEC_IN_SEC
2091         self.sleep(2 * detection_time, "simulating peer restart")
2092         events = self.vapi.collect_events()
2093         self.assert_equal(len(events), 1, "number of bfd events")
2094         verify_event(self, events[0], expected_state=BFDState.down)
2095         self.test_session.update(state=BFDState.down)
2096         # reset sequence number
2097         self.test_session.our_seq_number = 0
2098         self.test_session.vpp_seq_number = None
2099         # now throw away any pending packets
2100         self.pg0.enable_capture()
2101         bfd_session_up(self)
2102
2103
class BFDAuthOnOffTestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (changing auth) """

    # Each test brings a session up over pg0, changes its authentication
    # (on, off or key change - immediately or delayed) while packets keep
    # flowing, and finally checks that the session is still up and that no
    # bfd event was collected.

    pg0 = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        super(BFDAuthOnOffTestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip4()
            cls.pg0.admin_up()
            cls.pg0.resolve_arp()

        except Exception:
            # undo the parent class setup before propagating the failure
            super(BFDAuthOnOffTestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        super(BFDAuthOnOffTestCase, cls).tearDownClass()

    def setUp(self):
        super(BFDAuthOnOffTestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

    def tearDown(self):
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=False)
        self.vapi.collect_events()  # clear the event queue
        super(BFDAuthOnOffTestCase, self).tearDown()

    def _exchange_packets(self, check_state=True, inc_seq_num=False):
        """ run 2 * detect_mult receive/send rounds with the vpp session

        :param check_state: assert that every packet received from vpp
            carries the up state
        :param inc_seq_num: call test_session.inc_seq_num() before each
            packet is sent
        """
        for _ in range(self.test_session.detect_mult * 2):
            p = wait_for_bfd_packet(self)
            if check_state:
                self.assert_equal(p[BFD].state, BFDState.up, BFDState)
            if inc_seq_num:
                self.test_session.inc_seq_num()
            self.test_session.send_packet()

    def _verify_session_undisturbed(self):
        """ assert that the vpp session is up and no bfd event arrived """
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")

    def test_auth_on_immediate(self):
        """ turn auth on without disturbing session state (immediate) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        self._exchange_packets()
        self.vpp_session.activate_auth(key)
        # switch the test side to the new key right away
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key
        self._exchange_packets()
        self._verify_session_undisturbed()

    def test_auth_off_immediate(self):
        """ turn auth off without disturbing session state (immediate) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets(inc_seq_num=True)
        self.vpp_session.deactivate_auth()
        # drop authentication on the test side right away
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self._exchange_packets(inc_seq_num=True)
        self._verify_session_undisturbed()

    def test_auth_change_key_immediate(self):
        """ change auth key without disturbing session state (immediate) """
        key1 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2 = self.factory.create_random_key(self)
        key2.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key1)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key1,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets()
        self.vpp_session.activate_auth(key2)
        # switch the test side to the second key right away
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key2
        self._exchange_packets()
        self._verify_session_undisturbed()

    def test_auth_on_delayed(self):
        """ turn auth on without disturbing session state (delayed) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        # the state of received packets is not checked in this first round
        self._exchange_packets(check_state=False)
        self.vpp_session.activate_auth(key, delayed=True)
        # keep sending unauthenticated packets after the delayed change
        self._exchange_packets()
        # now start using the key on the test side as well
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key
        self.test_session.send_packet()
        self._exchange_packets()
        self._verify_session_undisturbed()

    def test_auth_off_delayed(self):
        """ turn auth off without disturbing session state (delayed) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets()
        self.vpp_session.deactivate_auth(delayed=True)
        # keep sending authenticated packets after the delayed change
        self._exchange_packets()
        # now stop authenticating on the test side as well
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self.test_session.send_packet()
        self._exchange_packets()
        self._verify_session_undisturbed()

    def test_auth_change_key_delayed(self):
        """ change auth key without disturbing session state (delayed) """
        key1 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2 = self.factory.create_random_key(self)
        key2.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key1)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key1,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets()
        self.vpp_session.activate_auth(key2, delayed=True)
        # keep using the first key after the delayed change
        self._exchange_packets()
        # now switch the test side to the second key as well
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key2
        self.test_session.send_packet()
        self._exchange_packets()
        self._verify_session_undisturbed()
2314
2315
2316 class BFDCLITestCase(VppTestCase):
2317     """Bidirectional Forwarding Detection (BFD) (CLI) """
2318     pg0 = None
2319
2320     @classmethod
2321     def setUpClass(cls):
2322         super(BFDCLITestCase, cls).setUpClass()
2323         cls.vapi.cli("set log class bfd level debug")
2324         try:
2325             cls.create_pg_interfaces((0,))
2326             cls.pg0.config_ip4()
2327             cls.pg0.config_ip6()
2328             cls.pg0.resolve_arp()
2329             cls.pg0.resolve_ndp()
2330
2331         except Exception:
2332             super(BFDCLITestCase, cls).tearDownClass()
2333             raise
2334
2335     @classmethod
2336     def tearDownClass(cls):
2337         super(BFDCLITestCase, cls).tearDownClass()
2338
2339     def setUp(self):
2340         super(BFDCLITestCase, self).setUp()
2341         self.factory = AuthKeyFactory()
2342         self.pg0.enable_capture()
2343
2344     def tearDown(self):
2345         try:
2346             self.vapi.want_bfd_events(enable_disable=False)
2347         except UnexpectedApiReturnValueError:
2348             # some tests aren't subscribed, so this is not an issue
2349             pass
2350         self.vapi.collect_events()  # clear the event queue
2351         super(BFDCLITestCase, self).tearDown()
2352
2353     def cli_verify_no_response(self, cli):
2354         """ execute a CLI, asserting that the response is empty """
2355         self.assert_equal(self.vapi.cli(cli),
2356                           "",
2357                           "CLI command response")
2358
2359     def cli_verify_response(self, cli, expected):
2360         """ execute a CLI, asserting that the response matches expectation """
2361         try:
2362             reply = self.vapi.cli(cli)
2363         except CliFailedCommandError as cli_error:
2364             reply = str(cli_error)
2365         self.assert_equal(reply.strip(),
2366                           expected,
2367                           "CLI command response")
2368
2369     def test_show(self):
2370         """ show commands """
2371         k1 = self.factory.create_random_key(self)
2372         k1.add_vpp_config()
2373         k2 = self.factory.create_random_key(
2374             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2375         k2.add_vpp_config()
2376         s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2377         s1.add_vpp_config()
2378         s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2379                               sha1_key=k2)
2380         s2.add_vpp_config()
2381         self.logger.info(self.vapi.ppcli("show bfd keys"))
2382         self.logger.info(self.vapi.ppcli("show bfd sessions"))
2383         self.logger.info(self.vapi.ppcli("show bfd"))
2384
2385     def test_set_del_sha1_key(self):
2386         """ set/delete SHA1 auth key """
2387         k = self.factory.create_random_key(self)
2388         self.registry.register(k, self.logger)
2389         self.cli_verify_no_response(
2390             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2391             (k.conf_key_id,
2392                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2393         self.assertTrue(k.query_vpp_config())
2394         self.vpp_session = VppBFDUDPSession(
2395             self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
2396         self.vpp_session.add_vpp_config()
2397         self.test_session = \
2398             BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
2399                            bfd_key_id=self.vpp_session.bfd_key_id)
2400         self.vapi.want_bfd_events()
2401         bfd_session_up(self)
2402         bfd_session_down(self)
2403         # try to replace the secret for the key - should fail because the key
2404         # is in-use
2405         k2 = self.factory.create_random_key(self)
2406         self.cli_verify_response(
2407             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2408             (k.conf_key_id,
2409                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2410             "bfd key set: `bfd_auth_set_key' API call failed, "
2411             "rv=-103:BFD object in use")
2412         # manipulating the session using old secret should still work
2413         bfd_session_up(self)
2414         bfd_session_down(self)
2415         self.vpp_session.remove_vpp_config()
2416         self.cli_verify_no_response(
2417             "bfd key del conf-key-id %s" % k.conf_key_id)
2418         self.assertFalse(k.query_vpp_config())
2419
2420     def test_set_del_meticulous_sha1_key(self):
2421         """ set/delete meticulous SHA1 auth key """
2422         k = self.factory.create_random_key(
2423             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2424         self.registry.register(k, self.logger)
2425         self.cli_verify_no_response(
2426             "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
2427             (k.conf_key_id,
2428                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2429         self.assertTrue(k.query_vpp_config())
2430         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2431                                             self.pg0.remote_ip6, af=AF_INET6,
2432                                             sha1_key=k)
2433         self.vpp_session.add_vpp_config()
2434         self.vpp_session.admin_up()
2435         self.test_session = \
2436             BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
2437                            bfd_key_id=self.vpp_session.bfd_key_id)
2438         self.vapi.want_bfd_events()
2439         bfd_session_up(self)
2440         bfd_session_down(self)
2441         # try to replace the secret for the key - should fail because the key
2442         # is in-use
2443         k2 = self.factory.create_random_key(self)
2444         self.cli_verify_response(
2445             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2446             (k.conf_key_id,
2447                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2448             "bfd key set: `bfd_auth_set_key' API call failed, "
2449             "rv=-103:BFD object in use")
2450         # manipulating the session using old secret should still work
2451         bfd_session_up(self)
2452         bfd_session_down(self)
2453         self.vpp_session.remove_vpp_config()
2454         self.cli_verify_no_response(
2455             "bfd key del conf-key-id %s" % k.conf_key_id)
2456         self.assertFalse(k.query_vpp_config())
2457
2458     def test_add_mod_del_bfd_udp(self):
2459         """ create/modify/delete IPv4 BFD UDP session """
2460         vpp_session = VppBFDUDPSession(
2461             self, self.pg0, self.pg0.remote_ip4)
2462         self.registry.register(vpp_session, self.logger)
2463         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2464             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2465             "detect-mult %s" % (self.pg0.name, self.pg0.local_ip4,
2466                                 self.pg0.remote_ip4,
2467                                 vpp_session.desired_min_tx,
2468                                 vpp_session.required_min_rx,
2469                                 vpp_session.detect_mult)
2470         self.cli_verify_no_response(cli_add_cmd)
2471         # 2nd add should fail
2472         self.cli_verify_response(
2473             cli_add_cmd,
2474             "bfd udp session add: `bfd_add_add_session' API call"
2475             " failed, rv=-101:Duplicate BFD object")
2476         verify_bfd_session_config(self, vpp_session)
2477         mod_session = VppBFDUDPSession(
2478             self, self.pg0, self.pg0.remote_ip4,
2479             required_min_rx=2 * vpp_session.required_min_rx,
2480             desired_min_tx=3 * vpp_session.desired_min_tx,
2481             detect_mult=4 * vpp_session.detect_mult)
2482         self.cli_verify_no_response(
2483             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2484             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2485             (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2486              mod_session.desired_min_tx, mod_session.required_min_rx,
2487              mod_session.detect_mult))
2488         verify_bfd_session_config(self, mod_session)
2489         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2490             "peer-addr %s" % (self.pg0.name,
2491                               self.pg0.local_ip4, self.pg0.remote_ip4)
2492         self.cli_verify_no_response(cli_del_cmd)
2493         # 2nd del is expected to fail
2494         self.cli_verify_response(
2495             cli_del_cmd, "bfd udp session del: `bfd_udp_del_session' API call"
2496             " failed, rv=-102:No such BFD object")
2497         self.assertFalse(vpp_session.query_vpp_config())
2498
2499     def test_add_mod_del_bfd_udp6(self):
2500         """ create/modify/delete IPv6 BFD UDP session """
2501         vpp_session = VppBFDUDPSession(
2502             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
2503         self.registry.register(vpp_session, self.logger)
2504         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2505             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2506             "detect-mult %s" % (self.pg0.name, self.pg0.local_ip6,
2507                                 self.pg0.remote_ip6,
2508                                 vpp_session.desired_min_tx,
2509                                 vpp_session.required_min_rx,
2510                                 vpp_session.detect_mult)
2511         self.cli_verify_no_response(cli_add_cmd)
2512         # 2nd add should fail
2513         self.cli_verify_response(
2514             cli_add_cmd,
2515             "bfd udp session add: `bfd_add_add_session' API call"
2516             " failed, rv=-101:Duplicate BFD object")
2517         verify_bfd_session_config(self, vpp_session)
2518         mod_session = VppBFDUDPSession(
2519             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2520             required_min_rx=2 * vpp_session.required_min_rx,
2521             desired_min_tx=3 * vpp_session.desired_min_tx,
2522             detect_mult=4 * vpp_session.detect_mult)
2523         self.cli_verify_no_response(
2524             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2525             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2526             (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2527              mod_session.desired_min_tx,
2528              mod_session.required_min_rx, mod_session.detect_mult))
2529         verify_bfd_session_config(self, mod_session)
2530         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2531             "peer-addr %s" % (self.pg0.name,
2532                               self.pg0.local_ip6, self.pg0.remote_ip6)
2533         self.cli_verify_no_response(cli_del_cmd)
2534         # 2nd del is expected to fail
2535         self.cli_verify_response(
2536             cli_del_cmd,
2537             "bfd udp session del: `bfd_udp_del_session' API call"
2538             " failed, rv=-102:No such BFD object")
2539         self.assertFalse(vpp_session.query_vpp_config())
2540
2541     def test_add_mod_del_bfd_udp_auth(self):
2542         """ create/modify/delete IPv4 BFD UDP session (authenticated) """
2543         key = self.factory.create_random_key(self)
2544         key.add_vpp_config()
2545         vpp_session = VppBFDUDPSession(
2546             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2547         self.registry.register(vpp_session, self.logger)
2548         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2549             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2550             "detect-mult %s conf-key-id %s bfd-key-id %s"\
2551             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2552                vpp_session.desired_min_tx, vpp_session.required_min_rx,
2553                vpp_session.detect_mult, key.conf_key_id,
2554                vpp_session.bfd_key_id)
2555         self.cli_verify_no_response(cli_add_cmd)
2556         # 2nd add should fail
2557         self.cli_verify_response(
2558             cli_add_cmd,
2559             "bfd udp session add: `bfd_add_add_session' API call"
2560             " failed, rv=-101:Duplicate BFD object")
2561         verify_bfd_session_config(self, vpp_session)
2562         mod_session = VppBFDUDPSession(
2563             self, self.pg0, self.pg0.remote_ip4, sha1_key=key,
2564             bfd_key_id=vpp_session.bfd_key_id,
2565             required_min_rx=2 * vpp_session.required_min_rx,
2566             desired_min_tx=3 * vpp_session.desired_min_tx,
2567             detect_mult=4 * vpp_session.detect_mult)
2568         self.cli_verify_no_response(
2569             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2570             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2571             (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2572              mod_session.desired_min_tx,
2573              mod_session.required_min_rx, mod_session.detect_mult))
2574         verify_bfd_session_config(self, mod_session)
2575         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2576             "peer-addr %s" % (self.pg0.name,
2577                               self.pg0.local_ip4, self.pg0.remote_ip4)
2578         self.cli_verify_no_response(cli_del_cmd)
2579         # 2nd del is expected to fail
2580         self.cli_verify_response(
2581             cli_del_cmd,
2582             "bfd udp session del: `bfd_udp_del_session' API call"
2583             " failed, rv=-102:No such BFD object")
2584         self.assertFalse(vpp_session.query_vpp_config())
2585
2586     def test_add_mod_del_bfd_udp6_auth(self):
2587         """ create/modify/delete IPv6 BFD UDP session (authenticated) """
2588         key = self.factory.create_random_key(
2589             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2590         key.add_vpp_config()
2591         vpp_session = VppBFDUDPSession(
2592             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key)
2593         self.registry.register(vpp_session, self.logger)
2594         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2595             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2596             "detect-mult %s conf-key-id %s bfd-key-id %s" \
2597             % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2598                vpp_session.desired_min_tx, vpp_session.required_min_rx,
2599                vpp_session.detect_mult, key.conf_key_id,
2600                vpp_session.bfd_key_id)
2601         self.cli_verify_no_response(cli_add_cmd)
2602         # 2nd add should fail
2603         self.cli_verify_response(
2604             cli_add_cmd,
2605             "bfd udp session add: `bfd_add_add_session' API call"
2606             " failed, rv=-101:Duplicate BFD object")
2607         verify_bfd_session_config(self, vpp_session)
2608         mod_session = VppBFDUDPSession(
2609             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key,
2610             bfd_key_id=vpp_session.bfd_key_id,
2611             required_min_rx=2 * vpp_session.required_min_rx,
2612             desired_min_tx=3 * vpp_session.desired_min_tx,
2613             detect_mult=4 * vpp_session.detect_mult)
2614         self.cli_verify_no_response(
2615             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2616             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2617             (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2618              mod_session.desired_min_tx,
2619              mod_session.required_min_rx, mod_session.detect_mult))
2620         verify_bfd_session_config(self, mod_session)
2621         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2622             "peer-addr %s" % (self.pg0.name,
2623                               self.pg0.local_ip6, self.pg0.remote_ip6)
2624         self.cli_verify_no_response(cli_del_cmd)
2625         # 2nd del is expected to fail
2626         self.cli_verify_response(
2627             cli_del_cmd,
2628             "bfd udp session del: `bfd_udp_del_session' API call"
2629             " failed, rv=-102:No such BFD object")
2630         self.assertFalse(vpp_session.query_vpp_config())
2631
2632     def test_auth_on_off(self):
2633         """ turn authentication on and off """
2634         key = self.factory.create_random_key(
2635             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2636         key.add_vpp_config()
2637         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2638         auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2639                                         sha1_key=key)
2640         session.add_vpp_config()
2641         cli_activate = \
2642             "bfd udp session auth activate interface %s local-addr %s "\
2643             "peer-addr %s conf-key-id %s bfd-key-id %s"\
2644             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2645                key.conf_key_id, auth_session.bfd_key_id)
2646         self.cli_verify_no_response(cli_activate)
2647         verify_bfd_session_config(self, auth_session)
2648         self.cli_verify_no_response(cli_activate)
2649         verify_bfd_session_config(self, auth_session)
2650         cli_deactivate = \
2651             "bfd udp session auth deactivate interface %s local-addr %s "\
2652             "peer-addr %s "\
2653             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2654         self.cli_verify_no_response(cli_deactivate)
2655         verify_bfd_session_config(self, session)
2656         self.cli_verify_no_response(cli_deactivate)
2657         verify_bfd_session_config(self, session)
2658
2659     def test_auth_on_off_delayed(self):
2660         """ turn authentication on and off (delayed) """
2661         key = self.factory.create_random_key(
2662             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2663         key.add_vpp_config()
2664         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2665         auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2666                                         sha1_key=key)
2667         session.add_vpp_config()
2668         cli_activate = \
2669             "bfd udp session auth activate interface %s local-addr %s "\
2670             "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"\
2671             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2672                key.conf_key_id, auth_session.bfd_key_id)
2673         self.cli_verify_no_response(cli_activate)
2674         verify_bfd_session_config(self, auth_session)
2675         self.cli_verify_no_response(cli_activate)
2676         verify_bfd_session_config(self, auth_session)
2677         cli_deactivate = \
2678             "bfd udp session auth deactivate interface %s local-addr %s "\
2679             "peer-addr %s delayed yes"\
2680             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2681         self.cli_verify_no_response(cli_deactivate)
2682         verify_bfd_session_config(self, session)
2683         self.cli_verify_no_response(cli_deactivate)
2684         verify_bfd_session_config(self, session)
2685
2686     def test_admin_up_down(self):
2687         """ put session admin-up and admin-down """
2688         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2689         session.add_vpp_config()
2690         cli_down = \
2691             "bfd udp session set-flags admin down interface %s local-addr %s "\
2692             "peer-addr %s "\
2693             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2694         cli_up = \
2695             "bfd udp session set-flags admin up interface %s local-addr %s "\
2696             "peer-addr %s "\
2697             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2698         self.cli_verify_no_response(cli_down)
2699         verify_bfd_session_config(self, session, state=BFDState.admin_down)
2700         self.cli_verify_no_response(cli_up)
2701         verify_bfd_session_config(self, session, state=BFDState.down)
2702
2703     def test_set_del_udp_echo_source(self):
2704         """ set/del udp echo source """
2705         self.create_loopback_interfaces(1)
2706         self.loopback0 = self.lo_interfaces[0]
2707         self.loopback0.admin_up()
2708         self.cli_verify_response("show bfd echo-source",
2709                                  "UDP echo source is not set.")
2710         cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
2711         self.cli_verify_no_response(cli_set)
2712         self.cli_verify_response("show bfd echo-source",
2713                                  "UDP echo source is: %s\n"
2714                                  "IPv4 address usable as echo source: none\n"
2715                                  "IPv6 address usable as echo source: none" %
2716                                  self.loopback0.name)
2717         self.loopback0.config_ip4()
2718         unpacked = unpack("!L", self.loopback0.local_ip4n)
2719         echo_ip4 = inet_ntop(AF_INET, pack("!L", unpacked[0] ^ 1))
2720         self.cli_verify_response("show bfd echo-source",
2721                                  "UDP echo source is: %s\n"
2722                                  "IPv4 address usable as echo source: %s\n"
2723                                  "IPv6 address usable as echo source: none" %
2724                                  (self.loopback0.name, echo_ip4))
2725         unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
2726         echo_ip6 = inet_ntop(AF_INET6, pack("!LLLL", unpacked[0], unpacked[1],
2727                                             unpacked[2], unpacked[3] ^ 1))
2728         self.loopback0.config_ip6()
2729         self.cli_verify_response("show bfd echo-source",
2730                                  "UDP echo source is: %s\n"
2731                                  "IPv4 address usable as echo source: %s\n"
2732                                  "IPv6 address usable as echo source: %s" %
2733                                  (self.loopback0.name, echo_ip4, echo_ip6))
2734         cli_del = "bfd udp echo-source del"
2735         self.cli_verify_no_response(cli_del)
2736         self.cli_verify_response("show bfd echo-source",
2737                                  "UDP echo source is not set.")
2738
2739
# When executed directly as a script, run the tests in this module through
# unittest using VppTestRunner as the test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)