6dc58b933db8dd8e9626cda2e7f943c3372478d7
[vpp.git] / test / test_bfd.py
1 #!/usr/bin/env python3
2 """ BFD tests """
3
4 from __future__ import division
5
6 import binascii
7 import hashlib
8 import time
9 import unittest
10 from random import randint, shuffle, getrandbits
11 from socket import AF_INET, AF_INET6, inet_ntop
12 from struct import pack, unpack
13
14 from six import moves
15 import scapy.compat
16 from scapy.layers.inet import UDP, IP
17 from scapy.layers.inet6 import IPv6
18 from scapy.layers.l2 import Ether, GRE
19 from scapy.packet import Raw
20
21 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
22     BFDDiagCode, BFDState, BFD_vpp_echo
23 from framework import VppTestCase, VppTestRunner, running_extended_tests
24 from util import ppp
25 from vpp_ip import DpoProto
26 from vpp_ip_route import VppIpRoute, VppRoutePath
27 from vpp_lo_interface import VppLoInterface
28 from vpp_papi_provider import UnexpectedApiReturnValueError, \
29     CliFailedCommandError
30 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
31 from vpp_gre_interface import VppGreInterface
32 from vpp_papi import VppEnum
33
# Number of microseconds in one second.
USEC_IN_SEC = 1000000
35
36
class AuthKeyFactory(object):
    """Produce BFD auth keys whose conf key IDs never repeat.

    Uniqueness is tracked per factory instance: every conf key ID handed
    out is remembered and never drawn again by the same factory.
    """

    def __init__(self):
        # conf key IDs already handed out (only the dict keys matter)
        self._conf_key_ids = {}

    def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
        """Build a VppBFDAuthKey with random key bytes and a fresh ID.

        :param test: test case object passed through to VppBFDAuthKey
        :param auth_type: authentication type of the new key
        :returns: VppBFDAuthKey with a conf key ID not previously issued
            by this factory and a random key of 1 to 20 bytes
        """
        # keep drawing 32-bit IDs until one turns up that is not taken
        while True:
            conf_key_id = randint(0, 0xFFFFFFFF)
            if conf_key_id not in self._conf_key_ids:
                break
        self._conf_key_ids[conf_key_id] = 1

        key_length = randint(1, 20)
        random_octets = [randint(0, 255) for _ in range(key_length)]
        key = scapy.compat.raw(bytearray(random_octets))
        return VppBFDAuthKey(test=test, auth_type=auth_type,
                             conf_key_id=conf_key_id, key=key)
53
54
class BFDAPITestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) - API"""

    pg0 = None
    pg1 = None

    @classmethod
    def setUpClass(cls):
        super(BFDAPITestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            # two pg interfaces, each with IPv4 + IPv6 configured
            cls.create_pg_interfaces(range(2))
            for pg_if in cls.pg_interfaces:
                pg_if.config_ip4()
                pg_if.config_ip6()
                pg_if.resolve_arp()

        except Exception:
            # roll back the base-class setup before propagating the error
            super(BFDAPITestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        super(BFDAPITestCase, cls).tearDownClass()

    def setUp(self):
        super(BFDAPITestCase, self).setUp()
        # every test gets its own key factory
        self.factory = AuthKeyFactory()

    def test_add_bfd(self):
        """ create a BFD session """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        # two add/remove rounds: the session must be re-creatable
        for _ in range(2):
            session.add_vpp_config()
            self.logger.debug("Session state is %s", session.state)
            session.remove_vpp_config()

    def test_double_add(self):
        """ create the same BFD session twice (negative case) """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()

        # the second add of the identical session must be rejected
        with self.vapi.assert_negative_api_retval():
            session.add_vpp_config()

        session.remove_vpp_config()

    def test_add_bfd6(self):
        """ create IPv6 BFD session """
        session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
        # two add/remove rounds: the session must be re-creatable
        for _ in range(2):
            session.add_vpp_config()
            self.logger.debug("Session state is %s", session.state)
            session.remove_vpp_config()

    def test_mod_bfd(self):
        """ modify BFD session parameters """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   desired_min_tx=50000,
                                   required_min_rx=10000,
                                   detect_mult=1)
        session.add_vpp_config()

        def assert_dump_matches_session():
            # compare the local session object against what VPP dumps
            dumped = session.get_bfd_udp_session_dump_entry()
            self.assert_equal(session.desired_min_tx,
                              dumped.desired_min_tx,
                              "desired min transmit interval")
            self.assert_equal(session.required_min_rx,
                              dumped.required_min_rx,
                              "required min receive interval")
            self.assert_equal(session.detect_mult, dumped.detect_mult,
                              "detect mult")

        assert_dump_matches_session()
        # double every parameter and verify again
        session.modify_parameters(
            desired_min_tx=session.desired_min_tx * 2,
            required_min_rx=session.required_min_rx * 2,
            detect_mult=session.detect_mult * 2)
        assert_dump_matches_session()

    def test_add_sha1_keys(self):
        """ add SHA1 keys """
        key_count = 10
        keys = [self.factory.create_random_key(self)
                for _ in range(key_count)]

        def expect_presence(key, present):
            # assert whether VPP currently knows the given key
            if present:
                self.assertTrue(key.query_vpp_config())
            else:
                self.assertFalse(key.query_vpp_config())

        for key in keys:
            expect_presence(key, False)
        for key in keys:
            key.add_vpp_config()
        for key in keys:
            expect_presence(key, True)

        # remove randomly, re-checking every key after each removal
        removal_order = list(range(key_count))
        shuffle(removal_order)
        removed = set()
        for victim in removal_order:
            keys[victim].remove_vpp_config()
            removed.add(victim)
            for position, key in enumerate(keys):
                expect_presence(key, position not in removed)

        # should be removed now
        for key in keys:
            expect_presence(key, False)

        # add back and remove again
        for key in keys:
            key.add_vpp_config()
        for key in keys:
            expect_presence(key, True)
        for key in keys:
            key.remove_vpp_config()
        for key in keys:
            expect_presence(key, False)

    def test_add_bfd_sha1(self):
        """ create a BFD session (SHA1) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=key)
        # two add/remove rounds: the session must be re-creatable
        for _ in range(2):
            session.add_vpp_config()
            self.logger.debug("Session state is %s", session.state)
            session.remove_vpp_config()

    def test_double_add_sha1(self):
        """ create the same BFD session twice (negative case) (SHA1) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=key)
        session.add_vpp_config()
        # the duplicate add is expected to raise
        with self.assertRaises(Exception):
            session.add_vpp_config()

    def test_add_auth_nonexistent_key(self):
        """ create BFD session using non-existent SHA1 (negative case) """
        # the key is created locally but never added to VPP
        unknown_key = self.factory.create_random_key(self)
        session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=unknown_key)
        with self.assertRaises(Exception):
            session.add_vpp_config()

    def test_shared_sha1_key(self):
        """ share single SHA1 key between multiple BFD sessions """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        sessions = [
            VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                             sha1_key=key),
            VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
                             sha1_key=key, af=AF_INET6),
            VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
                             sha1_key=key),
            VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
                             sha1_key=key, af=AF_INET6)]
        for session in sessions:
            session.add_vpp_config()

        # the dumped use count must drop by one per removed session
        still_using = len(sessions)
        for session in sessions:
            entry = key.get_bfd_auth_keys_dump_entry()
            self.assert_equal(entry.use_count, still_using,
                              "Use count for shared key")
            session.remove_vpp_config()
            still_using -= 1
        entry = key.get_bfd_auth_keys_dump_entry()
        self.assert_equal(entry.use_count, still_using,
                          "Use count for shared key")

    def test_activate_auth(self):
        """ activate SHA1 authentication """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        # session starts without authentication
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        session.activate_auth(key)

    def test_deactivate_auth(self):
        """ deactivate SHA1 authentication """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        # switch authentication on, then off again
        session.activate_auth(key)
        session.deactivate_auth()

    def test_change_key(self):
        """ change SHA1 key """
        first_key = self.factory.create_random_key(self)
        second_key = self.factory.create_random_key(self)
        while second_key.conf_key_id == first_key.conf_key_id:
            second_key = self.factory.create_random_key(self)
        for key in (first_key, second_key):
            key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=first_key)
        session.add_vpp_config()
        # swap the session over to the other key
        session.activate_auth(second_key)

    def test_set_del_udp_echo_source(self):
        """ set/del udp echo source """
        self.create_loopback_interfaces(1)
        self.loopback0 = self.lo_interfaces[0]
        loopback = self.loopback0
        loopback.admin_up()

        def fetch_echo_source():
            return self.vapi.bfd_udp_get_echo_source()

        def expect_not_set(source):
            self.assertFalse(source.is_set)
            self.assertFalse(source.have_usable_ip4)
            self.assertFalse(source.have_usable_ip6)

        def expect_set_on_loopback(source):
            self.assertTrue(source.is_set)
            self.assertEqual(source.sw_if_index, loopback.sw_if_index)

        # nothing configured yet
        expect_not_set(fetch_echo_source())

        # echo source set, but the interface has no addresses so far
        self.vapi.bfd_udp_set_echo_source(
            sw_if_index=loopback.sw_if_index)
        source = fetch_echo_source()
        expect_set_on_loopback(source)
        self.assertFalse(source.have_usable_ip4)
        self.assertFalse(source.have_usable_ip6)

        # with IPv4 configured, the expected echo address is the
        # interface address with its lowest bit flipped
        loopback.config_ip4()
        (ip4_word,) = unpack("!L", loopback.local_ip4n)
        echo_ip4 = pack("!L", ip4_word ^ 1)
        source = fetch_echo_source()
        expect_set_on_loopback(source)
        self.assertTrue(source.have_usable_ip4)
        self.assertEqual(source.ip4_addr.packed, echo_ip4)
        self.assertFalse(source.have_usable_ip6)

        # same expectation for IPv6: lowest bit of the last word flipped
        loopback.config_ip6()
        ip6_words = list(unpack("!LLLL", loopback.local_ip6n))
        ip6_words[3] ^= 1
        echo_ip6 = pack("!LLLL", *ip6_words)
        source = fetch_echo_source()
        expect_set_on_loopback(source)
        self.assertTrue(source.have_usable_ip4)
        self.assertEqual(source.ip4_addr.packed, echo_ip4)
        self.assertTrue(source.have_usable_ip6)
        self.assertEqual(source.ip6_addr.packed, echo_ip6)

        # deleting the echo source returns to the initial state
        self.vapi.bfd_udp_del_echo_source()
        expect_not_set(fetch_echo_source())
312
313
class BFDTestSession(object):
    """ Test-framework side of a BFD session.

    Keeps the parameters used to build BFD packets that the test sends
    and checks the BFD layer of packets it receives.
    """

    def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
                 bfd_key_id=None, our_seq_number=None,
                 tunnel_header=None, phy_interface=None):
        """ Store session parameters and pick random initial values.

        :param test: test case object (used for logging and assertions)
        :param interface: interface supplying the IP addresses
        :param af: address family, compared against AF_INET6
        :param detect_mult: detect multiplier put into sent packets
        :param sha1_key: auth key; when set, sent packets are signed
        :param bfd_key_id: key ID placed in the auth section
        :param our_seq_number: initial auth sequence number, random if None
        :param tunnel_header: optional layers inserted after Ethernet
        :param phy_interface: interface supplying MAC addresses and used
            for sending; falls back to `interface`
        """
        self.test = test
        self.af = af
        self.sha1_key = sha1_key
        self.bfd_key_id = bfd_key_id
        self.interface = interface
        self.phy_interface = phy_interface if phy_interface else interface
        self.udp_sport = randint(49152, 65535)
        self.our_seq_number = (randint(0, 40000000)
                               if our_seq_number is None
                               else our_seq_number)
        self.vpp_seq_number = None
        self.my_discriminator = 0
        self.desired_min_tx = 300000
        self.required_min_rx = 300000
        self.required_min_echo_rx = None
        self.detect_mult = detect_mult
        self.diag = BFDDiagCode.no_diagnostic
        self.your_discriminator = None
        self.state = BFDState.down
        self.auth_type = BFDAuthType.no_auth
        self.tunnel_header = tunnel_header

    def inc_seq_num(self):
        """ Advance our sequence number; 0xFFFFFFFF rolls over to 0. """
        current = self.our_seq_number
        self.our_seq_number = 0 if current == 0xFFFFFFFF else current + 1

    def update(self, my_discriminator=None, your_discriminator=None,
               desired_min_tx=None, required_min_rx=None,
               required_min_echo_rx=None, detect_mult=None,
               diag=None, state=None, auth_type=None):
        """ Overwrite stored BFD parameters.

        Only arguments that are not None are applied; the remaining
        attributes keep their current values.
        """
        supplied = (
            ("my_discriminator", my_discriminator),
            ("your_discriminator", your_discriminator),
            ("required_min_rx", required_min_rx),
            ("required_min_echo_rx", required_min_echo_rx),
            ("desired_min_tx", desired_min_tx),
            ("detect_mult", detect_mult),
            ("diag", diag),
            ("state", state),
            ("auth_type", auth_type),
        )
        for attribute, value in supplied:
            if value is not None:
                setattr(self, attribute, value)

    def fill_packet_fields(self, packet):
        """ Copy stored session values into the BFD layer of `packet`.

        Values that evaluate false (None, 0) are not written, so the
        corresponding packet fields keep whatever they already hold.
        """
        bfd = packet[BFD]
        # (packet field, stored value, log format) in application order;
        # auth_type is the last entry and is used by a negative test-case
        assignments = (
            ("my_discriminator", self.my_discriminator,
             "BFD: setting packet.my_discriminator=%s"),
            ("your_discriminator", self.your_discriminator,
             "BFD: setting packet.your_discriminator=%s"),
            ("required_min_rx_interval", self.required_min_rx,
             "BFD: setting packet.required_min_rx_interval=%s"),
            ("required_min_echo_rx_interval", self.required_min_echo_rx,
             "BFD: setting packet.required_min_echo_rx=%s"),
            ("desired_min_tx_interval", self.desired_min_tx,
             "BFD: setting packet.desired_min_tx_interval=%s"),
            ("detect_mult", self.detect_mult,
             "BFD: setting packet.detect_mult=%s"),
            ("diag", self.diag,
             "BFD: setting packet.diag=%s"),
            ("state", self.state,
             "BFD: setting packet.state=%s"),
            ("auth_type", self.auth_type,
             "BFD: setting packet.auth_type=%s"),
        )
        for field, value, log_format in assignments:
            if value:
                self.test.logger.debug(log_format, value)
                setattr(bfd, field, value)

    def create_packet(self):
        """ Build a BFD packet from the current session state.

        :returns: Ether [/ tunnel header] / IP or IPv6 / UDP / BFD; when
            a SHA1 key is configured the auth section is filled in and
            the hash computed
        """
        if self.sha1_key:
            bfd = BFD(flags="A")
            bfd.auth_type = self.sha1_key.auth_type
            bfd.auth_len = BFD.sha1_auth_len
            bfd.auth_key_id = self.bfd_key_id
            bfd.auth_seq_num = self.our_seq_number
            bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
        else:
            bfd = BFD()

        outer = Ether(src=self.phy_interface.remote_mac,
                      dst=self.phy_interface.local_mac)
        if self.tunnel_header:
            outer = outer / self.tunnel_header

        # the packet travels from the remote side to the local side
        if self.af == AF_INET6:
            network = IPv6(src=self.interface.remote_ip6,
                           dst=self.interface.local_ip6,
                           hlim=255)
        else:
            network = IP(src=self.interface.remote_ip4,
                         dst=self.interface.local_ip4,
                         ttl=255)
        transport = UDP(sport=self.udp_sport, dport=BFD.udp_dport)
        packet = outer / network / transport / bfd

        self.test.logger.debug("BFD: Creating packet")
        self.fill_packet_fields(packet)
        if self.sha1_key:
            # hash input: first 32 bytes of the BFD layer followed by the
            # key padded with zero bytes to 20 bytes
            secret = self.sha1_key.key
            padding = b"\0" * (20 - len(secret))
            hash_material = scapy.compat.raw(packet[BFD])[:32] + \
                secret + padding
            sha1 = hashlib.sha1(hash_material)
            self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
                                   sha1.hexdigest())
            packet[BFD].auth_key_hash = sha1.digest()
        return packet

    def send_packet(self, packet=None, interface=None):
        """ Send a packet and start the packet generator.

        :param packet: packet to send; built by create_packet() if None
        :param interface: interface to send on; phy_interface if None
        """
        to_send = self.create_packet() if packet is None else packet
        out_if = self.phy_interface if interface is None else interface
        self.test.logger.debug(ppp("Sending packet:", to_send))
        out_if.add_stream(to_send)
        self.test.pg_start()

    def verify_sha1_auth(self, packet):
        """ Check the auth section of a received BFD packet.

        Verifies the fixed fields, the progression of the sequence
        number relative to the previously received one (which is then
        updated) and the SHA1 hash.
        """
        bfd = packet[BFD]
        expect_equal = self.test.assert_equal
        expect_equal(bfd.auth_len, 28, "Auth section length")
        expect_equal(bfd.auth_type, self.sha1_key.auth_type, BFDAuthType)
        expect_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
        expect_equal(bfd.auth_reserved, 0, "Reserved")

        previous = self.vpp_seq_number
        if previous is None:
            # first authenticated packet: just remember its number
            self.vpp_seq_number = bfd.auth_seq_num
            self.test.logger.debug("Received initial sequence number: %s" %
                                   self.vpp_seq_number)
        else:
            received = bfd.auth_seq_num
            self.test.logger.debug("Received followup sequence number: %s" %
                                   received)
            meticulous = (self.sha1_key.auth_type ==
                          BFDAuthType.meticulous_keyed_sha1)
            at_maximum = not (previous < 0xffffffff)
            if meticulous:
                # must advance by exactly one, restarting at 0 after max
                expect_equal(received, 0 if at_maximum else previous + 1,
                             "BFD sequence number")
            elif at_maximum:
                self.test.assertIn(received, (previous, 0),
                                   "BFD sequence number not one of "
                                   "(%s, 0)" % previous)
            else:
                self.test.assert_in_range(received, previous, previous + 1,
                                          "BFD sequence number")
            self.vpp_seq_number = received

        # last 20 bytes represent the hash - so replace them with the key,
        # pad the result with zeros and hash the result
        secret = self.sha1_key.key
        hash_material = bfd.original[:-20] + secret + \
            b"\0" * (20 - len(secret))
        expected_hash = hashlib.sha1(hash_material).hexdigest()
        expect_equal(binascii.hexlify(bfd.auth_key_hash),
                     expected_hash.encode(), "Auth key hash")

    def verify_bfd(self, packet):
        """ Check the BFD layer of a received packet.

        Verifies the version, that the packet's "your discriminator"
        equals our discriminator, and - when a SHA1 key is configured -
        the auth section.
        """
        bfd = packet[BFD]
        self.test.assert_equal(bfd.version, 1, "BFD version")
        self.test.assert_equal(bfd.your_discriminator,
                               self.my_discriminator,
                               "BFD - your discriminator")
        if self.sha1_key:
            self.verify_sha1_auth(packet)
523
524
def bfd_session_up(test):
    """ Bring BFD session up

    Waits for a BFD packet from VPP, records the clock offset between
    the test and the packet timestamp, answers with Init, waits for the
    Up event, then sends an Up packet and checks the VPP session state.

    :param test: test case providing vpp_session, test_session, vapi
        and logger
    """
    peer = test.test_session

    def send_next_packet():
        # meticulous keyed SHA1 needs a fresh sequence number per packet
        if peer.sha1_key and peer.sha1_key.auth_type == \
                BFDAuthType.meticulous_keyed_sha1:
            peer.inc_seq_num()
        peer.send_packet()

    test.logger.info("BFD: Waiting for slow hello")
    hello = wait_for_bfd_packet(test, 2,
                                is_tunnel=test.vpp_session.is_tunnel)

    # remember any previously measured offset before overwriting it
    previous_offset = getattr(test, 'vpp_clock_offset', None)
    test.vpp_clock_offset = time.time() - float(hello.time)
    test.logger.debug("BFD: Calculated vpp clock offset: %s",
                      test.vpp_clock_offset)
    if previous_offset:
        test.assertAlmostEqual(
            previous_offset, test.vpp_clock_offset, delta=0.5,
            msg="vpp clock offset not stable (new: %s, old: %s)" %
            (test.vpp_clock_offset, previous_offset))

    test.logger.info("BFD: Sending Init")
    peer.update(my_discriminator=randint(0, 40000000),
                your_discriminator=hello[BFD].my_discriminator,
                state=BFDState.init)
    send_next_packet()

    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, event, expected_state=BFDState.up)

    test.logger.info("BFD: Session is Up")
    peer.update(state=BFDState.up)
    send_next_packet()
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
558
559
def bfd_session_down(test):
    """ Bring BFD session down

    Requires the VPP session to be Up, sends a Down packet from the test
    side, waits for the Down event and checks the VPP session state.

    :param test: test case providing vpp_session, test_session, vapi
        and logger
    """
    peer = test.test_session
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)

    peer.update(state=BFDState.down)
    uses_meticulous_sha1 = (
        peer.sha1_key and
        peer.sha1_key.auth_type == BFDAuthType.meticulous_keyed_sha1)
    if uses_meticulous_sha1:
        # this auth type needs a fresh sequence number per packet
        peer.inc_seq_num()
    peer.send_packet()

    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, event, expected_state=BFDState.down)
    test.logger.info("BFD: Session is Down")
    test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
573
574
def verify_bfd_session_config(test, session, state=None):
    """ Verify that VPP's dump of a session matches the session object.

    :param test: test case providing the assertion helpers
    :param session: session object whose dump entry is compared against
        its own required_min_rx, desired_min_tx, detect_mult and auth
        settings
    :param state: expected session state; the state is only checked when
        this is not None
    """
    dump = session.get_bfd_udp_session_dump_entry()
    test.assertIsNotNone(dump)
    # since dump is not none, we have verified that sw_if_index and addresses
    # are valid (in get_bfd_udp_session_dump_entry)
    # compare against None explicitly so that a falsy but valid expected
    # state (e.g. a state whose numeric value is 0) is still verified
    if state is not None:
        test.assert_equal(dump.state, state, "session state")
    test.assert_equal(dump.required_min_rx, session.required_min_rx,
                      "required min rx interval")
    test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
                      "desired min tx interval")
    test.assert_equal(dump.detect_mult, session.detect_mult,
                      "detect multiplier")
    if session.sha1_key is None:
        test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
    else:
        # authenticated session: both key identifiers must match as well
        test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
        test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
                          "bfd key id")
        test.assert_equal(dump.conf_key_id,
                          session.sha1_key.conf_key_id,
                          "config key id")
597
598
def verify_ip(test, packet):
    """ Verify correctness of IP layer.

    Checks TTL / hop limit (must be 255) and that the packet goes from
    the session interface's local address to its remote address.
    """
    is_ip6 = test.vpp_session.af == AF_INET6
    if is_ip6:
        ip = packet[IPv6]
        session_if = test.vpp_session.interface
        expected_src = session_if.local_ip6
        expected_dst = session_if.remote_ip6
        test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
    else:
        ip = packet[IP]
        session_if = test.vpp_session.interface
        expected_src = session_if.local_ip4
        expected_dst = session_if.remote_ip4
        test.assert_equal(ip.ttl, 255, "IPv4 TTL")
    test.assert_equal(ip.src, expected_src, "IP source address")
    test.assert_equal(ip.dst, expected_dst, "IP destination address")
613
614
def verify_udp(test, packet):
    """ Verify correctness of UDP layer.

    The destination port must be the BFD port and the source port must
    lie within the BFD source port range.
    """
    udp_layer = packet[UDP]
    test.assert_equal(udp_layer.dport, BFD.udp_dport,
                      "UDP destination port")
    test.assert_in_range(udp_layer.sport,
                         BFD.udp_sport_min, BFD.udp_sport_max,
                         "UDP source port")
621
622
def verify_event(test, event, expected_state):
    """ Verify correctness of event values.

    :param test: test case providing vpp_session, logger and assertions
    :param event: received BFD session event
    :param expected_state: session state the event must report
    """
    e = event
    test.logger.debug("BFD: Event: %s" % moves.reprlib.repr(e))
    test.assert_equal(e.sw_if_index,
                      test.vpp_session.interface.sw_if_index,
                      "BFD interface index")

    # the labels are address-family neutral because this helper is used
    # for IPv4 sessions as well as IPv6 ones
    test.assert_equal(str(e.local_addr), test.vpp_session.local_addr,
                      "Local IP address")
    test.assert_equal(str(e.peer_addr), test.vpp_session.peer_addr,
                      "Peer IP address")
    test.assert_equal(e.state, expected_state, BFDState)
636
637
def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None, is_tunnel=False):
    """ wait for BFD packet and verify its correctness

    :param test: test case providing pg0, vpp_session, test_session and
        logger
    :param timeout: how long to wait
    :param pcap_time_min: ignore packets with pcap timestamp lower than this
    :param is_tunnel: if set, continue with the payload of the packet's
        first IP layer instead of the packet as captured

    :returns: the received packet (the first IP layer's payload when
        is_tunnel is set)
    :raises CaptureTimeoutError: if the deadline passes before an
        acceptable packet is captured
    :raises Exception: if the packet has no BFD layer or the BFD layer
        carries a payload
    """
    test.logger.info("BFD: Waiting for BFD packet")
    deadline = time.time() + timeout
    counter = 0
    while True:
        counter += 1
        # sanity check
        test.assert_in_range(counter, 0, 100, "number of packets ignored")
        time_left = deadline - time.time()
        if time_left < 0:
            raise CaptureTimeoutError("Packet did not arrive within timeout")
        p = test.pg0.wait_for_packet(timeout=time_left)
        test.logger.debug(ppp("BFD: Got packet:", p))
        if pcap_time_min is not None and p.time < pcap_time_min:
            test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
                                  "pcap time min %s):" %
                                  (p.time, pcap_time_min), p))
        else:
            break
    if is_tunnel:
        # strip an IP layer and move to the next
        p = p[IP].payload

    # getlayer() yields None for a missing layer, whereas indexing with
    # p[BFD] raises before the check below could ever run
    bfd = p.getlayer(BFD)
    if bfd is None:
        raise Exception(ppp("Unexpected or invalid BFD packet:", p))
    if bfd.payload:
        raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
    verify_ip(test, p)
    verify_udp(test, p)
    test.test_session.verify_bfd(p)
    return p
677
678
679 class BFD4TestCase(VppTestCase):
680     """Bidirectional Forwarding Detection (BFD)"""
681
682     pg0 = None
683     vpp_clock_offset = None
684     vpp_session = None
685     test_session = None
686
687     @classmethod
688     def setUpClass(cls):
689         super(BFD4TestCase, cls).setUpClass()
690         cls.vapi.cli("set log class bfd level debug")
691         try:
692             cls.create_pg_interfaces([0])
693             cls.create_loopback_interfaces(1)
694             cls.loopback0 = cls.lo_interfaces[0]
695             cls.loopback0.config_ip4()
696             cls.loopback0.admin_up()
697             cls.pg0.config_ip4()
698             cls.pg0.configure_ipv4_neighbors()
699             cls.pg0.admin_up()
700             cls.pg0.resolve_arp()
701
702         except Exception:
703             super(BFD4TestCase, cls).tearDownClass()
704             raise
705
    @classmethod
    def tearDownClass(cls):
        # no class-specific cleanup here; defer to the base class
        super(BFD4TestCase, cls).tearDownClass()
709
710     def setUp(self):
711         super(BFD4TestCase, self).setUp()
712         self.factory = AuthKeyFactory()
713         self.vapi.want_bfd_events()
714         self.pg0.enable_capture()
715         try:
716             self.vpp_session = VppBFDUDPSession(self, self.pg0,
717                                                 self.pg0.remote_ip4)
718             self.vpp_session.add_vpp_config()
719             self.vpp_session.admin_up()
720             self.test_session = BFDTestSession(self, self.pg0, AF_INET)
721         except BaseException:
722             self.vapi.want_bfd_events(enable_disable=0)
723             raise
724
    def tearDown(self):
        """Disable BFD event reporting, drain queued events, run parent
        teardown."""
        # skip the API call when VPP has been flagged as dead
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFD4TestCase, self).tearDown()
730
    def test_session_up(self):
        """ bring BFD session up """
        # the whole procedure is delegated to the bfd_session_up() helper
        bfd_session_up(self)
734
735     def test_session_up_by_ip(self):
736         """ bring BFD session up - first frame looked up by address pair """
737         self.logger.info("BFD: Sending Slow control frame")
738         self.test_session.update(my_discriminator=randint(0, 40000000))
739         self.test_session.send_packet()
740         self.pg0.enable_capture()
741         p = self.pg0.wait_for_packet(1)
742         self.assert_equal(p[BFD].your_discriminator,
743                           self.test_session.my_discriminator,
744                           "BFD - your discriminator")
745         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
746         self.test_session.update(your_discriminator=p[BFD].my_discriminator,
747                                  state=BFDState.up)
748         self.logger.info("BFD: Waiting for event")
749         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
750         verify_event(self, e, expected_state=BFDState.init)
751         self.logger.info("BFD: Sending Up")
752         self.test_session.send_packet()
753         self.logger.info("BFD: Waiting for event")
754         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
755         verify_event(self, e, expected_state=BFDState.up)
756         self.logger.info("BFD: Session is Up")
757         self.test_session.update(state=BFDState.up)
758         self.test_session.send_packet()
759         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
760
    def test_session_down(self):
        """ bring BFD session down """
        # both steps are delegated to module-level helpers: first bring the
        # session up, then take it down again
        bfd_session_up(self)
        bfd_session_down(self)
765
766     def test_hold_up(self):
767         """ hold BFD session up """
768         bfd_session_up(self)
769         for dummy in range(self.test_session.detect_mult * 2):
770             wait_for_bfd_packet(self)
771             self.test_session.send_packet()
772         self.assert_equal(len(self.vapi.collect_events()), 0,
773                           "number of bfd events")
774
775     def test_slow_timer(self):
776         """ verify slow periodic control frames while session down """
777         packet_count = 3
778         self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
779         prev_packet = wait_for_bfd_packet(self, 2)
780         for dummy in range(packet_count):
781             next_packet = wait_for_bfd_packet(self, 2)
782             time_diff = next_packet.time - prev_packet.time
783             # spec says the range should be <0.75, 1>, allow extra 0.05 margin
784             # to work around timing issues
785             self.assert_in_range(
786                 time_diff, 0.70, 1.05, "time between slow packets")
787             prev_packet = next_packet
788
789     def test_zero_remote_min_rx(self):
790         """ no packets when zero remote required min rx interval """
791         bfd_session_up(self)
792         self.test_session.update(required_min_rx=0)
793         self.test_session.send_packet()
794         for dummy in range(self.test_session.detect_mult):
795             self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
796                        "sleep before transmitting bfd packet")
797             self.test_session.send_packet()
798             try:
799                 p = wait_for_bfd_packet(self, timeout=0)
800                 self.logger.error(ppp("Received unexpected packet:", p))
801             except CaptureTimeoutError:
802                 pass
803         self.assert_equal(
804             len(self.vapi.collect_events()), 0, "number of bfd events")
805         self.test_session.update(required_min_rx=300000)
806         for dummy in range(3):
807             self.test_session.send_packet()
808             wait_for_bfd_packet(
809                 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
810         self.assert_equal(
811             len(self.vapi.collect_events()), 0, "number of bfd events")
812
813     def test_conn_down(self):
814         """ verify session goes down after inactivity """
815         bfd_session_up(self)
816         detection_time = self.test_session.detect_mult *\
817             self.vpp_session.required_min_rx / USEC_IN_SEC
818         self.sleep(detection_time, "waiting for BFD session time-out")
819         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
820         verify_event(self, e, expected_state=BFDState.down)
821
    def test_large_required_min_rx(self):
        """ large remote required min rx interval """
        bfd_session_up(self)
        p = wait_for_bfd_packet(self)
        # interval advertised to VPP; divided by USEC_IN_SEC below to get the
        # length of the observation window in seconds
        interval = 3000000
        self.test_session.update(required_min_rx=interval)
        self.test_session.send_packet()
        # local wall-clock time right after the update was sent
        time_mark = time.time()
        # number of packets captured that are not older than the update
        count = 0
        # busy wait here, trying to collect a packet or event, vpp is not
        # allowed to send packets and the session will timeout first - so the
        # Up->Down event must arrive before any packets do
        while time.time() < time_mark + interval / USEC_IN_SEC:
            try:
                p = wait_for_bfd_packet(self, timeout=0)
                # if vpp managed to send a packet before we did the
                # session update, then that's fine, ignore it
                if p.time < time_mark - self.vpp_clock_offset:
                    continue
                self.logger.error(ppp("Received unexpected packet:", p))
                count += 1
            except CaptureTimeoutError:
                # nothing captured in this round - go on and check for events
                pass
            events = self.vapi.collect_events()
            if len(events) > 0:
                # the first event seen has to be the transition to Down;
                # it also ends the busy wait
                verify_event(self, events[0], BFDState.down)
                break
        self.assert_equal(count, 0, "number of packets received")
850
851     def test_immediate_remote_min_rx_reduction(self):
852         """ immediately honor remote required min rx reduction """
853         self.vpp_session.remove_vpp_config()
854         self.vpp_session = VppBFDUDPSession(
855             self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
856         self.pg0.enable_capture()
857         self.vpp_session.add_vpp_config()
858         self.test_session.update(desired_min_tx=1000000,
859                                  required_min_rx=1000000)
860         bfd_session_up(self)
861         reference_packet = wait_for_bfd_packet(self)
862         time_mark = time.time()
863         interval = 300000
864         self.test_session.update(required_min_rx=interval)
865         self.test_session.send_packet()
866         extra_time = time.time() - time_mark
867         p = wait_for_bfd_packet(self)
868         # first packet is allowed to be late by time we spent doing the update
869         # calculated in extra_time
870         self.assert_in_range(p.time - reference_packet.time,
871                              .95 * 0.75 * interval / USEC_IN_SEC,
872                              1.05 * interval / USEC_IN_SEC + extra_time,
873                              "time between BFD packets")
874         reference_packet = p
875         for dummy in range(3):
876             p = wait_for_bfd_packet(self)
877             diff = p.time - reference_packet.time
878             self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
879                                  1.05 * interval / USEC_IN_SEC,
880                                  "time between BFD packets")
881             reference_packet = p
882
883     def test_modify_req_min_rx_double(self):
884         """ modify session - double required min rx """
885         bfd_session_up(self)
886         p = wait_for_bfd_packet(self)
887         self.test_session.update(desired_min_tx=10000,
888                                  required_min_rx=10000)
889         self.test_session.send_packet()
890         # double required min rx
891         self.vpp_session.modify_parameters(
892             required_min_rx=2 * self.vpp_session.required_min_rx)
893         p = wait_for_bfd_packet(
894             self, pcap_time_min=time.time() - self.vpp_clock_offset)
895         # poll bit needs to be set
896         self.assertIn("P", p.sprintf("%BFD.flags%"),
897                       "Poll bit not set in BFD packet")
898         # finish poll sequence with final packet
899         final = self.test_session.create_packet()
900         final[BFD].flags = "F"
901         timeout = self.test_session.detect_mult * \
902             max(self.test_session.desired_min_tx,
903                 self.vpp_session.required_min_rx) / USEC_IN_SEC
904         self.test_session.send_packet(final)
905         time_mark = time.time()
906         e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
907         verify_event(self, e, expected_state=BFDState.down)
908         time_to_event = time.time() - time_mark
909         self.assert_in_range(time_to_event, .9 * timeout,
910                              1.1 * timeout, "session timeout")
911
912     def test_modify_req_min_rx_halve(self):
913         """ modify session - halve required min rx """
914         self.vpp_session.modify_parameters(
915             required_min_rx=2 * self.vpp_session.required_min_rx)
916         bfd_session_up(self)
917         p = wait_for_bfd_packet(self)
918         self.test_session.update(desired_min_tx=10000,
919                                  required_min_rx=10000)
920         self.test_session.send_packet()
921         p = wait_for_bfd_packet(
922             self, pcap_time_min=time.time() - self.vpp_clock_offset)
923         # halve required min rx
924         old_required_min_rx = self.vpp_session.required_min_rx
925         self.vpp_session.modify_parameters(
926             required_min_rx=self.vpp_session.required_min_rx // 2)
927         # now we wait 0.8*3*old-req-min-rx and the session should still be up
928         self.sleep(0.8 * self.vpp_session.detect_mult *
929                    old_required_min_rx / USEC_IN_SEC,
930                    "wait before finishing poll sequence")
931         self.assert_equal(len(self.vapi.collect_events()), 0,
932                           "number of bfd events")
933         p = wait_for_bfd_packet(self)
934         # poll bit needs to be set
935         self.assertIn("P", p.sprintf("%BFD.flags%"),
936                       "Poll bit not set in BFD packet")
937         # finish poll sequence with final packet
938         final = self.test_session.create_packet()
939         final[BFD].flags = "F"
940         self.test_session.send_packet(final)
941         # now the session should time out under new conditions
942         detection_time = self.test_session.detect_mult *\
943             self.vpp_session.required_min_rx / USEC_IN_SEC
944         before = time.time()
945         e = self.vapi.wait_for_event(
946             2 * detection_time, "bfd_udp_session_details")
947         after = time.time()
948         self.assert_in_range(after - before,
949                              0.9 * detection_time,
950                              1.1 * detection_time,
951                              "time before bfd session goes down")
952         verify_event(self, e, expected_state=BFDState.down)
953
954     def test_modify_detect_mult(self):
955         """ modify detect multiplier """
956         bfd_session_up(self)
957         p = wait_for_bfd_packet(self)
958         self.vpp_session.modify_parameters(detect_mult=1)
959         p = wait_for_bfd_packet(
960             self, pcap_time_min=time.time() - self.vpp_clock_offset)
961         self.assert_equal(self.vpp_session.detect_mult,
962                           p[BFD].detect_mult,
963                           "detect mult")
964         # poll bit must not be set
965         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
966                          "Poll bit not set in BFD packet")
967         self.vpp_session.modify_parameters(detect_mult=10)
968         p = wait_for_bfd_packet(
969             self, pcap_time_min=time.time() - self.vpp_clock_offset)
970         self.assert_equal(self.vpp_session.detect_mult,
971                           p[BFD].detect_mult,
972                           "detect mult")
973         # poll bit must not be set
974         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
975                          "Poll bit not set in BFD packet")
976
    def test_queued_poll(self):
        """ test poll sequence queueing """
        bfd_session_up(self)
        p = wait_for_bfd_packet(self)
        # first parameter change - expected to start poll sequence no. 1
        self.vpp_session.modify_parameters(
            required_min_rx=2 * self.vpp_session.required_min_rx)
        p = wait_for_bfd_packet(self)
        poll_sequence_start = time.time()
        # keep the first poll sequence unanswered for at least this long
        # (seconds)
        poll_sequence_length_min = 0.5
        send_final_after = time.time() + poll_sequence_length_min
        # poll bit needs to be set
        self.assertIn("P", p.sprintf("%BFD.flags%"),
                      "Poll bit not set in BFD packet")
        self.assert_equal(p[BFD].required_min_rx_interval,
                          self.vpp_session.required_min_rx,
                          "BFD required min rx interval")
        # second parameter change while the first poll sequence is unanswered
        self.vpp_session.modify_parameters(
            required_min_rx=2 * self.vpp_session.required_min_rx)
        # 2nd poll sequence should be queued now
        # don't send the reply back yet, wait for some time to emulate
        # longer round-trip time
        packet_count = 0
        while time.time() < send_final_after:
            # plain packet, no Final flag - the poll stays unanswered
            self.test_session.send_packet()
            p = wait_for_bfd_packet(self)
            self.assert_equal(len(self.vapi.collect_events()), 0,
                              "number of bfd events")
            self.assert_equal(p[BFD].required_min_rx_interval,
                              self.vpp_session.required_min_rx,
                              "BFD required min rx interval")
            packet_count += 1
            # poll bit must be set
            self.assertIn("P", p.sprintf("%BFD.flags%"),
                          "Poll bit not set in BFD packet")
        final = self.test_session.create_packet()
        final[BFD].flags = "F"
        self.test_session.send_packet(final)
        # finish 1st with final
        poll_sequence_length = time.time() - poll_sequence_start
        # vpp must wait for some time before starting new poll sequence
        poll_no_2_started = False
        # look at up to twice as many packets as the first sequence took
        for dummy in range(2 * packet_count):
            p = wait_for_bfd_packet(self)
            self.assert_equal(len(self.vapi.collect_events()), 0,
                              "number of bfd events")
            if "P" in p.sprintf("%BFD.flags%"):
                poll_no_2_started = True
                # NOTE(review): poll_sequence_start + poll_sequence_length is
                # the wall-clock time sampled right after the first Final was
                # sent, which is already in the past here - this guard looks
                # like it can never trigger; confirm the intended bound
                if time.time() < poll_sequence_start + poll_sequence_length:
                    raise Exception("VPP started 2nd poll sequence too soon")
                final = self.test_session.create_packet()
                final[BFD].flags = "F"
                self.test_session.send_packet(final)
                break
            else:
                self.test_session.send_packet()
        self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
        # finish 2nd with final
        final = self.test_session.create_packet()
        final[BFD].flags = "F"
        self.test_session.send_packet(final)
        p = wait_for_bfd_packet(self)
        # poll bit must not be set
        self.assertNotIn("P", p.sprintf("%BFD.flags%"),
                         "Poll bit set in BFD packet")
1041
1042     # returning inconsistent results requiring retries in per-patch tests
1043     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1044     def test_poll_response(self):
1045         """ test correct response to control frame with poll bit set """
1046         bfd_session_up(self)
1047         poll = self.test_session.create_packet()
1048         poll[BFD].flags = "P"
1049         self.test_session.send_packet(poll)
1050         final = wait_for_bfd_packet(
1051             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1052         self.assertIn("F", final.sprintf("%BFD.flags%"))
1053
1054     def test_no_periodic_if_remote_demand(self):
1055         """ no periodic frames outside poll sequence if remote demand set """
1056         bfd_session_up(self)
1057         demand = self.test_session.create_packet()
1058         demand[BFD].flags = "D"
1059         self.test_session.send_packet(demand)
1060         transmit_time = 0.9 \
1061             * max(self.vpp_session.required_min_rx,
1062                   self.test_session.desired_min_tx) \
1063             / USEC_IN_SEC
1064         count = 0
1065         for dummy in range(self.test_session.detect_mult * 2):
1066             self.sleep(transmit_time)
1067             self.test_session.send_packet(demand)
1068             try:
1069                 p = wait_for_bfd_packet(self, timeout=0)
1070                 self.logger.error(ppp("Received unexpected packet:", p))
1071                 count += 1
1072             except CaptureTimeoutError:
1073                 pass
1074         events = self.vapi.collect_events()
1075         for e in events:
1076             self.logger.error("Received unexpected event: %s", e)
1077         self.assert_equal(count, 0, "number of packets received")
1078         self.assert_equal(len(events), 0, "number of events received")
1079
1080     def test_echo_looped_back(self):
1081         """ echo packets looped back """
1082         # don't need a session in this case..
1083         self.vpp_session.remove_vpp_config()
1084         self.pg0.enable_capture()
1085         echo_packet_count = 10
1086         # random source port low enough to increment a few times..
1087         udp_sport_tx = randint(1, 50000)
1088         udp_sport_rx = udp_sport_tx
1089         echo_packet = (Ether(src=self.pg0.remote_mac,
1090                              dst=self.pg0.local_mac) /
1091                        IP(src=self.pg0.remote_ip4,
1092                           dst=self.pg0.remote_ip4) /
1093                        UDP(dport=BFD.udp_dport_echo) /
1094                        Raw("this should be looped back"))
1095         for dummy in range(echo_packet_count):
1096             self.sleep(.01, "delay between echo packets")
1097             echo_packet[UDP].sport = udp_sport_tx
1098             udp_sport_tx += 1
1099             self.logger.debug(ppp("Sending packet:", echo_packet))
1100             self.pg0.add_stream(echo_packet)
1101             self.pg_start()
1102         for dummy in range(echo_packet_count):
1103             p = self.pg0.wait_for_packet(1)
1104             self.logger.debug(ppp("Got packet:", p))
1105             ether = p[Ether]
1106             self.assert_equal(self.pg0.remote_mac,
1107                               ether.dst, "Destination MAC")
1108             self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1109             ip = p[IP]
1110             self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1111             self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1112             udp = p[UDP]
1113             self.assert_equal(udp.dport, BFD.udp_dport_echo,
1114                               "UDP destination port")
1115             self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1116             udp_sport_rx += 1
1117             # need to compare the hex payload here, otherwise BFD_vpp_echo
1118             # gets in way
1119             self.assertEqual(scapy.compat.raw(p[UDP].payload),
1120                              scapy.compat.raw(echo_packet[UDP].payload),
1121                              "Received packet is not the echo packet sent")
1122         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1123                           "ECHO packet identifier for test purposes)")
1124
    def test_echo(self):
        """ echo function """
        bfd_session_up(self)
        # advertise a non-zero required min echo rx interval
        self.test_session.update(required_min_echo_rx=150000)
        self.test_session.send_packet()
        # detect_mult * VPP's required min rx, converted to seconds
        detection_time = self.test_session.detect_mult *\
            self.vpp_session.required_min_rx / USEC_IN_SEC
        # echo shouldn't work without echo source set
        for dummy in range(10):
            sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
            self.sleep(sleep, "delay before sending bfd packet")
            self.test_session.send_packet()
        # a packet captured from now on must still advertise the configured
        # required min rx interval
        p = wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.assert_equal(p[BFD].required_min_rx_interval,
                          self.vpp_session.required_min_rx,
                          "BFD required min rx interval")
        self.test_session.send_packet()
        # configure the loopback interface as echo source
        self.vapi.bfd_udp_set_echo_source(
            sw_if_index=self.loopback0.sw_if_index)
        echo_seen = False
        # should be turned on - loopback echo packets
        for dummy in range(3):
            # handle captured packets for 75% of the detection time, then
            # send one control packet of our own (end of the outer loop body)
            loop_until = time.time() + 0.75 * detection_time
            while time.time() < loop_until:
                p = self.pg0.wait_for_packet(1)
                self.logger.debug(ppp("Got packet:", p))
                if p[UDP].dport == BFD.udp_dport_echo:
                    # echo packet: check its addressing, then send it back
                    # with the destination MAC rewritten to pg0.local_mac
                    self.assert_equal(
                        p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
                    self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
                                        "BFD ECHO src IP equal to loopback IP")
                    self.logger.debug(ppp("Looping back packet:", p))
                    self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
                                      "ECHO packet destination MAC address")
                    p[Ether].dst = self.pg0.local_mac
                    self.pg0.add_stream(p)
                    self.pg_start()
                    echo_seen = True
                elif p.haslayer(BFD):
                    # control packet: once an echo packet has been seen, the
                    # advertised required min rx must be at least 1000000
                    if echo_seen:
                        self.assertGreaterEqual(
                            p[BFD].required_min_rx_interval,
                            1000000)
                    # answer a poll with a final
                    if "P" in p.sprintf("%BFD.flags%"):
                        final = self.test_session.create_packet()
                        final[BFD].flags = "F"
                        self.test_session.send_packet(final)
                else:
                    raise Exception(ppp("Received unknown packet:", p))

                # checked after every handled packet: no event may be pending
                self.assert_equal(len(self.vapi.collect_events()), 0,
                                  "number of bfd events")
            self.test_session.send_packet()
        self.assertTrue(echo_seen, "No echo packets received")
1180
1181     def test_echo_fail(self):
1182         """ session goes down if echo function fails """
1183         bfd_session_up(self)
1184         self.test_session.update(required_min_echo_rx=150000)
1185         self.test_session.send_packet()
1186         detection_time = self.test_session.detect_mult *\
1187             self.vpp_session.required_min_rx / USEC_IN_SEC
1188         self.vapi.bfd_udp_set_echo_source(
1189             sw_if_index=self.loopback0.sw_if_index)
1190         # echo function should be used now, but we will drop the echo packets
1191         verified_diag = False
1192         for dummy in range(3):
1193             loop_until = time.time() + 0.75 * detection_time
1194             while time.time() < loop_until:
1195                 p = self.pg0.wait_for_packet(1)
1196                 self.logger.debug(ppp("Got packet:", p))
1197                 if p[UDP].dport == BFD.udp_dport_echo:
1198                     # dropped
1199                     pass
1200                 elif p.haslayer(BFD):
1201                     if "P" in p.sprintf("%BFD.flags%"):
1202                         self.assertGreaterEqual(
1203                             p[BFD].required_min_rx_interval,
1204                             1000000)
1205                         final = self.test_session.create_packet()
1206                         final[BFD].flags = "F"
1207                         self.test_session.send_packet(final)
1208                     if p[BFD].state == BFDState.down:
1209                         self.assert_equal(p[BFD].diag,
1210                                           BFDDiagCode.echo_function_failed,
1211                                           BFDDiagCode)
1212                         verified_diag = True
1213                 else:
1214                     raise Exception(ppp("Received unknown packet:", p))
1215             self.test_session.send_packet()
1216         events = self.vapi.collect_events()
1217         self.assert_equal(len(events), 1, "number of bfd events")
1218         self.assert_equal(events[0].state, BFDState.down, BFDState)
1219         self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1220
1221     def test_echo_stop(self):
1222         """ echo function stops if peer sets required min echo rx zero """
1223         bfd_session_up(self)
1224         self.test_session.update(required_min_echo_rx=150000)
1225         self.test_session.send_packet()
1226         self.vapi.bfd_udp_set_echo_source(
1227             sw_if_index=self.loopback0.sw_if_index)
1228         # wait for first echo packet
1229         while True:
1230             p = self.pg0.wait_for_packet(1)
1231             self.logger.debug(ppp("Got packet:", p))
1232             if p[UDP].dport == BFD.udp_dport_echo:
1233                 self.logger.debug(ppp("Looping back packet:", p))
1234                 p[Ether].dst = self.pg0.local_mac
1235                 self.pg0.add_stream(p)
1236                 self.pg_start()
1237                 break
1238             elif p.haslayer(BFD):
1239                 # ignore BFD
1240                 pass
1241             else:
1242                 raise Exception(ppp("Received unknown packet:", p))
1243         self.test_session.update(required_min_echo_rx=0)
1244         self.test_session.send_packet()
1245         # echo packets shouldn't arrive anymore
1246         for dummy in range(5):
1247             wait_for_bfd_packet(
1248                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1249             self.test_session.send_packet()
1250             events = self.vapi.collect_events()
1251             self.assert_equal(len(events), 0, "number of bfd events")
1252
1253     def test_echo_source_removed(self):
1254         """ echo function stops if echo source is removed """
1255         bfd_session_up(self)
1256         self.test_session.update(required_min_echo_rx=150000)
1257         self.test_session.send_packet()
1258         self.vapi.bfd_udp_set_echo_source(
1259             sw_if_index=self.loopback0.sw_if_index)
1260         # wait for first echo packet
1261         while True:
1262             p = self.pg0.wait_for_packet(1)
1263             self.logger.debug(ppp("Got packet:", p))
1264             if p[UDP].dport == BFD.udp_dport_echo:
1265                 self.logger.debug(ppp("Looping back packet:", p))
1266                 p[Ether].dst = self.pg0.local_mac
1267                 self.pg0.add_stream(p)
1268                 self.pg_start()
1269                 break
1270             elif p.haslayer(BFD):
1271                 # ignore BFD
1272                 pass
1273             else:
1274                 raise Exception(ppp("Received unknown packet:", p))
1275         self.vapi.bfd_udp_del_echo_source()
1276         self.test_session.send_packet()
1277         # echo packets shouldn't arrive anymore
1278         for dummy in range(5):
1279             wait_for_bfd_packet(
1280                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1281             self.test_session.send_packet()
1282             events = self.vapi.collect_events()
1283             self.assert_equal(len(events), 0, "number of bfd events")
1284
1285     def test_stale_echo(self):
1286         """ stale echo packets don't keep a session up """
1287         bfd_session_up(self)
1288         self.test_session.update(required_min_echo_rx=150000)
1289         self.vapi.bfd_udp_set_echo_source(
1290             sw_if_index=self.loopback0.sw_if_index)
1291         self.test_session.send_packet()
1292         # should be turned on - loopback echo packets
1293         echo_packet = None
1294         timeout_at = None
1295         timeout_ok = False
1296         for dummy in range(10 * self.vpp_session.detect_mult):
1297             p = self.pg0.wait_for_packet(1)
1298             if p[UDP].dport == BFD.udp_dport_echo:
1299                 if echo_packet is None:
1300                     self.logger.debug(ppp("Got first echo packet:", p))
1301                     echo_packet = p
1302                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1303                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1304                 else:
1305                     self.logger.debug(ppp("Got followup echo packet:", p))
1306                 self.logger.debug(ppp("Looping back first echo packet:", p))
1307                 echo_packet[Ether].dst = self.pg0.local_mac
1308                 self.pg0.add_stream(echo_packet)
1309                 self.pg_start()
1310             elif p.haslayer(BFD):
1311                 self.logger.debug(ppp("Got packet:", p))
1312                 if "P" in p.sprintf("%BFD.flags%"):
1313                     final = self.test_session.create_packet()
1314                     final[BFD].flags = "F"
1315                     self.test_session.send_packet(final)
1316                 if p[BFD].state == BFDState.down:
1317                     self.assertIsNotNone(
1318                         timeout_at,
1319                         "Session went down before first echo packet received")
1320                     now = time.time()
1321                     self.assertGreaterEqual(
1322                         now, timeout_at,
1323                         "Session timeout at %s, but is expected at %s" %
1324                         (now, timeout_at))
1325                     self.assert_equal(p[BFD].diag,
1326                                       BFDDiagCode.echo_function_failed,
1327                                       BFDDiagCode)
1328                     events = self.vapi.collect_events()
1329                     self.assert_equal(len(events), 1, "number of bfd events")
1330                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1331                     timeout_ok = True
1332                     break
1333             else:
1334                 raise Exception(ppp("Received unknown packet:", p))
1335             self.test_session.send_packet()
1336         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1337
1338     def test_invalid_echo_checksum(self):
1339         """ echo packets with invalid checksum don't keep a session up """
1340         bfd_session_up(self)
1341         self.test_session.update(required_min_echo_rx=150000)
1342         self.vapi.bfd_udp_set_echo_source(
1343             sw_if_index=self.loopback0.sw_if_index)
1344         self.test_session.send_packet()
1345         # should be turned on - loopback echo packets
1346         timeout_at = None
1347         timeout_ok = False
1348         for dummy in range(10 * self.vpp_session.detect_mult):
1349             p = self.pg0.wait_for_packet(1)
1350             if p[UDP].dport == BFD.udp_dport_echo:
1351                 self.logger.debug(ppp("Got echo packet:", p))
1352                 if timeout_at is None:
1353                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1354                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1355                 p[BFD_vpp_echo].checksum = getrandbits(64)
1356                 p[Ether].dst = self.pg0.local_mac
1357                 self.logger.debug(ppp("Looping back modified echo packet:", p))
1358                 self.pg0.add_stream(p)
1359                 self.pg_start()
1360             elif p.haslayer(BFD):
1361                 self.logger.debug(ppp("Got packet:", p))
1362                 if "P" in p.sprintf("%BFD.flags%"):
1363                     final = self.test_session.create_packet()
1364                     final[BFD].flags = "F"
1365                     self.test_session.send_packet(final)
1366                 if p[BFD].state == BFDState.down:
1367                     self.assertIsNotNone(
1368                         timeout_at,
1369                         "Session went down before first echo packet received")
1370                     now = time.time()
1371                     self.assertGreaterEqual(
1372                         now, timeout_at,
1373                         "Session timeout at %s, but is expected at %s" %
1374                         (now, timeout_at))
1375                     self.assert_equal(p[BFD].diag,
1376                                       BFDDiagCode.echo_function_failed,
1377                                       BFDDiagCode)
1378                     events = self.vapi.collect_events()
1379                     self.assert_equal(len(events), 1, "number of bfd events")
1380                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1381                     timeout_ok = True
1382                     break
1383             else:
1384                 raise Exception(ppp("Received unknown packet:", p))
1385             self.test_session.send_packet()
1386         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1387
1388     def test_admin_up_down(self):
1389         """ put session admin-up and admin-down """
1390         bfd_session_up(self)
1391         self.vpp_session.admin_down()
1392         self.pg0.enable_capture()
1393         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1394         verify_event(self, e, expected_state=BFDState.admin_down)
1395         for dummy in range(2):
1396             p = wait_for_bfd_packet(self)
1397             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1398         # try to bring session up - shouldn't be possible
1399         self.test_session.update(state=BFDState.init)
1400         self.test_session.send_packet()
1401         for dummy in range(2):
1402             p = wait_for_bfd_packet(self)
1403             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1404         self.vpp_session.admin_up()
1405         self.test_session.update(state=BFDState.down)
1406         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1407         verify_event(self, e, expected_state=BFDState.down)
1408         p = wait_for_bfd_packet(
1409             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1410         self.assert_equal(p[BFD].state, BFDState.down, BFDState)
1411         self.test_session.send_packet()
1412         p = wait_for_bfd_packet(
1413             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1414         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1415         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1416         verify_event(self, e, expected_state=BFDState.init)
1417         self.test_session.update(state=BFDState.up)
1418         self.test_session.send_packet()
1419         p = wait_for_bfd_packet(
1420             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1421         self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1422         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1423         verify_event(self, e, expected_state=BFDState.up)
1424
1425     def test_config_change_remote_demand(self):
1426         """ configuration change while peer in demand mode """
1427         bfd_session_up(self)
1428         demand = self.test_session.create_packet()
1429         demand[BFD].flags = "D"
1430         self.test_session.send_packet(demand)
1431         self.vpp_session.modify_parameters(
1432             required_min_rx=2 * self.vpp_session.required_min_rx)
1433         p = wait_for_bfd_packet(
1434             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1435         # poll bit must be set
1436         self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1437         # terminate poll sequence
1438         final = self.test_session.create_packet()
1439         final[BFD].flags = "D+F"
1440         self.test_session.send_packet(final)
1441         # vpp should be quiet now again
1442         transmit_time = 0.9 \
1443             * max(self.vpp_session.required_min_rx,
1444                   self.test_session.desired_min_tx) \
1445             / USEC_IN_SEC
1446         count = 0
1447         for dummy in range(self.test_session.detect_mult * 2):
1448             self.sleep(transmit_time)
1449             self.test_session.send_packet(demand)
1450             try:
1451                 p = wait_for_bfd_packet(self, timeout=0)
1452                 self.logger.error(ppp("Received unexpected packet:", p))
1453                 count += 1
1454             except CaptureTimeoutError:
1455                 pass
1456         events = self.vapi.collect_events()
1457         for e in events:
1458             self.logger.error("Received unexpected event: %s", e)
1459         self.assert_equal(count, 0, "number of packets received")
1460         self.assert_equal(len(events), 0, "number of events received")
1461
1462     def test_intf_deleted(self):
1463         """ interface with bfd session deleted """
1464         intf = VppLoInterface(self)
1465         intf.config_ip4()
1466         intf.admin_up()
1467         sw_if_index = intf.sw_if_index
1468         vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip4)
1469         vpp_session.add_vpp_config()
1470         vpp_session.admin_up()
1471         intf.remove_vpp_config()
1472         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1473         self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1474         self.assertFalse(vpp_session.query_vpp_config())
1475
1476
class BFD6TestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (IPv6) """

    # packet-generator interface used by all tests (created in setUpClass)
    pg0 = None
    # subtracted from time.time() when computing pcap_time_min for captures
    vpp_clock_offset = None
    # VppBFDUDPSession configured in vpp (created in setUp)
    vpp_session = None
    # BFDTestSession playing the peer (created in setUp)
    test_session = None

    @classmethod
    def setUpClass(cls):
        """Create pg0 and loopback0 with IPv6 configuration.

        If any setup step fails, the parent's tearDownClass is invoked
        before the exception is re-raised.
        """
        super(BFD6TestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip6()
            cls.pg0.configure_ipv6_neighbors()
            cls.pg0.admin_up()
            cls.pg0.resolve_ndp()
            # the loopback is used as the BFD echo source in test_echo
            cls.create_loopback_interfaces(1)
            cls.loopback0 = cls.lo_interfaces[0]
            cls.loopback0.config_ip6()
            cls.loopback0.admin_up()

        except Exception:
            super(BFD6TestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level cleanup to the parent class."""
        super(BFD6TestCase, cls).tearDownClass()

    def setUp(self):
        """Enable BFD events and create the vpp/test session pair.

        If session creation fails, BFD event delivery is switched off
        again before the exception is re-raised.
        """
        super(BFD6TestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()
        try:
            self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                                self.pg0.remote_ip6,
                                                af=AF_INET6)
            self.vpp_session.add_vpp_config()
            self.vpp_session.admin_up()
            self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
            self.logger.debug(self.vapi.cli("show adj nbr"))
        except BaseException:
            self.vapi.want_bfd_events(enable_disable=0)
            raise

    def tearDown(self):
        """Disable BFD events (if vpp is alive) and drain pending events."""
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFD6TestCase, self).tearDown()

    def test_session_up(self):
        """ bring BFD session up """
        bfd_session_up(self)

    def test_session_up_by_ip(self):
        """ bring BFD session up - first frame looked up by address pair """
        self.logger.info("BFD: Sending Slow control frame")
        self.test_session.update(my_discriminator=randint(0, 40000000))
        self.test_session.send_packet()
        self.pg0.enable_capture()
        p = self.pg0.wait_for_packet(1)
        # vpp must echo our discriminator back and report init state
        self.assert_equal(p[BFD].your_discriminator,
                          self.test_session.my_discriminator,
                          "BFD - your discriminator")
        self.assert_equal(p[BFD].state, BFDState.init, BFDState)
        self.test_session.update(your_discriminator=p[BFD].my_discriminator,
                                 state=BFDState.up)
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        verify_event(self, e, expected_state=BFDState.init)
        self.logger.info("BFD: Sending Up")
        self.test_session.send_packet()
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        verify_event(self, e, expected_state=BFDState.up)
        self.logger.info("BFD: Session is Up")
        self.test_session.update(state=BFDState.up)
        self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_hold_up(self):
        """ hold BFD session up """
        bfd_session_up(self)
        # answer every packet from vpp for two detect_mult rounds
        for dummy in range(self.test_session.detect_mult * 2):
            wait_for_bfd_packet(self)
            self.test_session.send_packet()
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_echo_looped_back(self):
        """ echo packets looped back """
        # don't need a session in this case..
        self.vpp_session.remove_vpp_config()
        self.pg0.enable_capture()
        echo_packet_count = 10
        # random source port low enough to increment a few times..
        udp_sport_tx = randint(1, 50000)
        udp_sport_rx = udp_sport_tx
        # source and destination IP are both the remote address
        echo_packet = (Ether(src=self.pg0.remote_mac,
                             dst=self.pg0.local_mac) /
                       IPv6(src=self.pg0.remote_ip6,
                            dst=self.pg0.remote_ip6) /
                       UDP(dport=BFD.udp_dport_echo) /
                       Raw("this should be looped back"))
        for dummy in range(echo_packet_count):
            self.sleep(.01, "delay between echo packets")
            # the source port doubles as a per-packet identifier
            echo_packet[UDP].sport = udp_sport_tx
            udp_sport_tx += 1
            self.logger.debug(ppp("Sending packet:", echo_packet))
            self.pg0.add_stream(echo_packet)
            self.pg_start()
        for dummy in range(echo_packet_count):
            p = self.pg0.wait_for_packet(1)
            self.logger.debug(ppp("Got packet:", p))
            ether = p[Ether]
            self.assert_equal(self.pg0.remote_mac,
                              ether.dst, "Destination MAC")
            self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
            ip = p[IPv6]
            self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
            self.assert_equal(self.pg0.remote_ip6, ip.src, "Source IP")
            udp = p[UDP]
            self.assert_equal(udp.dport, BFD.udp_dport_echo,
                              "UDP destination port")
            # packets must come back in the order they were sent
            self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
            udp_sport_rx += 1
            # need to compare the hex payload here, otherwise BFD_vpp_echo
            # gets in way
            self.assertEqual(scapy.compat.raw(p[UDP].payload),
                             scapy.compat.raw(echo_packet[UDP].payload),
                             "Received packet is not the echo packet sent")
        # both counters advanced once per packet, so they must match
        self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
                          "ECHO packet identifier for test purposes)")

    def test_echo(self):
        """ echo function """
        bfd_session_up(self)
        self.test_session.update(required_min_echo_rx=150000)
        self.test_session.send_packet()
        detection_time = self.test_session.detect_mult *\
            self.vpp_session.required_min_rx / USEC_IN_SEC
        # echo shouldn't work without echo source set
        for dummy in range(10):
            sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
            self.sleep(sleep, "delay before sending bfd packet")
            self.test_session.send_packet()
        p = wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.assert_equal(p[BFD].required_min_rx_interval,
                          self.vpp_session.required_min_rx,
                          "BFD required min rx interval")
        self.test_session.send_packet()
        self.vapi.bfd_udp_set_echo_source(
            sw_if_index=self.loopback0.sw_if_index)
        echo_seen = False
        # should be turned on - loopback echo packets
        for dummy in range(3):
            loop_until = time.time() + 0.75 * detection_time
            while time.time() < loop_until:
                p = self.pg0.wait_for_packet(1)
                self.logger.debug(ppp("Got packet:", p))
                if p[UDP].dport == BFD.udp_dport_echo:
                    self.assert_equal(
                        p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
                    self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
                                        "BFD ECHO src IP equal to loopback IP")
                    self.logger.debug(ppp("Looping back packet:", p))
                    self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
                                      "ECHO packet destination MAC address")
                    # readdress the frame to vpp and send it back
                    p[Ether].dst = self.pg0.local_mac
                    self.pg0.add_stream(p)
                    self.pg_start()
                    echo_seen = True
                elif p.haslayer(BFD):
                    if echo_seen:
                        # with echo active, control packets must advertise
                        # a required min rx of at least 1000000
                        self.assertGreaterEqual(
                            p[BFD].required_min_rx_interval,
                            1000000)
                    if "P" in p.sprintf("%BFD.flags%"):
                        # answer a poll with a final
                        final = self.test_session.create_packet()
                        final[BFD].flags = "F"
                        self.test_session.send_packet(final)
                else:
                    raise Exception(ppp("Received unknown packet:", p))

                self.assert_equal(len(self.vapi.collect_events()), 0,
                                  "number of bfd events")
            self.test_session.send_packet()
        self.assertTrue(echo_seen, "No echo packets received")

    def test_intf_deleted(self):
        """ interface with bfd session deleted """
        intf = VppLoInterface(self)
        intf.config_ip6()
        intf.admin_up()
        # remember the index before the interface is removed
        sw_if_index = intf.sw_if_index
        vpp_session = VppBFDUDPSession(
            self, intf, intf.remote_ip6, af=AF_INET6)
        vpp_session.add_vpp_config()
        vpp_session.admin_up()
        intf.remove_vpp_config()
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
        self.assertFalse(vpp_session.query_vpp_config())
1688
1689
class BFDFIBTestCase(VppTestCase):
    """ BFD-FIB interactions (IPv6) """

    # both sessions are created inside test_session_with_fib
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        super(BFDFIBTestCase, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(BFDFIBTestCase, cls).tearDownClass()

    def setUp(self):
        super(BFDFIBTestCase, self).setUp()
        self.create_pg_interfaces(range(1))

        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.configure_ipv6_neighbors()

    def tearDown(self):
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=False)

        super(BFDFIBTestCase, self).tearDown()

    @staticmethod
    def pkt_is_not_data_traffic(p):
        """ not data traffic implies BFD or the usual IPv6 ND/RA"""
        return bool(p.haslayer(BFD) or is_ipv6_misc(p))

    def test_session_with_fib(self):
        """ BFD-FIB interactions """

        def send_and_expect_forwarded(pkts):
            # inject the packets and check each one comes back out of pg0,
            # ignoring BFD and IPv6 ND/RA packets in the capture
            self.pg0.add_stream(pkts)
            self.pg_start()
            for sent in pkts:
                received = self.pg0.wait_for_packet(
                    1,
                    filter_out_fn=self.pkt_is_not_data_traffic)
                self.assertEqual(received[IPv6].dst,
                                 sent[IPv6].dst)

        # one data packet per route configured below
        data_pkts = [
            (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IPv6(src="3001::1", dst=dst_ip) /
             UDP(sport=1234, dport=1234) /
             Raw(b'\xa5' * 100))
            for dst_ip in ("2001::1", "2002::1")]

        # A recursive and a non-recursive route via a next-hop that
        # will have a BFD session
        attached_path = VppRoutePath(self.pg0.remote_ip6,
                                     self.pg0.sw_if_index)
        attached_route = VppIpRoute(self, "2001::", 64, [attached_path])
        recursive_path = VppRoutePath(self.pg0.remote_ip6, 0xffffffff)
        recursive_route = VppIpRoute(self, "2002::", 64, [recursive_path])
        attached_route.add_vpp_config()
        recursive_route.add_vpp_config()

        # create the sessions now that the routes are present
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET6)

        # session is up - traffic passes
        bfd_session_up(self)
        send_and_expect_forwarded(data_pkts)

        # session is down - traffic is dropped
        bfd_session_down(self)

        self.pg0.add_stream(data_pkts)
        self.pg_start()
        with self.assertRaises(CaptureTimeoutError):
            self.pg0.wait_for_packet(1, self.pkt_is_not_data_traffic)

        # session is up again - traffic passes
        bfd_session_up(self)
        send_and_expect_forwarded(data_pkts)
1793
1794
@unittest.skipUnless(running_extended_tests, "part of extended tests")
class BFDTunTestCase(VppTestCase):
    """ BFD over GRE tunnel """

    # both sessions are created inside test_bfd_o_gre
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        super(BFDTunTestCase, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(BFDTunTestCase, cls).tearDownClass()

    def setUp(self):
        super(BFDTunTestCase, self).setUp()
        self.create_pg_interfaces(range(1))

        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip4()
            intf.resolve_arp()

    def tearDown(self):
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)

        super(BFDTunTestCase, self).tearDown()

    @staticmethod
    def pkt_is_not_data_traffic(p):
        """ not data traffic implies BFD or the usual IPv6 ND/RA"""
        return bool(p.haslayer(BFD) or is_ipv6_misc(p))

    def test_bfd_o_gre(self):
        """ BFD-o-GRE  """

        # A GRE interface over which to run a BFD session
        tunnel = VppGreInterface(
            self, self.pg0.local_ip4, self.pg0.remote_ip4)
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()

        # vpp side of the session, bound to the tunnel interface
        self.vpp_session = VppBFDUDPSession(
            self, tunnel, tunnel.remote_ip4, is_tunnel=True)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()

        # test side: outer IP/GRE header is passed in, pg0 is the physical
        # interface
        outer_header = (IP(src=self.pg0.remote_ip4,
                           dst=self.pg0.local_ip4) /
                        GRE())
        self.test_session = BFDTestSession(
            self, tunnel, AF_INET,
            tunnel_header=outer_header,
            phy_interface=self.pg0)

        # a single data packet addressed to the tunnel's remote address
        data_pkts = [
            (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(src=self.pg0.remote_ip4, dst=tunnel.remote_ip4) /
             UDP(sport=1234, dport=1234) /
             Raw(b'\xa5' * 100))]

        # session is up - traffic passes
        bfd_session_up(self)

        self.send_and_expect(self.pg0, data_pkts, self.pg0)

        # bring session down
        bfd_session_down(self)
1874
1875
1876 class BFDSHA1TestCase(VppTestCase):
1877     """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1878
1879     pg0 = None
1880     vpp_clock_offset = None
1881     vpp_session = None
1882     test_session = None
1883
    @classmethod
    def setUpClass(cls):
        """Create and bring up the IPv4 pg0 interface shared by the class.

        If interface setup fails, the parent's tearDownClass is invoked
        before the exception is re-raised.
        """
        super(BFDSHA1TestCase, cls).setUpClass()
        # raise the bfd log class to debug level
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip4()
            cls.pg0.admin_up()
            cls.pg0.resolve_arp()

        except Exception:
            # undo the parent's class setup before propagating the failure
            super(BFDSHA1TestCase, cls).tearDownClass()
            raise
1897
    @classmethod
    def tearDownClass(cls):
        """Delegate class-level cleanup to the parent class."""
        super(BFDSHA1TestCase, cls).tearDownClass()
1901
    def setUp(self):
        """Prepare a key factory, enable BFD events and start capturing.

        No BFD session is created here; each test builds its own.
        """
        super(BFDSHA1TestCase, self).setUp()
        # fresh factory per test
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()
1907
    def tearDown(self):
        """Disable BFD events (if vpp is alive) and drain pending events."""
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=False)
        self.vapi.collect_events()  # clear the event queue
        super(BFDSHA1TestCase, self).tearDown()
1913
1914     def test_session_up(self):
1915         """ bring BFD session up """
1916         key = self.factory.create_random_key(self)
1917         key.add_vpp_config()
1918         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1919                                             self.pg0.remote_ip4,
1920                                             sha1_key=key)
1921         self.vpp_session.add_vpp_config()
1922         self.vpp_session.admin_up()
1923         self.test_session = BFDTestSession(
1924             self, self.pg0, AF_INET, sha1_key=key,
1925             bfd_key_id=self.vpp_session.bfd_key_id)
1926         bfd_session_up(self)
1927
1928     def test_hold_up(self):
1929         """ hold BFD session up """
1930         key = self.factory.create_random_key(self)
1931         key.add_vpp_config()
1932         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1933                                             self.pg0.remote_ip4,
1934                                             sha1_key=key)
1935         self.vpp_session.add_vpp_config()
1936         self.vpp_session.admin_up()
1937         self.test_session = BFDTestSession(
1938             self, self.pg0, AF_INET, sha1_key=key,
1939             bfd_key_id=self.vpp_session.bfd_key_id)
1940         bfd_session_up(self)
1941         for dummy in range(self.test_session.detect_mult * 2):
1942             wait_for_bfd_packet(self)
1943             self.test_session.send_packet()
1944         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1945
1946     def test_hold_up_meticulous(self):
1947         """ hold BFD session up - meticulous auth """
1948         key = self.factory.create_random_key(
1949             self, BFDAuthType.meticulous_keyed_sha1)
1950         key.add_vpp_config()
1951         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1952                                             self.pg0.remote_ip4, sha1_key=key)
1953         self.vpp_session.add_vpp_config()
1954         self.vpp_session.admin_up()
1955         # specify sequence number so that it wraps
1956         self.test_session = BFDTestSession(
1957             self, self.pg0, AF_INET, sha1_key=key,
1958             bfd_key_id=self.vpp_session.bfd_key_id,
1959             our_seq_number=0xFFFFFFFF - 4)
1960         bfd_session_up(self)
1961         for dummy in range(30):
1962             wait_for_bfd_packet(self)
1963             self.test_session.inc_seq_num()
1964             self.test_session.send_packet()
1965         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1966
1967     def test_send_bad_seq_number(self):
1968         """ session is not kept alive by msgs with bad sequence numbers"""
1969         key = self.factory.create_random_key(
1970             self, BFDAuthType.meticulous_keyed_sha1)
1971         key.add_vpp_config()
1972         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1973                                             self.pg0.remote_ip4, sha1_key=key)
1974         self.vpp_session.add_vpp_config()
1975         self.test_session = BFDTestSession(
1976             self, self.pg0, AF_INET, sha1_key=key,
1977             bfd_key_id=self.vpp_session.bfd_key_id)
1978         bfd_session_up(self)
1979         detection_time = self.test_session.detect_mult *\
1980             self.vpp_session.required_min_rx / USEC_IN_SEC
1981         send_until = time.time() + 2 * detection_time
1982         while time.time() < send_until:
1983             self.test_session.send_packet()
1984             self.sleep(0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
1985                        "time between bfd packets")
1986         e = self.vapi.collect_events()
1987         # session should be down now, because the sequence numbers weren't
1988         # updated
1989         self.assert_equal(len(e), 1, "number of bfd events")
1990         verify_event(self, e[0], expected_state=BFDState.down)
1991
1992     def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
1993                                        legitimate_test_session,
1994                                        rogue_test_session,
1995                                        rogue_bfd_values=None):
1996         """ execute a rogue session interaction scenario
1997
1998         1. create vpp session, add config
1999         2. bring the legitimate session up
2000         3. copy the bfd values from legitimate session to rogue session
2001         4. apply rogue_bfd_values to rogue session
2002         5. set rogue session state to down
2003         6. send message to take the session down from the rogue session
2004         7. assert that the legitimate session is unaffected
2005         """
2006
2007         self.vpp_session = vpp_bfd_udp_session
2008         self.vpp_session.add_vpp_config()
2009         self.test_session = legitimate_test_session
2010         # bring vpp session up
2011         bfd_session_up(self)
2012         # send packet from rogue session
2013         rogue_test_session.update(
2014             my_discriminator=self.test_session.my_discriminator,
2015             your_discriminator=self.test_session.your_discriminator,
2016             desired_min_tx=self.test_session.desired_min_tx,
2017             required_min_rx=self.test_session.required_min_rx,
2018             detect_mult=self.test_session.detect_mult,
2019             diag=self.test_session.diag,
2020             state=self.test_session.state,
2021             auth_type=self.test_session.auth_type)
2022         if rogue_bfd_values:
2023             rogue_test_session.update(**rogue_bfd_values)
2024         rogue_test_session.update(state=BFDState.down)
2025         rogue_test_session.send_packet()
2026         wait_for_bfd_packet(self)
2027         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2028
2029     def test_mismatch_auth(self):
2030         """ session is not brought down by unauthenticated msg """
2031         key = self.factory.create_random_key(self)
2032         key.add_vpp_config()
2033         vpp_session = VppBFDUDPSession(
2034             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2035         legitimate_test_session = BFDTestSession(
2036             self, self.pg0, AF_INET, sha1_key=key,
2037             bfd_key_id=vpp_session.bfd_key_id)
2038         rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
2039         self.execute_rogue_session_scenario(vpp_session,
2040                                             legitimate_test_session,
2041                                             rogue_test_session)
2042
2043     def test_mismatch_bfd_key_id(self):
2044         """ session is not brought down by msg with non-existent key-id """
2045         key = self.factory.create_random_key(self)
2046         key.add_vpp_config()
2047         vpp_session = VppBFDUDPSession(
2048             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2049         # pick a different random bfd key id
2050         x = randint(0, 255)
2051         while x == vpp_session.bfd_key_id:
2052             x = randint(0, 255)
2053         legitimate_test_session = BFDTestSession(
2054             self, self.pg0, AF_INET, sha1_key=key,
2055             bfd_key_id=vpp_session.bfd_key_id)
2056         rogue_test_session = BFDTestSession(
2057             self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
2058         self.execute_rogue_session_scenario(vpp_session,
2059                                             legitimate_test_session,
2060                                             rogue_test_session)
2061
2062     def test_mismatched_auth_type(self):
2063         """ session is not brought down by msg with wrong auth type """
2064         key = self.factory.create_random_key(self)
2065         key.add_vpp_config()
2066         vpp_session = VppBFDUDPSession(
2067             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2068         legitimate_test_session = BFDTestSession(
2069             self, self.pg0, AF_INET, sha1_key=key,
2070             bfd_key_id=vpp_session.bfd_key_id)
2071         rogue_test_session = BFDTestSession(
2072             self, self.pg0, AF_INET, sha1_key=key,
2073             bfd_key_id=vpp_session.bfd_key_id)
2074         self.execute_rogue_session_scenario(
2075             vpp_session, legitimate_test_session, rogue_test_session,
2076             {'auth_type': BFDAuthType.keyed_md5})
2077
2078     def test_restart(self):
2079         """ simulate remote peer restart and resynchronization """
2080         key = self.factory.create_random_key(
2081             self, BFDAuthType.meticulous_keyed_sha1)
2082         key.add_vpp_config()
2083         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2084                                             self.pg0.remote_ip4, sha1_key=key)
2085         self.vpp_session.add_vpp_config()
2086         self.test_session = BFDTestSession(
2087             self, self.pg0, AF_INET, sha1_key=key,
2088             bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
2089         bfd_session_up(self)
2090         # don't send any packets for 2*detection_time
2091         detection_time = self.test_session.detect_mult *\
2092             self.vpp_session.required_min_rx / USEC_IN_SEC
2093         self.sleep(2 * detection_time, "simulating peer restart")
2094         events = self.vapi.collect_events()
2095         self.assert_equal(len(events), 1, "number of bfd events")
2096         verify_event(self, events[0], expected_state=BFDState.down)
2097         self.test_session.update(state=BFDState.down)
2098         # reset sequence number
2099         self.test_session.our_seq_number = 0
2100         self.test_session.vpp_seq_number = None
2101         # now throw away any pending packets
2102         self.pg0.enable_capture()
2103         bfd_session_up(self)
2104
2105
class BFDAuthOnOffTestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (changing auth) """

    pg0 = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """ create pg0, configure IPv4 on it and set bfd logging to debug """
        super(BFDAuthOnOffTestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip4()
            cls.pg0.admin_up()
            cls.pg0.resolve_arp()
        except Exception:
            # run the parent's class teardown before re-raising
            super(BFDAuthOnOffTestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """ delegate class-level teardown to the parent class """
        super(BFDAuthOnOffTestCase, cls).tearDownClass()

    def setUp(self):
        """ create a key factory, subscribe to bfd events, enable capture """
        super(BFDAuthOnOffTestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

    def tearDown(self):
        """ unsubscribe from bfd events (if vpp is alive), drop events """
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=False)
        self.vapi.collect_events()  # clear the event queue
        super(BFDAuthOnOffTestCase, self).tearDown()

    def _exchange_packets_while_up(self, inc_seq_num=False):
        """ exchange detect_mult * 2 packet pairs, asserting state is up

        For each round a packet is obtained via wait_for_bfd_packet(), its
        BFD state is asserted to be up and the test session replies with a
        packet of its own.  With inc_seq_num=True the test session's
        inc_seq_num() is called before each reply.
        """
        for _ in range(self.test_session.detect_mult * 2):
            p = wait_for_bfd_packet(self)
            self.assert_equal(p[BFD].state, BFDState.up, BFDState)
            if inc_seq_num:
                self.test_session.inc_seq_num()
            self.test_session.send_packet()

    def _assert_session_undisturbed(self):
        """ assert the vpp session is up and no bfd events were collected """
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")

    def test_auth_on_immediate(self):
        """ turn auth on without disturbing session state (immediate) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        self._exchange_packets_while_up()
        # switch both sides to the new key at the same time
        self.vpp_session.activate_auth(key)
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key
        self._exchange_packets_while_up()
        self._assert_session_undisturbed()

    def test_auth_off_immediate(self):
        """ turn auth off without disturbing session state (immediate) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        # this test bumps the test-side sequence number before every reply
        self._exchange_packets_while_up(inc_seq_num=True)
        self.vpp_session.deactivate_auth()
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self._exchange_packets_while_up(inc_seq_num=True)
        self._assert_session_undisturbed()

    def test_auth_change_key_immediate(self):
        """ change auth key without disturbing session state (immediate) """
        key1 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2 = self.factory.create_random_key(self)
        key2.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key1)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key1,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets_while_up()
        # switch both sides from key1 to key2 at the same time
        self.vpp_session.activate_auth(key2)
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key2
        self._exchange_packets_while_up()
        self._assert_session_undisturbed()

    def test_auth_on_delayed(self):
        """ turn auth on without disturbing session state (delayed) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        self._exchange_packets_while_up()
        self.vpp_session.activate_auth(key, delayed=True)
        # the test side keeps sending unauthenticated packets for a while
        self._exchange_packets_while_up()
        # now the test side starts using the key as well
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key
        self.test_session.send_packet()
        self._exchange_packets_while_up()
        self._assert_session_undisturbed()

    def test_auth_off_delayed(self):
        """ turn auth off without disturbing session state (delayed) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets_while_up()
        self.vpp_session.deactivate_auth(delayed=True)
        # the test side keeps sending authenticated packets for a while
        self._exchange_packets_while_up()
        # now the test side drops authentication as well
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self.test_session.send_packet()
        self._exchange_packets_while_up()
        self._assert_session_undisturbed()

    def test_auth_change_key_delayed(self):
        """ change auth key without disturbing session state (delayed) """
        key1 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2 = self.factory.create_random_key(self)
        key2.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key1)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key1,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets_while_up()
        self.vpp_session.activate_auth(key2, delayed=True)
        # the test side keeps using key1 for a while
        self._exchange_packets_while_up()
        # now the test side switches to key2 as well
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key2
        self.test_session.send_packet()
        self._exchange_packets_while_up()
        self._assert_session_undisturbed()
2316
2317
2318 class BFDCLITestCase(VppTestCase):
2319     """Bidirectional Forwarding Detection (BFD) (CLI) """
2320     pg0 = None
2321
2322     @classmethod
2323     def setUpClass(cls):
2324         super(BFDCLITestCase, cls).setUpClass()
2325         cls.vapi.cli("set log class bfd level debug")
2326         try:
2327             cls.create_pg_interfaces((0,))
2328             cls.pg0.config_ip4()
2329             cls.pg0.config_ip6()
2330             cls.pg0.resolve_arp()
2331             cls.pg0.resolve_ndp()
2332
2333         except Exception:
2334             super(BFDCLITestCase, cls).tearDownClass()
2335             raise
2336
    @classmethod
    def tearDownClass(cls):
        """ delegate class-level teardown to the parent class """
        super(BFDCLITestCase, cls).tearDownClass()
2340
    def setUp(self):
        """ create a fresh auth key factory and enable capture on pg0 """
        super(BFDCLITestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.pg0.enable_capture()
2345
2346     def tearDown(self):
2347         try:
2348             self.vapi.want_bfd_events(enable_disable=False)
2349         except UnexpectedApiReturnValueError:
2350             # some tests aren't subscribed, so this is not an issue
2351             pass
2352         self.vapi.collect_events()  # clear the event queue
2353         super(BFDCLITestCase, self).tearDown()
2354
2355     def cli_verify_no_response(self, cli):
2356         """ execute a CLI, asserting that the response is empty """
2357         self.assert_equal(self.vapi.cli(cli),
2358                           "",
2359                           "CLI command response")
2360
2361     def cli_verify_response(self, cli, expected):
2362         """ execute a CLI, asserting that the response matches expectation """
2363         try:
2364             reply = self.vapi.cli(cli)
2365         except CliFailedCommandError as cli_error:
2366             reply = str(cli_error)
2367         self.assert_equal(reply.strip(),
2368                           expected,
2369                           "CLI command response")
2370
2371     def test_show(self):
2372         """ show commands """
2373         k1 = self.factory.create_random_key(self)
2374         k1.add_vpp_config()
2375         k2 = self.factory.create_random_key(
2376             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2377         k2.add_vpp_config()
2378         s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2379         s1.add_vpp_config()
2380         s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2381                               sha1_key=k2)
2382         s2.add_vpp_config()
2383         self.logger.info(self.vapi.ppcli("show bfd keys"))
2384         self.logger.info(self.vapi.ppcli("show bfd sessions"))
2385         self.logger.info(self.vapi.ppcli("show bfd"))
2386
2387     def test_set_del_sha1_key(self):
2388         """ set/delete SHA1 auth key """
2389         k = self.factory.create_random_key(self)
2390         self.registry.register(k, self.logger)
2391         self.cli_verify_no_response(
2392             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2393             (k.conf_key_id,
2394                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2395         self.assertTrue(k.query_vpp_config())
2396         self.vpp_session = VppBFDUDPSession(
2397             self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
2398         self.vpp_session.add_vpp_config()
2399         self.test_session = \
2400             BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
2401                            bfd_key_id=self.vpp_session.bfd_key_id)
2402         self.vapi.want_bfd_events()
2403         bfd_session_up(self)
2404         bfd_session_down(self)
2405         # try to replace the secret for the key - should fail because the key
2406         # is in-use
2407         k2 = self.factory.create_random_key(self)
2408         self.cli_verify_response(
2409             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2410             (k.conf_key_id,
2411                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2412             "bfd key set: `bfd_auth_set_key' API call failed, "
2413             "rv=-103:BFD object in use")
2414         # manipulating the session using old secret should still work
2415         bfd_session_up(self)
2416         bfd_session_down(self)
2417         self.vpp_session.remove_vpp_config()
2418         self.cli_verify_no_response(
2419             "bfd key del conf-key-id %s" % k.conf_key_id)
2420         self.assertFalse(k.query_vpp_config())
2421
2422     def test_set_del_meticulous_sha1_key(self):
2423         """ set/delete meticulous SHA1 auth key """
2424         k = self.factory.create_random_key(
2425             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2426         self.registry.register(k, self.logger)
2427         self.cli_verify_no_response(
2428             "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
2429             (k.conf_key_id,
2430                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2431         self.assertTrue(k.query_vpp_config())
2432         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2433                                             self.pg0.remote_ip6, af=AF_INET6,
2434                                             sha1_key=k)
2435         self.vpp_session.add_vpp_config()
2436         self.vpp_session.admin_up()
2437         self.test_session = \
2438             BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
2439                            bfd_key_id=self.vpp_session.bfd_key_id)
2440         self.vapi.want_bfd_events()
2441         bfd_session_up(self)
2442         bfd_session_down(self)
2443         # try to replace the secret for the key - should fail because the key
2444         # is in-use
2445         k2 = self.factory.create_random_key(self)
2446         self.cli_verify_response(
2447             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2448             (k.conf_key_id,
2449                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2450             "bfd key set: `bfd_auth_set_key' API call failed, "
2451             "rv=-103:BFD object in use")
2452         # manipulating the session using old secret should still work
2453         bfd_session_up(self)
2454         bfd_session_down(self)
2455         self.vpp_session.remove_vpp_config()
2456         self.cli_verify_no_response(
2457             "bfd key del conf-key-id %s" % k.conf_key_id)
2458         self.assertFalse(k.query_vpp_config())
2459
2460     def test_add_mod_del_bfd_udp(self):
2461         """ create/modify/delete IPv4 BFD UDP session """
2462         vpp_session = VppBFDUDPSession(
2463             self, self.pg0, self.pg0.remote_ip4)
2464         self.registry.register(vpp_session, self.logger)
2465         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2466             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2467             "detect-mult %s" % (self.pg0.name, self.pg0.local_ip4,
2468                                 self.pg0.remote_ip4,
2469                                 vpp_session.desired_min_tx,
2470                                 vpp_session.required_min_rx,
2471                                 vpp_session.detect_mult)
2472         self.cli_verify_no_response(cli_add_cmd)
2473         # 2nd add should fail
2474         self.cli_verify_response(
2475             cli_add_cmd,
2476             "bfd udp session add: `bfd_add_add_session' API call"
2477             " failed, rv=-101:Duplicate BFD object")
2478         verify_bfd_session_config(self, vpp_session)
2479         mod_session = VppBFDUDPSession(
2480             self, self.pg0, self.pg0.remote_ip4,
2481             required_min_rx=2 * vpp_session.required_min_rx,
2482             desired_min_tx=3 * vpp_session.desired_min_tx,
2483             detect_mult=4 * vpp_session.detect_mult)
2484         self.cli_verify_no_response(
2485             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2486             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2487             (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2488              mod_session.desired_min_tx, mod_session.required_min_rx,
2489              mod_session.detect_mult))
2490         verify_bfd_session_config(self, mod_session)
2491         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2492             "peer-addr %s" % (self.pg0.name,
2493                               self.pg0.local_ip4, self.pg0.remote_ip4)
2494         self.cli_verify_no_response(cli_del_cmd)
2495         # 2nd del is expected to fail
2496         self.cli_verify_response(
2497             cli_del_cmd, "bfd udp session del: `bfd_udp_del_session' API call"
2498             " failed, rv=-102:No such BFD object")
2499         self.assertFalse(vpp_session.query_vpp_config())
2500
2501     def test_add_mod_del_bfd_udp6(self):
2502         """ create/modify/delete IPv6 BFD UDP session """
2503         vpp_session = VppBFDUDPSession(
2504             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
2505         self.registry.register(vpp_session, self.logger)
2506         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2507             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2508             "detect-mult %s" % (self.pg0.name, self.pg0.local_ip6,
2509                                 self.pg0.remote_ip6,
2510                                 vpp_session.desired_min_tx,
2511                                 vpp_session.required_min_rx,
2512                                 vpp_session.detect_mult)
2513         self.cli_verify_no_response(cli_add_cmd)
2514         # 2nd add should fail
2515         self.cli_verify_response(
2516             cli_add_cmd,
2517             "bfd udp session add: `bfd_add_add_session' API call"
2518             " failed, rv=-101:Duplicate BFD object")
2519         verify_bfd_session_config(self, vpp_session)
2520         mod_session = VppBFDUDPSession(
2521             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2522             required_min_rx=2 * vpp_session.required_min_rx,
2523             desired_min_tx=3 * vpp_session.desired_min_tx,
2524             detect_mult=4 * vpp_session.detect_mult)
2525         self.cli_verify_no_response(
2526             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2527             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2528             (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2529              mod_session.desired_min_tx,
2530              mod_session.required_min_rx, mod_session.detect_mult))
2531         verify_bfd_session_config(self, mod_session)
2532         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2533             "peer-addr %s" % (self.pg0.name,
2534                               self.pg0.local_ip6, self.pg0.remote_ip6)
2535         self.cli_verify_no_response(cli_del_cmd)
2536         # 2nd del is expected to fail
2537         self.cli_verify_response(
2538             cli_del_cmd,
2539             "bfd udp session del: `bfd_udp_del_session' API call"
2540             " failed, rv=-102:No such BFD object")
2541         self.assertFalse(vpp_session.query_vpp_config())
2542
2543     def test_add_mod_del_bfd_udp_auth(self):
2544         """ create/modify/delete IPv4 BFD UDP session (authenticated) """
2545         key = self.factory.create_random_key(self)
2546         key.add_vpp_config()
2547         vpp_session = VppBFDUDPSession(
2548             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2549         self.registry.register(vpp_session, self.logger)
2550         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2551             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2552             "detect-mult %s conf-key-id %s bfd-key-id %s"\
2553             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2554                vpp_session.desired_min_tx, vpp_session.required_min_rx,
2555                vpp_session.detect_mult, key.conf_key_id,
2556                vpp_session.bfd_key_id)
2557         self.cli_verify_no_response(cli_add_cmd)
2558         # 2nd add should fail
2559         self.cli_verify_response(
2560             cli_add_cmd,
2561             "bfd udp session add: `bfd_add_add_session' API call"
2562             " failed, rv=-101:Duplicate BFD object")
2563         verify_bfd_session_config(self, vpp_session)
2564         mod_session = VppBFDUDPSession(
2565             self, self.pg0, self.pg0.remote_ip4, sha1_key=key,
2566             bfd_key_id=vpp_session.bfd_key_id,
2567             required_min_rx=2 * vpp_session.required_min_rx,
2568             desired_min_tx=3 * vpp_session.desired_min_tx,
2569             detect_mult=4 * vpp_session.detect_mult)
2570         self.cli_verify_no_response(
2571             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2572             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2573             (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2574              mod_session.desired_min_tx,
2575              mod_session.required_min_rx, mod_session.detect_mult))
2576         verify_bfd_session_config(self, mod_session)
2577         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2578             "peer-addr %s" % (self.pg0.name,
2579                               self.pg0.local_ip4, self.pg0.remote_ip4)
2580         self.cli_verify_no_response(cli_del_cmd)
2581         # 2nd del is expected to fail
2582         self.cli_verify_response(
2583             cli_del_cmd,
2584             "bfd udp session del: `bfd_udp_del_session' API call"
2585             " failed, rv=-102:No such BFD object")
2586         self.assertFalse(vpp_session.query_vpp_config())
2587
2588     def test_add_mod_del_bfd_udp6_auth(self):
2589         """ create/modify/delete IPv6 BFD UDP session (authenticated) """
2590         key = self.factory.create_random_key(
2591             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2592         key.add_vpp_config()
2593         vpp_session = VppBFDUDPSession(
2594             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key)
2595         self.registry.register(vpp_session, self.logger)
2596         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2597             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2598             "detect-mult %s conf-key-id %s bfd-key-id %s" \
2599             % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2600                vpp_session.desired_min_tx, vpp_session.required_min_rx,
2601                vpp_session.detect_mult, key.conf_key_id,
2602                vpp_session.bfd_key_id)
2603         self.cli_verify_no_response(cli_add_cmd)
2604         # 2nd add should fail
2605         self.cli_verify_response(
2606             cli_add_cmd,
2607             "bfd udp session add: `bfd_add_add_session' API call"
2608             " failed, rv=-101:Duplicate BFD object")
2609         verify_bfd_session_config(self, vpp_session)
2610         mod_session = VppBFDUDPSession(
2611             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key,
2612             bfd_key_id=vpp_session.bfd_key_id,
2613             required_min_rx=2 * vpp_session.required_min_rx,
2614             desired_min_tx=3 * vpp_session.desired_min_tx,
2615             detect_mult=4 * vpp_session.detect_mult)
2616         self.cli_verify_no_response(
2617             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2618             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2619             (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2620              mod_session.desired_min_tx,
2621              mod_session.required_min_rx, mod_session.detect_mult))
2622         verify_bfd_session_config(self, mod_session)
2623         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2624             "peer-addr %s" % (self.pg0.name,
2625                               self.pg0.local_ip6, self.pg0.remote_ip6)
2626         self.cli_verify_no_response(cli_del_cmd)
2627         # 2nd del is expected to fail
2628         self.cli_verify_response(
2629             cli_del_cmd,
2630             "bfd udp session del: `bfd_udp_del_session' API call"
2631             " failed, rv=-102:No such BFD object")
2632         self.assertFalse(vpp_session.query_vpp_config())
2633
2634     def test_auth_on_off(self):
2635         """ turn authentication on and off """
2636         key = self.factory.create_random_key(
2637             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2638         key.add_vpp_config()
2639         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2640         auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2641                                         sha1_key=key)
2642         session.add_vpp_config()
2643         cli_activate = \
2644             "bfd udp session auth activate interface %s local-addr %s "\
2645             "peer-addr %s conf-key-id %s bfd-key-id %s"\
2646             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2647                key.conf_key_id, auth_session.bfd_key_id)
2648         self.cli_verify_no_response(cli_activate)
2649         verify_bfd_session_config(self, auth_session)
2650         self.cli_verify_no_response(cli_activate)
2651         verify_bfd_session_config(self, auth_session)
2652         cli_deactivate = \
2653             "bfd udp session auth deactivate interface %s local-addr %s "\
2654             "peer-addr %s "\
2655             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2656         self.cli_verify_no_response(cli_deactivate)
2657         verify_bfd_session_config(self, session)
2658         self.cli_verify_no_response(cli_deactivate)
2659         verify_bfd_session_config(self, session)
2660
2661     def test_auth_on_off_delayed(self):
2662         """ turn authentication on and off (delayed) """
2663         key = self.factory.create_random_key(
2664             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2665         key.add_vpp_config()
2666         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2667         auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2668                                         sha1_key=key)
2669         session.add_vpp_config()
2670         cli_activate = \
2671             "bfd udp session auth activate interface %s local-addr %s "\
2672             "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"\
2673             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2674                key.conf_key_id, auth_session.bfd_key_id)
2675         self.cli_verify_no_response(cli_activate)
2676         verify_bfd_session_config(self, auth_session)
2677         self.cli_verify_no_response(cli_activate)
2678         verify_bfd_session_config(self, auth_session)
2679         cli_deactivate = \
2680             "bfd udp session auth deactivate interface %s local-addr %s "\
2681             "peer-addr %s delayed yes"\
2682             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2683         self.cli_verify_no_response(cli_deactivate)
2684         verify_bfd_session_config(self, session)
2685         self.cli_verify_no_response(cli_deactivate)
2686         verify_bfd_session_config(self, session)
2687
2688     def test_admin_up_down(self):
2689         """ put session admin-up and admin-down """
2690         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2691         session.add_vpp_config()
2692         cli_down = \
2693             "bfd udp session set-flags admin down interface %s local-addr %s "\
2694             "peer-addr %s "\
2695             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2696         cli_up = \
2697             "bfd udp session set-flags admin up interface %s local-addr %s "\
2698             "peer-addr %s "\
2699             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2700         self.cli_verify_no_response(cli_down)
2701         verify_bfd_session_config(self, session, state=BFDState.admin_down)
2702         self.cli_verify_no_response(cli_up)
2703         verify_bfd_session_config(self, session, state=BFDState.down)
2704
2705     def test_set_del_udp_echo_source(self):
2706         """ set/del udp echo source """
2707         self.create_loopback_interfaces(1)
2708         self.loopback0 = self.lo_interfaces[0]
2709         self.loopback0.admin_up()
2710         self.cli_verify_response("show bfd echo-source",
2711                                  "UDP echo source is not set.")
2712         cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
2713         self.cli_verify_no_response(cli_set)
2714         self.cli_verify_response("show bfd echo-source",
2715                                  "UDP echo source is: %s\n"
2716                                  "IPv4 address usable as echo source: none\n"
2717                                  "IPv6 address usable as echo source: none" %
2718                                  self.loopback0.name)
2719         self.loopback0.config_ip4()
2720         unpacked = unpack("!L", self.loopback0.local_ip4n)
2721         echo_ip4 = inet_ntop(AF_INET, pack("!L", unpacked[0] ^ 1))
2722         self.cli_verify_response("show bfd echo-source",
2723                                  "UDP echo source is: %s\n"
2724                                  "IPv4 address usable as echo source: %s\n"
2725                                  "IPv6 address usable as echo source: none" %
2726                                  (self.loopback0.name, echo_ip4))
2727         unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
2728         echo_ip6 = inet_ntop(AF_INET6, pack("!LLLL", unpacked[0], unpacked[1],
2729                                             unpacked[2], unpacked[3] ^ 1))
2730         self.loopback0.config_ip6()
2731         self.cli_verify_response("show bfd echo-source",
2732                                  "UDP echo source is: %s\n"
2733                                  "IPv4 address usable as echo source: %s\n"
2734                                  "IPv6 address usable as echo source: %s" %
2735                                  (self.loopback0.name, echo_ip4, echo_ip6))
2736         cli_del = "bfd udp echo-source del"
2737         self.cli_verify_no_response(cli_del)
2738         self.cli_verify_response("show bfd echo-source",
2739                                  "UDP echo source is not set.")
2740
2741
if __name__ == "__main__":
    # executed directly as a script: hand the module's test cases to
    # unittest, using VppTestRunner as the test runner
    unittest.main(testRunner=VppTestRunner)