GBP Endpoint Updates
[vpp.git] / test / test_bfd.py
1 #!/usr/bin/env python
2 """ BFD tests """
3
4 from __future__ import division
5 import unittest
6 import hashlib
7 import binascii
8 import time
9 from struct import pack, unpack
10 from random import randint, shuffle, getrandbits
11 from socket import AF_INET, AF_INET6, inet_ntop
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether
14 from scapy.layers.inet import UDP, IP
15 from scapy.layers.inet6 import IPv6
16 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
17     BFDDiagCode, BFDState, BFD_vpp_echo
18 from framework import VppTestCase, VppTestRunner, running_extended_tests
19 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
20 from vpp_lo_interface import VppLoInterface
21 from util import ppp
22 from vpp_papi_provider import UnexpectedApiReturnValueError
23 from vpp_ip import DpoProto
24 from vpp_ip_route import VppIpRoute, VppRoutePath
25
# Number of microseconds in one second; used in this file as a divisor to
# turn a BFD interval into a sleep duration in seconds.
USEC_IN_SEC = 1000000
27
28
class AuthKeyFactory(object):
    """Factory class for creating auth keys with unique conf key ID

    Uniqueness is tracked per factory instance: a conf key ID handed out
    once by an instance is never handed out again by that same instance.
    """

    def __init__(self):
        # conf key IDs already handed out by this factory
        self._conf_key_ids = set()

    def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
        """ create a random key with unique conf key id

        :param test: test case instance, passed through to VppBFDAuthKey
        :param auth_type: authentication type, passed through to
                          VppBFDAuthKey
        :returns: VppBFDAuthKey carrying a fresh conf key ID and a key made
                  of 1 to 20 random bytes
        """
        conf_key_id = randint(0, 0xFFFFFFFF)
        while conf_key_id in self._conf_key_ids:
            conf_key_id = randint(0, 0xFFFFFFFF)
        self._conf_key_ids.add(conf_key_id)
        key_length = randint(1, 20)
        # bytes() instead of str(): on Python 2 the two are the same type,
        # but on Python 3 str(bytearray) yields the "bytearray(b'...')" repr
        # text rather than the raw key bytes.
        key = bytes(bytearray([randint(0, 255) for _ in range(key_length)]))
        return VppBFDAuthKey(test=test, auth_type=auth_type,
                             conf_key_id=conf_key_id, key=key)
44
45
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
class BFDAPITestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) - API"""

    # placeholders; setUpClass creates two pg interfaces and the tests below
    # use them as self.pg0 / self.pg1
    pg0 = None
    pg1 = None

    @classmethod
    def setUpClass(cls):
        """ create two pg interfaces with IPv4 and IPv6 configured

        If interface setup fails, the parent tearDownClass is run before
        the exception is re-raised.
        """
        super(BFDAPITestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces(range(2))
            for i in cls.pg_interfaces:
                i.config_ip4()
                i.config_ip6()
                i.resolve_arp()

        except Exception:
            super(BFDAPITestCase, cls).tearDownClass()
            raise

    def setUp(self):
        """ give every test its own auth key factory """
        super(BFDAPITestCase, self).setUp()
        self.factory = AuthKeyFactory()

    def test_add_bfd(self):
        """ create a BFD session """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        # add/remove twice to show the session can be re-created
        session.add_vpp_config()
        self.logger.debug("Session state is %s", session.state)
        session.remove_vpp_config()
        session.add_vpp_config()
        self.logger.debug("Session state is %s", session.state)
        session.remove_vpp_config()

    def test_double_add(self):
        """ create the same BFD session twice (negative case) """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()

        with self.vapi.expect_negative_api_retval():
            session.add_vpp_config()

        session.remove_vpp_config()

    def test_add_bfd6(self):
        """ create IPv6 BFD session """
        session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
        # add/remove twice to show the session can be re-created
        session.add_vpp_config()
        self.logger.debug("Session state is %s", session.state)
        session.remove_vpp_config()
        session.add_vpp_config()
        self.logger.debug("Session state is %s", session.state)
        session.remove_vpp_config()

    def test_mod_bfd(self):
        """ modify BFD session parameters """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   desired_min_tx=50000,
                                   required_min_rx=10000,
                                   detect_mult=1)
        session.add_vpp_config()

        def check_dump_matches_session():
            # compare what the session dump reports with the session object
            s = session.get_bfd_udp_session_dump_entry()
            self.assert_equal(session.desired_min_tx,
                              s.desired_min_tx,
                              "desired min transmit interval")
            self.assert_equal(session.required_min_rx,
                              s.required_min_rx,
                              "required min receive interval")
            self.assert_equal(session.detect_mult, s.detect_mult,
                              "detect mult")

        check_dump_matches_session()
        session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
                                  required_min_rx=session.required_min_rx * 2,
                                  detect_mult=session.detect_mult * 2)
        check_dump_matches_session()

    def test_add_sha1_keys(self):
        """ add SHA1 keys """
        key_count = 10
        keys = [self.factory.create_random_key(self)
                for _ in range(key_count)]
        for key in keys:
            self.assertFalse(key.query_vpp_config())
        for key in keys:
            key.add_vpp_config()
        for key in keys:
            self.assertTrue(key.query_vpp_config())
        # remove randomly
        # shuffle() reorders in place, so it needs a real list; a bare
        # range() object is immutable on Python 3 and would raise TypeError
        indexes = list(range(key_count))
        shuffle(indexes)
        removed = set()
        for i in indexes:
            keys[i].remove_vpp_config()
            removed.add(i)
            # after each removal, every key must be in the expected state
            for j, key in enumerate(keys):
                if j in removed:
                    self.assertFalse(key.query_vpp_config())
                else:
                    self.assertTrue(key.query_vpp_config())
        # should be removed now
        for key in keys:
            self.assertFalse(key.query_vpp_config())
        # add back and remove again
        for key in keys:
            key.add_vpp_config()
        for key in keys:
            self.assertTrue(key.query_vpp_config())
        for key in keys:
            key.remove_vpp_config()
        for key in keys:
            self.assertFalse(key.query_vpp_config())

    def test_add_bfd_sha1(self):
        """ create a BFD session (SHA1) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=key)
        # add/remove twice to show the session can be re-created
        session.add_vpp_config()
        self.logger.debug("Session state is %s", session.state)
        session.remove_vpp_config()
        session.add_vpp_config()
        self.logger.debug("Session state is %s", session.state)
        session.remove_vpp_config()

    def test_double_add_sha1(self):
        """ create the same BFD session twice (negative case) (SHA1) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=key)
        session.add_vpp_config()
        with self.assertRaises(Exception):
            session.add_vpp_config()

    def test_add_auth_nonexistent_key(self):
        """ create BFD session using non-existent SHA1 (negative case) """
        # the key object is created but never added to the vpp config
        session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4,
            sha1_key=self.factory.create_random_key(self))
        with self.assertRaises(Exception):
            session.add_vpp_config()

    def test_shared_sha1_key(self):
        """ share single SHA1 key between multiple BFD sessions """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        sessions = [
            VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                             sha1_key=key),
            VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
                             sha1_key=key, af=AF_INET6),
            VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
                             sha1_key=key),
            VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
                             sha1_key=key, af=AF_INET6)]
        for s in sessions:
            s.add_vpp_config()
        # the reported use count must drop by one per removed session
        removed = 0
        for s in sessions:
            e = key.get_bfd_auth_keys_dump_entry()
            self.assert_equal(e.use_count, len(sessions) - removed,
                              "Use count for shared key")
            s.remove_vpp_config()
            removed += 1
        e = key.get_bfd_auth_keys_dump_entry()
        self.assert_equal(e.use_count, len(sessions) - removed,
                          "Use count for shared key")

    def test_activate_auth(self):
        """ activate SHA1 authentication """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        session.activate_auth(key)

    def test_deactivate_auth(self):
        """ deactivate SHA1 authentication """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        session.activate_auth(key)
        session.deactivate_auth()

    def test_change_key(self):
        """ change SHA1 key """
        key1 = self.factory.create_random_key(self)
        key2 = self.factory.create_random_key(self)
        while key2.conf_key_id == key1.conf_key_id:
            key2 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=key1)
        session.add_vpp_config()
        session.activate_auth(key2)
253         session.activate_auth(key2)
254
255
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
class BFDTestSession(object):
    """ BFD session as seen from test framework side """

    def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
                 bfd_key_id=None, our_seq_number=None):
        """ remember the session parameters and pick random start values

        :param test: test case instance (used for logging and assertions)
        :param interface: interface whose addresses go into created packets
        :param af: address family; AF_INET6 selects IPv6 packets
        :param detect_mult: detection multiplier put into created packets
        :param sha1_key: auth key; when set, created packets are signed
        :param bfd_key_id: key ID put into the auth section
        :param our_seq_number: initial auth sequence number, random if None
        """
        self.test = test
        self.interface = interface
        self.af = af
        self.sha1_key = sha1_key
        self.bfd_key_id = bfd_key_id
        self.udp_sport = randint(49152, 65535)
        self.our_seq_number = (randint(0, 40000000)
                               if our_seq_number is None else our_seq_number)
        self.vpp_seq_number = None
        self.my_discriminator = 0
        self.your_discriminator = None
        self.desired_min_tx = 300000
        self.required_min_rx = 300000
        self.required_min_echo_rx = None
        self.detect_mult = detect_mult
        self.diag = BFDDiagCode.no_diagnostic
        self.state = BFDState.down
        self.auth_type = BFDAuthType.no_auth

    def inc_seq_num(self):
        """ increment sequence number, wrapping if needed """
        at_maximum = self.our_seq_number == 0xFFFFFFFF
        self.our_seq_number = 0 if at_maximum else self.our_seq_number + 1

    def update(self, my_discriminator=None, your_discriminator=None,
               desired_min_tx=None, required_min_rx=None,
               required_min_echo_rx=None, detect_mult=None,
               diag=None, state=None, auth_type=None):
        """ update BFD parameters associated with session

        Only arguments that are not None overwrite the stored value.
        """
        requested = (
            ("my_discriminator", my_discriminator),
            ("your_discriminator", your_discriminator),
            ("required_min_rx", required_min_rx),
            ("required_min_echo_rx", required_min_echo_rx),
            ("desired_min_tx", desired_min_tx),
            ("detect_mult", detect_mult),
            ("diag", diag),
            ("state", state),
            ("auth_type", auth_type),
        )
        for attribute, value in requested:
            if value is not None:
                setattr(self, attribute, value)

    def fill_packet_fields(self, packet):
        """ set packet fields with known values in packet

        Each session value that is truthy is logged and copied into the
        matching field of the packet's BFD layer.
        """
        # NOTE(review): because the test is truthiness, a stored value of 0
        # (e.g. required_min_rx=0) is never written and the packet keeps
        # whatever BFD() put there by default - confirm this is intended.
        bfd = packet[BFD]
        debug = self.test.logger.debug
        # (log format, BFD field name, session value); auth_type comes last
        # and is used by a negative test-case
        plan = (
            ("BFD: setting packet.my_discriminator=%s",
             "my_discriminator", self.my_discriminator),
            ("BFD: setting packet.your_discriminator=%s",
             "your_discriminator", self.your_discriminator),
            ("BFD: setting packet.required_min_rx_interval=%s",
             "required_min_rx_interval", self.required_min_rx),
            ("BFD: setting packet.required_min_echo_rx=%s",
             "required_min_echo_rx_interval", self.required_min_echo_rx),
            ("BFD: setting packet.desired_min_tx_interval=%s",
             "desired_min_tx_interval", self.desired_min_tx),
            ("BFD: setting packet.detect_mult=%s",
             "detect_mult", self.detect_mult),
            ("BFD: setting packet.diag=%s",
             "diag", self.diag),
            ("BFD: setting packet.state=%s",
             "state", self.state),
            ("BFD: setting packet.auth_type=%s",
             "auth_type", self.auth_type),
        )
        for message, field, value in plan:
            if value:
                debug(message, value)
                setattr(bfd, field, value)

    def create_packet(self):
        """ create a BFD packet, reflecting the current state of session

        :returns: Ether / IP or IPv6 / UDP / BFD packet; when a SHA1 key is
                  set the BFD layer carries an auth section with the hash
        """
        signed = bool(self.sha1_key)
        if signed:
            bfd = BFD(flags="A")
            bfd.auth_type = self.sha1_key.auth_type
            bfd.auth_len = BFD.sha1_auth_len
            bfd.auth_key_id = self.bfd_key_id
            bfd.auth_seq_num = self.our_seq_number
            bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
        else:
            bfd = BFD()
        if self.af == AF_INET6:
            network = IPv6(src=self.interface.remote_ip6,
                           dst=self.interface.local_ip6,
                           hlim=255)
        else:
            network = IP(src=self.interface.remote_ip4,
                         dst=self.interface.local_ip4,
                         ttl=255)
        link = Ether(src=self.interface.remote_mac,
                     dst=self.interface.local_mac)
        transport = UDP(sport=self.udp_sport, dport=BFD.udp_dport)
        packet = link / network / transport / bfd
        self.test.logger.debug("BFD: Creating packet")
        self.fill_packet_fields(packet)
        if signed:
            # hash input: first 32 bytes of the BFD layer followed by the
            # key, zero-padded to 20 bytes
            secret = self.sha1_key.key
            hash_material = str(packet[BFD])[:32] + secret + \
                "\0" * (20 - len(secret))
            sha1 = hashlib.sha1(hash_material)
            self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
                                   sha1.hexdigest())
            packet[BFD].auth_key_hash = sha1.digest()
        return packet

    def send_packet(self, packet=None, interface=None):
        """ send packet on interface, creating the packet if needed

        :param packet: packet to send; built by create_packet() if None
        :param interface: interface to send on; the test's pg0 if None
        """
        outgoing = self.create_packet() if packet is None else packet
        out_if = self.test.pg0 if interface is None else interface
        self.test.logger.debug(ppp("Sending packet:", outgoing))
        out_if.add_stream(outgoing)
        self.test.pg_start()

    def verify_sha1_auth(self, packet):
        """ Verify correctness of authentication in BFD layer.

        Checks the fixed auth fields, the progression of the sequence
        number relative to the previously seen one, and the SHA1 hash.
        """
        bfd = packet[BFD]
        check = self.test.assert_equal
        check(bfd.auth_len, 28, "Auth section length")
        check(bfd.auth_type, self.sha1_key.auth_type, BFDAuthType)
        check(bfd.auth_key_id, self.bfd_key_id, "Key ID")
        check(bfd.auth_reserved, 0, "Reserved")
        previous = self.vpp_seq_number
        if previous is None:
            self.vpp_seq_number = bfd.auth_seq_num
            self.test.logger.debug("Received initial sequence number: %s" %
                                   self.vpp_seq_number)
        else:
            recvd_seq_num = bfd.auth_seq_num
            self.test.logger.debug("Received followup sequence number: %s" %
                                   recvd_seq_num)
            meticulous = (self.sha1_key.auth_type ==
                          BFDAuthType.meticulous_keyed_sha1)
            can_increment = previous < 0xffffffff
            if can_increment and meticulous:
                # meticulous: exactly one more than the previous number
                check(recvd_seq_num, previous + 1, "BFD sequence number")
            elif can_increment:
                # non-meticulous: same number or one more
                self.test.assert_in_range(recvd_seq_num,
                                          previous,
                                          previous + 1,
                                          "BFD sequence number")
            elif meticulous:
                # previous number was the maximum, so the next one wraps
                check(recvd_seq_num, 0, "BFD sequence number")
            else:
                self.test.assertIn(recvd_seq_num, (previous, 0),
                                   "BFD sequence number not one of "
                                   "(%s, 0)" % previous)
            self.vpp_seq_number = recvd_seq_num
        # last 20 bytes represent the hash - so replace them with the key,
        # pad the result with zeros and hash the result
        secret = self.sha1_key.key
        hash_material = bfd.original[:-20] + secret + \
            "\0" * (20 - len(secret))
        expected_hash = hashlib.sha1(hash_material).hexdigest()
        check(binascii.hexlify(bfd.auth_key_hash),
              expected_hash, "Auth key hash")

    def verify_bfd(self, packet):
        """ Verify correctness of BFD layer.

        Checks the version and that the packet's "your discriminator" is
        our discriminator; authenticated sessions also get the auth checks.
        """
        bfd = packet[BFD]
        self.test.assert_equal(bfd.version, 1, "BFD version")
        self.test.assert_equal(bfd.your_discriminator,
                               self.my_discriminator,
                               "BFD - your discriminator")
        if self.sha1_key:
            self.verify_sha1_auth(packet)
455         if self.sha1_key:
456             self.verify_sha1_auth(packet)
457
458
def bfd_session_up(test):
    """ Bring BFD session up

    Waits for a packet from vpp, records the clock offset between the test
    and the capture timestamps, then sends Init followed by Up and checks
    that vpp reports the session as up.

    :param test: test case providing test_session, vpp_session, vapi and
                 logger
    """
    session = test.test_session

    def send_current_state():
        # the sequence number is advanced before sending only when the
        # session uses meticulous keyed SHA1
        key = session.sha1_key
        if key and key.auth_type == BFDAuthType.meticulous_keyed_sha1:
            session.inc_seq_num()
        session.send_packet()

    test.logger.info("BFD: Waiting for slow hello")
    p = wait_for_bfd_packet(test, 2)
    old_offset = getattr(test, 'vpp_clock_offset', None)
    test.vpp_clock_offset = time.time() - p.time
    test.logger.debug("BFD: Calculated vpp clock offset: %s",
                      test.vpp_clock_offset)
    if old_offset:
        # an offset from an earlier call must agree with the new one
        test.assertAlmostEqual(
            old_offset, test.vpp_clock_offset, delta=0.5,
            msg="vpp clock offset not stable (new: %s, old: %s)" %
            (test.vpp_clock_offset, old_offset))

    test.logger.info("BFD: Sending Init")
    session.update(my_discriminator=randint(0, 40000000),
                   your_discriminator=p[BFD].my_discriminator,
                   state=BFDState.init)
    send_current_state()

    test.logger.info("BFD: Waiting for event")
    e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, e, expected_state=BFDState.up)

    test.logger.info("BFD: Session is Up")
    session.update(state=BFDState.up)
    send_current_state()
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
492
493
def bfd_session_down(test):
    """ Bring BFD session down

    Requires the vpp session to be up, sends a packet with state Down and
    checks that vpp reports the session as down.

    :param test: test case providing test_session, vpp_session, vapi and
                 logger
    """
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
    session = test.test_session
    session.update(state=BFDState.down)
    # the sequence number is advanced before sending only when the session
    # uses meticulous keyed SHA1
    key = session.sha1_key
    if key and key.auth_type == BFDAuthType.meticulous_keyed_sha1:
        session.inc_seq_num()
    session.send_packet()
    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, event, expected_state=BFDState.down)
    test.logger.info("BFD: Session is Down")
    test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
507
508
def verify_bfd_session_config(test, session, state=None):
    """ Verify that the session dump matches the session object.

    :param test: test case providing assertIsNotNone and assert_equal
    :param session: session object providing
                    get_bfd_udp_session_dump_entry() and the expected values
    :param state: expected session state; None (the default) skips the
                  state check
    """
    dump = session.get_bfd_udp_session_dump_entry()
    test.assertIsNotNone(dump)
    # since dump is not none, we have verified that sw_if_index and addresses
    # are valid (in get_bfd_udp_session_dump_entry)
    # "is not None" rather than truthiness: an expected state whose numeric
    # value is 0 must still be checked instead of being silently skipped
    if state is not None:
        test.assert_equal(dump.state, state, "session state")
    test.assert_equal(dump.required_min_rx, session.required_min_rx,
                      "required min rx interval")
    test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
                      "desired min tx interval")
    test.assert_equal(dump.detect_mult, session.detect_mult,
                      "detect multiplier")
    if session.sha1_key is None:
        test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
    else:
        # authenticated session: both key IDs must match as well
        test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
        test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
                          "bfd key id")
        test.assert_equal(dump.conf_key_id,
                          session.sha1_key.conf_key_id,
                          "config key id")
531
532
def verify_ip(test, packet):
    """ Verify correctness of IP layer.

    Checks hop limit / TTL of 255 and that the packet goes from pg0's local
    address to pg0's remote address, for the address family of the vpp
    session.
    """
    pg0 = test.pg0
    if test.vpp_session.af == AF_INET6:
        ip = packet[IPv6]
        expected_src, expected_dst = pg0.local_ip6, pg0.remote_ip6
        test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
    else:
        ip = packet[IP]
        expected_src, expected_dst = pg0.local_ip4, pg0.remote_ip4
        test.assert_equal(ip.ttl, 255, "IPv4 TTL")
    test.assert_equal(ip.src, expected_src, "IP source address")
    test.assert_equal(ip.dst, expected_dst, "IP destination address")
547
548
def verify_udp(test, packet):
    """ Verify correctness of UDP layer.

    The destination port must be the BFD port and the source port must lie
    within the BFD source port range.
    """
    layer = packet[UDP]
    test.assert_equal(layer.dport, BFD.udp_dport, "UDP destination port")
    lowest, highest = BFD.udp_sport_min, BFD.udp_sport_max
    test.assert_in_range(layer.sport, lowest, highest, "UDP source port")
555
556
def verify_event(test, event, expected_state):
    """ Verify correctness of event values.

    Compares interface index, address family, addresses and state of the
    event against the test's vpp session and the expected state.
    """
    session = test.vpp_session
    test.logger.debug("BFD: Event: %s" % repr(event))
    test.assert_equal(event.sw_if_index,
                      session.interface.sw_if_index,
                      "BFD interface index")
    test.assert_equal(event.is_ipv6,
                      1 if session.af == AF_INET6 else 0,
                      "is_ipv6")
    if session.af == AF_INET:
        # only the first four bytes of the event address are compared
        address_checks = (
            (event.local_addr[:4], session.local_addr_n,
             "Local IPv4 address"),
            (event.peer_addr[:4], session.peer_addr_n,
             "Peer IPv4 address"),
        )
    else:
        address_checks = (
            (event.local_addr, session.local_addr_n,
             "Local IPv6 address"),
            (event.peer_addr, session.peer_addr_n,
             "Peer IPv6 address"),
        )
    for reported, expected, name in address_checks:
        test.assert_equal(reported, expected, name)
    test.assert_equal(event.state, expected_state, BFDState)
579
580
def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None):
    """ wait for BFD packet and verify its correctness

    :param test: test case providing pg0, test_session and logger
    :param timeout: how long to wait
    :param pcap_time_min: ignore packets with pcap timestamp lower than this

    :returns: the captured packet
    :raises CaptureTimeoutError: if the deadline passes before an acceptable
                                 packet is captured
    :raises Exception: if the packet has no BFD layer or the BFD layer
                       carries a payload
    """
    test.logger.info("BFD: Waiting for BFD packet")
    deadline = time.time() + timeout
    counter = 0
    while True:
        counter += 1
        # sanity check
        test.assert_in_range(counter, 0, 100, "number of packets ignored")
        time_left = deadline - time.time()
        if time_left < 0:
            raise CaptureTimeoutError("Packet did not arrive within timeout")
        p = test.pg0.wait_for_packet(timeout=time_left)
        test.logger.debug(ppp("BFD: Got packet:", p))
        if pcap_time_min is not None and p.time < pcap_time_min:
            test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
                                  "pcap time min %s):" %
                                  (p.time, pcap_time_min), p))
        else:
            break
    # getlayer() returns None when the layer is absent, which keeps the
    # check below reachable; indexing with p[BFD] raises instead
    bfd = p.getlayer(BFD)
    if bfd is None:
        raise Exception(ppp("Unexpected or invalid BFD packet:", p))
    if bfd.payload:
        raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
    verify_ip(test, p)
    verify_udp(test, p)
    test.test_session.verify_bfd(p)
    return p
616
617
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
class BFD4TestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD)"""

    # placeholder; presumably populated by create_pg_interfaces() in
    # setUpClass, which uses cls.pg0 right afterwards
    pg0 = None
    # overwritten by bfd_session_up() with time.time() minus the timestamp
    # of the packet it waited for
    vpp_clock_offset = None
    # VppBFDUDPSession created by setUp() for every test
    vpp_session = None
    # BFDTestSession created by setUp() for every test
    test_session = None
626
627     @classmethod
628     def setUpClass(cls):
629         super(BFD4TestCase, cls).setUpClass()
630         cls.vapi.cli("set log class bfd level debug")
631         try:
632             cls.create_pg_interfaces([0])
633             cls.create_loopback_interfaces(1)
634             cls.loopback0 = cls.lo_interfaces[0]
635             cls.loopback0.config_ip4()
636             cls.loopback0.admin_up()
637             cls.pg0.config_ip4()
638             cls.pg0.configure_ipv4_neighbors()
639             cls.pg0.admin_up()
640             cls.pg0.resolve_arp()
641
642         except Exception:
643             super(BFD4TestCase, cls).tearDownClass()
644             raise
645
646     def setUp(self):
647         super(BFD4TestCase, self).setUp()
648         self.factory = AuthKeyFactory()
649         self.vapi.want_bfd_events()
650         self.pg0.enable_capture()
651         try:
652             self.vpp_session = VppBFDUDPSession(self, self.pg0,
653                                                 self.pg0.remote_ip4)
654             self.vpp_session.add_vpp_config()
655             self.vpp_session.admin_up()
656             self.test_session = BFDTestSession(self, self.pg0, AF_INET)
657         except:
658             self.vapi.want_bfd_events(enable_disable=0)
659             raise
660
661     def tearDown(self):
662         if not self.vpp_dead:
663             self.vapi.want_bfd_events(enable_disable=0)
664         self.vapi.collect_events()  # clear the event queue
665         super(BFD4TestCase, self).tearDown()
666
667     def test_session_up(self):
668         """ bring BFD session up """
669         bfd_session_up(self)
670
671     def test_session_up_by_ip(self):
672         """ bring BFD session up - first frame looked up by address pair """
673         self.logger.info("BFD: Sending Slow control frame")
674         self.test_session.update(my_discriminator=randint(0, 40000000))
675         self.test_session.send_packet()
676         self.pg0.enable_capture()
677         p = self.pg0.wait_for_packet(1)
678         self.assert_equal(p[BFD].your_discriminator,
679                           self.test_session.my_discriminator,
680                           "BFD - your discriminator")
681         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
682         self.test_session.update(your_discriminator=p[BFD].my_discriminator,
683                                  state=BFDState.up)
684         self.logger.info("BFD: Waiting for event")
685         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
686         verify_event(self, e, expected_state=BFDState.init)
687         self.logger.info("BFD: Sending Up")
688         self.test_session.send_packet()
689         self.logger.info("BFD: Waiting for event")
690         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
691         verify_event(self, e, expected_state=BFDState.up)
692         self.logger.info("BFD: Session is Up")
693         self.test_session.update(state=BFDState.up)
694         self.test_session.send_packet()
695         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
696
697     def test_session_down(self):
698         """ bring BFD session down """
699         bfd_session_up(self)
700         bfd_session_down(self)
701
702     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
703     def test_hold_up(self):
704         """ hold BFD session up """
705         bfd_session_up(self)
706         for dummy in range(self.test_session.detect_mult * 2):
707             wait_for_bfd_packet(self)
708             self.test_session.send_packet()
709         self.assert_equal(len(self.vapi.collect_events()), 0,
710                           "number of bfd events")
711
712     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
713     def test_slow_timer(self):
714         """ verify slow periodic control frames while session down """
715         packet_count = 3
716         self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
717         prev_packet = wait_for_bfd_packet(self, 2)
718         for dummy in range(packet_count):
719             next_packet = wait_for_bfd_packet(self, 2)
720             time_diff = next_packet.time - prev_packet.time
721             # spec says the range should be <0.75, 1>, allow extra 0.05 margin
722             # to work around timing issues
723             self.assert_in_range(
724                 time_diff, 0.70, 1.05, "time between slow packets")
725             prev_packet = next_packet
726
727     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
728     def test_zero_remote_min_rx(self):
729         """ no packets when zero remote required min rx interval """
730         bfd_session_up(self)
731         self.test_session.update(required_min_rx=0)
732         self.test_session.send_packet()
733         for dummy in range(self.test_session.detect_mult):
734             self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
735                        "sleep before transmitting bfd packet")
736             self.test_session.send_packet()
737             try:
738                 p = wait_for_bfd_packet(self, timeout=0)
739                 self.logger.error(ppp("Received unexpected packet:", p))
740             except CaptureTimeoutError:
741                 pass
742         self.assert_equal(
743             len(self.vapi.collect_events()), 0, "number of bfd events")
744         self.test_session.update(required_min_rx=300000)
745         for dummy in range(3):
746             self.test_session.send_packet()
747             wait_for_bfd_packet(
748                 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
749         self.assert_equal(
750             len(self.vapi.collect_events()), 0, "number of bfd events")
751
752     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
753     def test_conn_down(self):
754         """ verify session goes down after inactivity """
755         bfd_session_up(self)
756         detection_time = self.test_session.detect_mult *\
757             self.vpp_session.required_min_rx / USEC_IN_SEC
758         self.sleep(detection_time, "waiting for BFD session time-out")
759         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
760         verify_event(self, e, expected_state=BFDState.down)
761
762     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
763     def test_large_required_min_rx(self):
764         """ large remote required min rx interval """
765         bfd_session_up(self)
766         p = wait_for_bfd_packet(self)
767         interval = 3000000
768         self.test_session.update(required_min_rx=interval)
769         self.test_session.send_packet()
770         time_mark = time.time()
771         count = 0
772         # busy wait here, trying to collect a packet or event, vpp is not
773         # allowed to send packets and the session will timeout first - so the
774         # Up->Down event must arrive before any packets do
775         while time.time() < time_mark + interval / USEC_IN_SEC:
776             try:
777                 p = wait_for_bfd_packet(self, timeout=0)
778                 # if vpp managed to send a packet before we did the session
779                 # session update, then that's fine, ignore it
780                 if p.time < time_mark - self.vpp_clock_offset:
781                     continue
782                 self.logger.error(ppp("Received unexpected packet:", p))
783                 count += 1
784             except CaptureTimeoutError:
785                 pass
786             events = self.vapi.collect_events()
787             if len(events) > 0:
788                 verify_event(self, events[0], BFDState.down)
789                 break
790         self.assert_equal(count, 0, "number of packets received")
791
792     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
793     def test_immediate_remote_min_rx_reduction(self):
794         """ immediately honor remote required min rx reduction """
795         self.vpp_session.remove_vpp_config()
796         self.vpp_session = VppBFDUDPSession(
797             self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
798         self.pg0.enable_capture()
799         self.vpp_session.add_vpp_config()
800         self.test_session.update(desired_min_tx=1000000,
801                                  required_min_rx=1000000)
802         bfd_session_up(self)
803         reference_packet = wait_for_bfd_packet(self)
804         time_mark = time.time()
805         interval = 300000
806         self.test_session.update(required_min_rx=interval)
807         self.test_session.send_packet()
808         extra_time = time.time() - time_mark
809         p = wait_for_bfd_packet(self)
810         # first packet is allowed to be late by time we spent doing the update
811         # calculated in extra_time
812         self.assert_in_range(p.time - reference_packet.time,
813                              .95 * 0.75 * interval / USEC_IN_SEC,
814                              1.05 * interval / USEC_IN_SEC + extra_time,
815                              "time between BFD packets")
816         reference_packet = p
817         for dummy in range(3):
818             p = wait_for_bfd_packet(self)
819             diff = p.time - reference_packet.time
820             self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
821                                  1.05 * interval / USEC_IN_SEC,
822                                  "time between BFD packets")
823             reference_packet = p
824
825     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
826     def test_modify_req_min_rx_double(self):
827         """ modify session - double required min rx """
828         bfd_session_up(self)
829         p = wait_for_bfd_packet(self)
830         self.test_session.update(desired_min_tx=10000,
831                                  required_min_rx=10000)
832         self.test_session.send_packet()
833         # double required min rx
834         self.vpp_session.modify_parameters(
835             required_min_rx=2 * self.vpp_session.required_min_rx)
836         p = wait_for_bfd_packet(
837             self, pcap_time_min=time.time() - self.vpp_clock_offset)
838         # poll bit needs to be set
839         self.assertIn("P", p.sprintf("%BFD.flags%"),
840                       "Poll bit not set in BFD packet")
841         # finish poll sequence with final packet
842         final = self.test_session.create_packet()
843         final[BFD].flags = "F"
844         timeout = self.test_session.detect_mult * \
845             max(self.test_session.desired_min_tx,
846                 self.vpp_session.required_min_rx) / USEC_IN_SEC
847         self.test_session.send_packet(final)
848         time_mark = time.time()
849         e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
850         verify_event(self, e, expected_state=BFDState.down)
851         time_to_event = time.time() - time_mark
852         self.assert_in_range(time_to_event, .9 * timeout,
853                              1.1 * timeout, "session timeout")
854
855     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
856     def test_modify_req_min_rx_halve(self):
857         """ modify session - halve required min rx """
858         self.vpp_session.modify_parameters(
859             required_min_rx=2 * self.vpp_session.required_min_rx)
860         bfd_session_up(self)
861         p = wait_for_bfd_packet(self)
862         self.test_session.update(desired_min_tx=10000,
863                                  required_min_rx=10000)
864         self.test_session.send_packet()
865         p = wait_for_bfd_packet(
866             self, pcap_time_min=time.time() - self.vpp_clock_offset)
867         # halve required min rx
868         old_required_min_rx = self.vpp_session.required_min_rx
869         self.vpp_session.modify_parameters(
870             required_min_rx=0.5 * self.vpp_session.required_min_rx)
871         # now we wait 0.8*3*old-req-min-rx and the session should still be up
872         self.sleep(0.8 * self.vpp_session.detect_mult *
873                    old_required_min_rx / USEC_IN_SEC,
874                    "wait before finishing poll sequence")
875         self.assert_equal(len(self.vapi.collect_events()), 0,
876                           "number of bfd events")
877         p = wait_for_bfd_packet(self)
878         # poll bit needs to be set
879         self.assertIn("P", p.sprintf("%BFD.flags%"),
880                       "Poll bit not set in BFD packet")
881         # finish poll sequence with final packet
882         final = self.test_session.create_packet()
883         final[BFD].flags = "F"
884         self.test_session.send_packet(final)
885         # now the session should time out under new conditions
886         detection_time = self.test_session.detect_mult *\
887             self.vpp_session.required_min_rx / USEC_IN_SEC
888         before = time.time()
889         e = self.vapi.wait_for_event(
890             2 * detection_time, "bfd_udp_session_details")
891         after = time.time()
892         self.assert_in_range(after - before,
893                              0.9 * detection_time,
894                              1.1 * detection_time,
895                              "time before bfd session goes down")
896         verify_event(self, e, expected_state=BFDState.down)
897
898     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
899     def test_modify_detect_mult(self):
900         """ modify detect multiplier """
901         bfd_session_up(self)
902         p = wait_for_bfd_packet(self)
903         self.vpp_session.modify_parameters(detect_mult=1)
904         p = wait_for_bfd_packet(
905             self, pcap_time_min=time.time() - self.vpp_clock_offset)
906         self.assert_equal(self.vpp_session.detect_mult,
907                           p[BFD].detect_mult,
908                           "detect mult")
909         # poll bit must not be set
910         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
911                          "Poll bit not set in BFD packet")
912         self.vpp_session.modify_parameters(detect_mult=10)
913         p = wait_for_bfd_packet(
914             self, pcap_time_min=time.time() - self.vpp_clock_offset)
915         self.assert_equal(self.vpp_session.detect_mult,
916                           p[BFD].detect_mult,
917                           "detect mult")
918         # poll bit must not be set
919         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
920                          "Poll bit not set in BFD packet")
921
922     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_queued_poll(self):
        """ test poll sequence queueing """
        bfd_session_up(self)
        p = wait_for_bfd_packet(self)
        # first parameter change - starts the 1st poll sequence
        self.vpp_session.modify_parameters(
            required_min_rx=2 * self.vpp_session.required_min_rx)
        p = wait_for_bfd_packet(self)
        poll_sequence_start = time.time()
        # keep the 1st poll sequence unanswered for at least this long (sec)
        poll_sequence_length_min = 0.5
        send_final_after = time.time() + poll_sequence_length_min
        # poll bit needs to be set
        self.assertIn("P", p.sprintf("%BFD.flags%"),
                      "Poll bit not set in BFD packet")
        self.assert_equal(p[BFD].required_min_rx_interval,
                          self.vpp_session.required_min_rx,
                          "BFD required min rx interval")
        # second parameter change while the 1st poll sequence is unfinished
        self.vpp_session.modify_parameters(
            required_min_rx=2 * self.vpp_session.required_min_rx)
        # 2nd poll sequence should be queued now
        # don't send the reply back yet, wait for some time to emulate
        # longer round-trip time
        packet_count = 0
        while time.time() < send_final_after:
            # plain control frames (no final bit) keep the session alive
            self.test_session.send_packet()
            p = wait_for_bfd_packet(self)
            self.assert_equal(len(self.vapi.collect_events()), 0,
                              "number of bfd events")
            self.assert_equal(p[BFD].required_min_rx_interval,
                              self.vpp_session.required_min_rx,
                              "BFD required min rx interval")
            packet_count += 1
            # poll bit must be set
            self.assertIn("P", p.sprintf("%BFD.flags%"),
                          "Poll bit not set in BFD packet")
        final = self.test_session.create_packet()
        final[BFD].flags = "F"
        self.test_session.send_packet(final)
        # finish 1st with final
        poll_sequence_length = time.time() - poll_sequence_start
        # vpp must wait for some time before starting new poll sequence
        poll_no_2_started = False
        # allow up to twice as many frames as the 1st sequence took
        for dummy in range(2 * packet_count):
            p = wait_for_bfd_packet(self)
            self.assert_equal(len(self.vapi.collect_events()), 0,
                              "number of bfd events")
            if "P" in p.sprintf("%BFD.flags%"):
                poll_no_2_started = True
                # NOTE(review): poll_sequence_start + poll_sequence_length is
                # the instant the length was measured above, so time.time()
                # can hardly ever be below it - confirm whether the check was
                # meant to be relative to the time the final was sent
                if time.time() < poll_sequence_start + poll_sequence_length:
                    raise Exception("VPP started 2nd poll sequence too soon")
                final = self.test_session.create_packet()
                final[BFD].flags = "F"
                self.test_session.send_packet(final)
                break
            else:
                self.test_session.send_packet()
        self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
        # finish 2nd with final
        final = self.test_session.create_packet()
        final[BFD].flags = "F"
        self.test_session.send_packet(final)
        p = wait_for_bfd_packet(self)
        # poll bit must not be set
        self.assertNotIn("P", p.sprintf("%BFD.flags%"),
                         "Poll bit set in BFD packet")
987
988     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
989     def test_poll_response(self):
990         """ test correct response to control frame with poll bit set """
991         bfd_session_up(self)
992         poll = self.test_session.create_packet()
993         poll[BFD].flags = "P"
994         self.test_session.send_packet(poll)
995         final = wait_for_bfd_packet(
996             self, pcap_time_min=time.time() - self.vpp_clock_offset)
997         self.assertIn("F", final.sprintf("%BFD.flags%"))
998
999     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1000     def test_no_periodic_if_remote_demand(self):
1001         """ no periodic frames outside poll sequence if remote demand set """
1002         bfd_session_up(self)
1003         demand = self.test_session.create_packet()
1004         demand[BFD].flags = "D"
1005         self.test_session.send_packet(demand)
1006         transmit_time = 0.9 \
1007             * max(self.vpp_session.required_min_rx,
1008                   self.test_session.desired_min_tx) \
1009             / USEC_IN_SEC
1010         count = 0
1011         for dummy in range(self.test_session.detect_mult * 2):
1012             time.sleep(transmit_time)
1013             self.test_session.send_packet(demand)
1014             try:
1015                 p = wait_for_bfd_packet(self, timeout=0)
1016                 self.logger.error(ppp("Received unexpected packet:", p))
1017                 count += 1
1018             except CaptureTimeoutError:
1019                 pass
1020         events = self.vapi.collect_events()
1021         for e in events:
1022             self.logger.error("Received unexpected event: %s", e)
1023         self.assert_equal(count, 0, "number of packets received")
1024         self.assert_equal(len(events), 0, "number of events received")
1025
1026     def test_echo_looped_back(self):
1027         """ echo packets looped back """
1028         # don't need a session in this case..
1029         self.vpp_session.remove_vpp_config()
1030         self.pg0.enable_capture()
1031         echo_packet_count = 10
1032         # random source port low enough to increment a few times..
1033         udp_sport_tx = randint(1, 50000)
1034         udp_sport_rx = udp_sport_tx
1035         echo_packet = (Ether(src=self.pg0.remote_mac,
1036                              dst=self.pg0.local_mac) /
1037                        IP(src=self.pg0.remote_ip4,
1038                           dst=self.pg0.remote_ip4) /
1039                        UDP(dport=BFD.udp_dport_echo) /
1040                        Raw("this should be looped back"))
1041         for dummy in range(echo_packet_count):
1042             self.sleep(.01, "delay between echo packets")
1043             echo_packet[UDP].sport = udp_sport_tx
1044             udp_sport_tx += 1
1045             self.logger.debug(ppp("Sending packet:", echo_packet))
1046             self.pg0.add_stream(echo_packet)
1047             self.pg_start()
1048         for dummy in range(echo_packet_count):
1049             p = self.pg0.wait_for_packet(1)
1050             self.logger.debug(ppp("Got packet:", p))
1051             ether = p[Ether]
1052             self.assert_equal(self.pg0.remote_mac,
1053                               ether.dst, "Destination MAC")
1054             self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1055             ip = p[IP]
1056             self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1057             self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1058             udp = p[UDP]
1059             self.assert_equal(udp.dport, BFD.udp_dport_echo,
1060                               "UDP destination port")
1061             self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1062             udp_sport_rx += 1
1063             # need to compare the hex payload here, otherwise BFD_vpp_echo
1064             # gets in way
1065             self.assertEqual(str(p[UDP].payload),
1066                              str(echo_packet[UDP].payload),
1067                              "Received packet is not the echo packet sent")
1068         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1069                           "ECHO packet identifier for test purposes)")
1070
    def test_echo(self):
        """ echo function """
        bfd_session_up(self)
        # advertise a non-zero required min echo rx interval
        self.test_session.update(required_min_echo_rx=150000)
        self.test_session.send_packet()
        detection_time = self.test_session.detect_mult *\
            self.vpp_session.required_min_rx / USEC_IN_SEC
        # echo shouldn't work without echo source set
        for dummy in range(10):
            sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
            self.sleep(sleep, "delay before sending bfd packet")
            self.test_session.send_packet()
        # the most recent control frame must still advertise the configured
        # required min rx interval
        p = wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.assert_equal(p[BFD].required_min_rx_interval,
                          self.vpp_session.required_min_rx,
                          "BFD required min rx interval")
        self.test_session.send_packet()
        # configure the loopback interface as echo source
        self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
        echo_seen = False
        # should be turned on - loopback echo packets
        # three rounds of 0.75 * detection time each, with one control frame
        # sent by us after every round
        for dummy in range(3):
            loop_until = time.time() + 0.75 * detection_time
            while time.time() < loop_until:
                p = self.pg0.wait_for_packet(1)
                self.logger.debug(ppp("Got packet:", p))
                if p[UDP].dport == BFD.udp_dport_echo:
                    # echo packet - check its addressing, then send it back
                    # with the destination MAC rewritten to pg0.local_mac
                    self.assert_equal(
                        p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
                    self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
                                        "BFD ECHO src IP equal to loopback IP")
                    self.logger.debug(ppp("Looping back packet:", p))
                    self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
                                      "ECHO packet destination MAC address")
                    p[Ether].dst = self.pg0.local_mac
                    self.pg0.add_stream(p)
                    self.pg_start()
                    echo_seen = True
                elif p.haslayer(BFD):
                    # control frame - once echo has been seen, the advertised
                    # required min rx must be at least 1000000
                    if echo_seen:
                        self.assertGreaterEqual(
                            p[BFD].required_min_rx_interval,
                            1000000)
                    # answer a poll with a final
                    if "P" in p.sprintf("%BFD.flags%"):
                        final = self.test_session.create_packet()
                        final[BFD].flags = "F"
                        self.test_session.send_packet(final)
                else:
                    raise Exception(ppp("Received unknown packet:", p))

                # no session event is allowed at any point
                self.assert_equal(len(self.vapi.collect_events()), 0,
                                  "number of bfd events")
            self.test_session.send_packet()
        self.assertTrue(echo_seen, "No echo packets received")
1125
1126     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_echo_fail(self):
        """ session goes down if echo function fails """
        bfd_session_up(self)
        # advertise a non-zero required min echo rx interval
        self.test_session.update(required_min_echo_rx=150000)
        self.test_session.send_packet()
        detection_time = self.test_session.detect_mult *\
            self.vpp_session.required_min_rx / USEC_IN_SEC
        self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
        # echo function should be used now, but we will drop the echo packets
        verified_diag = False
        # three rounds of 0.75 * detection time each, with one control frame
        # sent by us after every round
        for dummy in range(3):
            loop_until = time.time() + 0.75 * detection_time
            while time.time() < loop_until:
                p = self.pg0.wait_for_packet(1)
                self.logger.debug(ppp("Got packet:", p))
                if p[UDP].dport == BFD.udp_dport_echo:
                    # dropped
                    pass
                elif p.haslayer(BFD):
                    # a poll must advertise a required min rx of at least
                    # 1000000 and is answered with a final
                    if "P" in p.sprintf("%BFD.flags%"):
                        self.assertGreaterEqual(
                            p[BFD].required_min_rx_interval,
                            1000000)
                        final = self.test_session.create_packet()
                        final[BFD].flags = "F"
                        self.test_session.send_packet(final)
                    # a frame reporting Down has to carry the echo function
                    # failed diagnostic code
                    if p[BFD].state == BFDState.down:
                        self.assert_equal(p[BFD].diag,
                                          BFDDiagCode.echo_function_failed,
                                          BFDDiagCode)
                        verified_diag = True
                else:
                    raise Exception(ppp("Received unknown packet:", p))
            self.test_session.send_packet()
        # exactly one event - the transition to Down - is expected overall
        events = self.vapi.collect_events()
        self.assert_equal(len(events), 1, "number of bfd events")
        self.assert_equal(events[0].state, BFDState.down, BFDState)
        self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1165
1166     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1167     def test_echo_stop(self):
1168         """ echo function stops if peer sets required min echo rx zero """
1169         bfd_session_up(self)
1170         self.test_session.update(required_min_echo_rx=150000)
1171         self.test_session.send_packet()
1172         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1173         # wait for first echo packet
1174         while True:
1175             p = self.pg0.wait_for_packet(1)
1176             self.logger.debug(ppp("Got packet:", p))
1177             if p[UDP].dport == BFD.udp_dport_echo:
1178                 self.logger.debug(ppp("Looping back packet:", p))
1179                 p[Ether].dst = self.pg0.local_mac
1180                 self.pg0.add_stream(p)
1181                 self.pg_start()
1182                 break
1183             elif p.haslayer(BFD):
1184                 # ignore BFD
1185                 pass
1186             else:
1187                 raise Exception(ppp("Received unknown packet:", p))
1188         self.test_session.update(required_min_echo_rx=0)
1189         self.test_session.send_packet()
1190         # echo packets shouldn't arrive anymore
1191         for dummy in range(5):
1192             wait_for_bfd_packet(
1193                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1194             self.test_session.send_packet()
1195             events = self.vapi.collect_events()
1196             self.assert_equal(len(events), 0, "number of bfd events")
1197
1198     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1199     def test_echo_source_removed(self):
1200         """ echo function stops if echo source is removed """
1201         bfd_session_up(self)
1202         self.test_session.update(required_min_echo_rx=150000)
1203         self.test_session.send_packet()
1204         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1205         # wait for first echo packet
1206         while True:
1207             p = self.pg0.wait_for_packet(1)
1208             self.logger.debug(ppp("Got packet:", p))
1209             if p[UDP].dport == BFD.udp_dport_echo:
1210                 self.logger.debug(ppp("Looping back packet:", p))
1211                 p[Ether].dst = self.pg0.local_mac
1212                 self.pg0.add_stream(p)
1213                 self.pg_start()
1214                 break
1215             elif p.haslayer(BFD):
1216                 # ignore BFD
1217                 pass
1218             else:
1219                 raise Exception(ppp("Received unknown packet:", p))
1220         self.vapi.bfd_udp_del_echo_source()
1221         self.test_session.send_packet()
1222         # echo packets shouldn't arrive anymore
1223         for dummy in range(5):
1224             wait_for_bfd_packet(
1225                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1226             self.test_session.send_packet()
1227             events = self.vapi.collect_events()
1228             self.assert_equal(len(events), 0, "number of bfd events")
1229
1230     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1231     def test_stale_echo(self):
1232         """ stale echo packets don't keep a session up """
1233         bfd_session_up(self)
1234         self.test_session.update(required_min_echo_rx=150000)
1235         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1236         self.test_session.send_packet()
1237         # should be turned on - loopback echo packets
1238         echo_packet = None
1239         timeout_at = None
1240         timeout_ok = False
1241         for dummy in range(10 * self.vpp_session.detect_mult):
1242             p = self.pg0.wait_for_packet(1)
1243             if p[UDP].dport == BFD.udp_dport_echo:
1244                 if echo_packet is None:
1245                     self.logger.debug(ppp("Got first echo packet:", p))
1246                     echo_packet = p
1247                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1248                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1249                 else:
1250                     self.logger.debug(ppp("Got followup echo packet:", p))
1251                 self.logger.debug(ppp("Looping back first echo packet:", p))
1252                 echo_packet[Ether].dst = self.pg0.local_mac
1253                 self.pg0.add_stream(echo_packet)
1254                 self.pg_start()
1255             elif p.haslayer(BFD):
1256                 self.logger.debug(ppp("Got packet:", p))
1257                 if "P" in p.sprintf("%BFD.flags%"):
1258                     final = self.test_session.create_packet()
1259                     final[BFD].flags = "F"
1260                     self.test_session.send_packet(final)
1261                 if p[BFD].state == BFDState.down:
1262                     self.assertIsNotNone(
1263                         timeout_at,
1264                         "Session went down before first echo packet received")
1265                     now = time.time()
1266                     self.assertGreaterEqual(
1267                         now, timeout_at,
1268                         "Session timeout at %s, but is expected at %s" %
1269                         (now, timeout_at))
1270                     self.assert_equal(p[BFD].diag,
1271                                       BFDDiagCode.echo_function_failed,
1272                                       BFDDiagCode)
1273                     events = self.vapi.collect_events()
1274                     self.assert_equal(len(events), 1, "number of bfd events")
1275                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1276                     timeout_ok = True
1277                     break
1278             else:
1279                 raise Exception(ppp("Received unknown packet:", p))
1280             self.test_session.send_packet()
1281         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1282
1283     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_invalid_echo_checksum(self):
        """ echo packets with invalid checksum don't keep a session up """
        bfd_session_up(self)
        self.test_session.update(required_min_echo_rx=150000)
        self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
        self.test_session.send_packet()
        # should be turned on - loopback echo packets
        timeout_at = None
        timeout_ok = False
        for dummy in range(10 * self.vpp_session.detect_mult):
            p = self.pg0.wait_for_packet(1)
            if p[UDP].dport == BFD.udp_dport_echo:
                self.logger.debug(ppp("Got echo packet:", p))
                # earliest moment the session may go down: detect mult times
                # the echo interval after the first echo packet
                if timeout_at is None:
                    timeout_at = time.time() + self.vpp_session.detect_mult * \
                        self.test_session.required_min_echo_rx / USEC_IN_SEC
                # overwrite the checksum with random bits before sending the
                # packet back
                p[BFD_vpp_echo].checksum = getrandbits(64)
                p[Ether].dst = self.pg0.local_mac
                self.logger.debug(ppp("Looping back modified echo packet:", p))
                self.pg0.add_stream(p)
                self.pg_start()
            elif p.haslayer(BFD):
                self.logger.debug(ppp("Got packet:", p))
                # answer a poll with a final
                if "P" in p.sprintf("%BFD.flags%"):
                    final = self.test_session.create_packet()
                    final[BFD].flags = "F"
                    self.test_session.send_packet(final)
                if p[BFD].state == BFDState.down:
                    # Down is only acceptable after an echo packet was seen
                    # and not before the computed time-out
                    self.assertIsNotNone(
                        timeout_at,
                        "Session went down before first echo packet received")
                    now = time.time()
                    self.assertGreaterEqual(
                        now, timeout_at,
                        "Session timeout at %s, but is expected at %s" %
                        (now, timeout_at))
                    self.assert_equal(p[BFD].diag,
                                      BFDDiagCode.echo_function_failed,
                                      BFDDiagCode)
                    # exactly one event - the transition to Down
                    events = self.vapi.collect_events()
                    self.assert_equal(len(events), 1, "number of bfd events")
                    self.assert_equal(events[0].state, BFDState.down, BFDState)
                    timeout_ok = True
                    break
            else:
                raise Exception(ppp("Received unknown packet:", p))
            # keep sending control frames after every handled packet
            self.test_session.send_packet()
        self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1332
1333     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1334     def test_admin_up_down(self):
1335         """ put session admin-up and admin-down """
1336         bfd_session_up(self)
1337         self.vpp_session.admin_down()
1338         self.pg0.enable_capture()
1339         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1340         verify_event(self, e, expected_state=BFDState.admin_down)
1341         for dummy in range(2):
1342             p = wait_for_bfd_packet(self)
1343             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1344         # try to bring session up - shouldn't be possible
1345         self.test_session.update(state=BFDState.init)
1346         self.test_session.send_packet()
1347         for dummy in range(2):
1348             p = wait_for_bfd_packet(self)
1349             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1350         self.vpp_session.admin_up()
1351         self.test_session.update(state=BFDState.down)
1352         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1353         verify_event(self, e, expected_state=BFDState.down)
1354         p = wait_for_bfd_packet(
1355             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1356         self.assert_equal(p[BFD].state, BFDState.down, BFDState)
1357         self.test_session.send_packet()
1358         p = wait_for_bfd_packet(
1359             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1360         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1361         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1362         verify_event(self, e, expected_state=BFDState.init)
1363         self.test_session.update(state=BFDState.up)
1364         self.test_session.send_packet()
1365         p = wait_for_bfd_packet(
1366             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1367         self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1368         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1369         verify_event(self, e, expected_state=BFDState.up)
1370
1371     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1372     def test_config_change_remote_demand(self):
1373         """ configuration change while peer in demand mode """
1374         bfd_session_up(self)
1375         demand = self.test_session.create_packet()
1376         demand[BFD].flags = "D"
1377         self.test_session.send_packet(demand)
1378         self.vpp_session.modify_parameters(
1379             required_min_rx=2 * self.vpp_session.required_min_rx)
1380         p = wait_for_bfd_packet(
1381             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1382         # poll bit must be set
1383         self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1384         # terminate poll sequence
1385         final = self.test_session.create_packet()
1386         final[BFD].flags = "D+F"
1387         self.test_session.send_packet(final)
1388         # vpp should be quiet now again
1389         transmit_time = 0.9 \
1390             * max(self.vpp_session.required_min_rx,
1391                   self.test_session.desired_min_tx) \
1392             / USEC_IN_SEC
1393         count = 0
1394         for dummy in range(self.test_session.detect_mult * 2):
1395             time.sleep(transmit_time)
1396             self.test_session.send_packet(demand)
1397             try:
1398                 p = wait_for_bfd_packet(self, timeout=0)
1399                 self.logger.error(ppp("Received unexpected packet:", p))
1400                 count += 1
1401             except CaptureTimeoutError:
1402                 pass
1403         events = self.vapi.collect_events()
1404         for e in events:
1405             self.logger.error("Received unexpected event: %s", e)
1406         self.assert_equal(count, 0, "number of packets received")
1407         self.assert_equal(len(events), 0, "number of events received")
1408
1409     def test_intf_deleted(self):
1410         """ interface with bfd session deleted """
1411         intf = VppLoInterface(self)
1412         intf.config_ip4()
1413         intf.admin_up()
1414         sw_if_index = intf.sw_if_index
1415         vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip4)
1416         vpp_session.add_vpp_config()
1417         vpp_session.admin_up()
1418         intf.remove_vpp_config()
1419         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1420         self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1421         self.assertFalse(vpp_session.query_vpp_config())
1422
1423
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
class BFD6TestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (IPv6) """

    # class-level defaults; vpp_session and test_session are replaced by
    # per-test objects in setUp()
    pg0 = None
    vpp_clock_offset = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """ create pg0 and loopback0, configure IPv6 on both, bring them up

        If any configuration step raises, the class-level teardown is run
        before the exception is propagated.
        """
        super(BFD6TestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip6()
            cls.pg0.configure_ipv6_neighbors()
            cls.pg0.admin_up()
            cls.pg0.resolve_ndp()
            cls.create_loopback_interfaces(1)
            cls.loopback0 = cls.lo_interfaces[0]
            cls.loopback0.config_ip6()
            cls.loopback0.admin_up()

        except Exception:
            super(BFD6TestCase, cls).tearDownClass()
            raise

    def setUp(self):
        """ enable BFD events and create the vpp and test side sessions """
        super(BFD6TestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()
        try:
            self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                                self.pg0.remote_ip6,
                                                af=AF_INET6)
            self.vpp_session.add_vpp_config()
            self.vpp_session.admin_up()
            self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
            self.logger.debug(self.vapi.cli("show adj nbr"))
        except:
            # deliberately catch everything: event reporting is switched
            # off again and the original error is re-raised unchanged
            self.vapi.want_bfd_events(enable_disable=0)
            raise

    def tearDown(self):
        """ disable BFD events (if vpp is alive) and drain the event queue """
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFD6TestCase, self).tearDown()

    def test_session_up(self):
        """ bring BFD session up """
        bfd_session_up(self)

    def test_session_up_by_ip(self):
        """ bring BFD session up - first frame looked up by address pair """
        self.logger.info("BFD: Sending Slow control frame")
        self.test_session.update(my_discriminator=randint(0, 40000000))
        self.test_session.send_packet()
        self.pg0.enable_capture()
        p = self.pg0.wait_for_packet(1)
        # vpp must echo back the discriminator the test session announced
        self.assert_equal(p[BFD].your_discriminator,
                          self.test_session.my_discriminator,
                          "BFD - your discriminator")
        self.assert_equal(p[BFD].state, BFDState.init, BFDState)
        self.test_session.update(your_discriminator=p[BFD].my_discriminator,
                                 state=BFDState.up)
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        verify_event(self, e, expected_state=BFDState.init)
        self.logger.info("BFD: Sending Up")
        self.test_session.send_packet()
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        verify_event(self, e, expected_state=BFDState.up)
        self.logger.info("BFD: Session is Up")
        self.test_session.update(state=BFDState.up)
        self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_hold_up(self):
        """ hold BFD session up """
        bfd_session_up(self)
        # answer every packet from vpp for twice the detect multiplier
        for _ in range(self.test_session.detect_mult * 2):
            wait_for_bfd_packet(self)
            self.test_session.send_packet()
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_echo_looped_back(self):
        """ echo packets looped back """
        # don't need a session in this case..
        self.vpp_session.remove_vpp_config()
        self.pg0.enable_capture()
        echo_packet_count = 10
        # random source port low enough to increment a few times..
        udp_sport_tx = randint(1, 50000)
        udp_sport_rx = udp_sport_tx
        # source and destination IP are both the remote address
        echo_packet = (Ether(src=self.pg0.remote_mac,
                             dst=self.pg0.local_mac) /
                       IPv6(src=self.pg0.remote_ip6,
                            dst=self.pg0.remote_ip6) /
                       UDP(dport=BFD.udp_dport_echo) /
                       Raw("this should be looped back"))
        for _ in range(echo_packet_count):
            self.sleep(.01, "delay between echo packets")
            # the source port doubles as a per-packet identifier
            echo_packet[UDP].sport = udp_sport_tx
            udp_sport_tx += 1
            self.logger.debug(ppp("Sending packet:", echo_packet))
            self.pg0.add_stream(echo_packet)
            self.pg_start()
        for _ in range(echo_packet_count):
            p = self.pg0.wait_for_packet(1)
            self.logger.debug(ppp("Got packet:", p))
            ether = p[Ether]
            self.assert_equal(self.pg0.remote_mac,
                              ether.dst, "Destination MAC")
            self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
            ip = p[IPv6]
            self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
            # fixed: this check was mislabelled "Destination IP"
            self.assert_equal(self.pg0.remote_ip6, ip.src, "Source IP")
            udp = p[UDP]
            self.assert_equal(udp.dport, BFD.udp_dport_echo,
                              "UDP destination port")
            self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
            udp_sport_rx += 1
            # need to compare the hex payload here, otherwise BFD_vpp_echo
            # gets in way
            self.assertEqual(str(p[UDP].payload),
                             str(echo_packet[UDP].payload),
                             "Received packet is not the echo packet sent")
        # every sent packet must have been received (the identical check
        # used to be repeated twice here)
        self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
                          "ECHO packet identifier for test purposes)")

    def test_echo(self):
        """ echo function """
        bfd_session_up(self)
        self.test_session.update(required_min_echo_rx=150000)
        self.test_session.send_packet()
        detection_time = self.test_session.detect_mult *\
            self.vpp_session.required_min_rx / USEC_IN_SEC
        # echo shouldn't work without echo source set
        tx_interval = self.vpp_session.required_min_rx / USEC_IN_SEC
        for _ in range(10):
            self.sleep(tx_interval, "delay before sending bfd packet")
            self.test_session.send_packet()
        p = wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.assert_equal(p[BFD].required_min_rx_interval,
                          self.vpp_session.required_min_rx,
                          "BFD required min rx interval")
        self.test_session.send_packet()
        self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
        echo_seen = False
        # should be turned on - loopback echo packets
        for _ in range(3):
            loop_until = time.time() + 0.75 * detection_time
            while time.time() < loop_until:
                p = self.pg0.wait_for_packet(1)
                self.logger.debug(ppp("Got packet:", p))
                if p[UDP].dport == BFD.udp_dport_echo:
                    self.assert_equal(
                        p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
                    self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
                                        "BFD ECHO src IP equal to loopback IP")
                    self.logger.debug(ppp("Looping back packet:", p))
                    self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
                                      "ECHO packet destination MAC address")
                    # send the echo packet back to vpp
                    p[Ether].dst = self.pg0.local_mac
                    self.pg0.add_stream(p)
                    self.pg_start()
                    echo_seen = True
                elif p.haslayer(BFD):
                    if echo_seen:
                        # NOTE(review): 1000000 equals USEC_IN_SEC, so this
                        # presumably means "at least one second" - confirm
                        self.assertGreaterEqual(
                            p[BFD].required_min_rx_interval,
                            1000000)
                    if "P" in p.sprintf("%BFD.flags%"):
                        # poll bit seen - reply with the final bit set
                        final = self.test_session.create_packet()
                        final[BFD].flags = "F"
                        self.test_session.send_packet(final)
                else:
                    raise Exception(ppp("Received unknown packet:", p))

                # no state change may be reported while echo is running
                self.assert_equal(len(self.vapi.collect_events()), 0,
                                  "number of bfd events")
            self.test_session.send_packet()
        self.assertTrue(echo_seen, "No echo packets received")

    def test_intf_deleted(self):
        """ interface with bfd session deleted """
        intf = VppLoInterface(self)
        intf.config_ip6()
        intf.admin_up()
        # saved before removal, compared with the event's index afterwards
        sw_if_index = intf.sw_if_index
        vpp_session = VppBFDUDPSession(
            self, intf, intf.remote_ip6, af=AF_INET6)
        vpp_session.add_vpp_config()
        vpp_session.admin_up()
        intf.remove_vpp_config()
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
        self.assertFalse(vpp_session.query_vpp_config())
1632
1633
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
class BFDFIBTestCase(VppTestCase):
    """ BFD-FIB interactions (IPv6) """

    vpp_session = None
    test_session = None

    def setUp(self):
        """ create pg0, enable BFD events and bring up IPv6 on all pg's """
        super(BFDFIBTestCase, self).setUp()
        self.create_pg_interfaces(range(1))

        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.configure_ipv6_neighbors()

    def tearDown(self):
        """ stop BFD event reporting, unless vpp has died """
        vpp_alive = not self.vpp_dead
        if vpp_alive:
            self.vapi.want_bfd_events(enable_disable=0)

        super(BFDFIBTestCase, self).tearDown()

    @staticmethod
    def pkt_is_not_data_traffic(p):
        """ not data traffic implies BFD or the usual IPv6 ND/RA"""
        return bool(p.haslayer(BFD) or is_ipv6_misc(p))

    def _expect_data_traffic(self, packets):
        """ send packets into pg0 and expect each one back, in order """
        self.pg0.add_stream(packets)
        self.pg_start()
        for sent in packets:
            received = self.pg0.wait_for_packet(
                1,
                filter_out_fn=self.pkt_is_not_data_traffic)
            self.assertEqual(received[IPv6].dst,
                             sent[IPv6].dst)

    def _expect_no_data_traffic(self, packets):
        """ send packets into pg0 and expect the capture to time out """
        self.pg0.add_stream(packets)
        self.pg_start()
        with self.assertRaises(CaptureTimeoutError):
            self.pg0.wait_for_packet(1, self.pkt_is_not_data_traffic)

    def test_session_with_fib(self):
        """ BFD-FIB interactions """

        # packets to match against both of the routes
        data_packets = [
            (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IPv6(src="3001::1", dst=dst_ip) /
             UDP(sport=1234, dport=1234) /
             Raw('\xa5' * 100))
            for dst_ip in ("2001::1", "2002::1")]

        # A recursive and a non-recursive route via a next-hop that
        # will have a BFD session; one path names pg0 explicitly, the
        # other passes 0xffffffff as the interface index
        route_via_pg0 = VppIpRoute(
            self, "2001::", 64,
            [VppRoutePath(self.pg0.remote_ip6,
                          self.pg0.sw_if_index,
                          proto=DpoProto.DPO_PROTO_IP6)],
            is_ip6=1)
        route_no_intf = VppIpRoute(
            self, "2002::", 64,
            [VppRoutePath(self.pg0.remote_ip6,
                          0xffffffff,
                          proto=DpoProto.DPO_PROTO_IP6)],
            is_ip6=1)
        route_via_pg0.add_vpp_config()
        route_no_intf.add_vpp_config()

        # bring the session up now the routes are present
        self.vpp_session = VppBFDUDPSession(self,
                                            self.pg0,
                                            self.pg0.remote_ip6,
                                            af=AF_INET6)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET6)

        # session is up - traffic passes
        bfd_session_up(self)
        self._expect_data_traffic(data_packets)

        # session is down - traffic is dropped
        bfd_session_down(self)
        self._expect_no_data_traffic(data_packets)

        # session is up again - traffic passes
        bfd_session_up(self)
        self._expect_data_traffic(data_packets)
1734
1735
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
class BFDSHA1TestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """

    pg0 = None
    vpp_clock_offset = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """ create pg0 with IPv4 configured, up and ARP-resolved """
        super(BFDSHA1TestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            pg0 = cls.pg0
            pg0.config_ip4()
            pg0.admin_up()
            pg0.resolve_arp()
        except Exception:
            # run the class-level teardown before propagating the error
            super(BFDSHA1TestCase, cls).tearDownClass()
            raise

    def setUp(self):
        """ fresh key factory, BFD events enabled, capture enabled """
        super(BFDSHA1TestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

    def tearDown(self):
        """ disable BFD events (if vpp is alive) and drain the event queue """
        vpp_alive = not self.vpp_dead
        if vpp_alive:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFDSHA1TestCase, self).tearDown()

    def _add_random_key(self, auth_type=BFDAuthType.keyed_sha1):
        """ create a random key of auth_type and add it to vpp """
        key = self.factory.create_random_key(self, auth_type)
        key.add_vpp_config()
        return key

    def _make_vpp_session(self, key):
        """ build (but do not configure) a vpp session on pg0 using key """
        return VppBFDUDPSession(self, self.pg0,
                                self.pg0.remote_ip4, sha1_key=key)

    def _make_test_session(self, key, bfd_key_id, **extra):
        """ build a test-side IPv4 session on pg0 authenticated with key """
        return BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=bfd_key_id, **extra)

    def test_session_up(self):
        """ bring BFD session up """
        key = self._add_random_key()
        self.vpp_session = self._make_vpp_session(key)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = self._make_test_session(
            key, self.vpp_session.bfd_key_id)
        bfd_session_up(self)

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_hold_up(self):
        """ hold BFD session up """
        key = self._add_random_key()
        self.vpp_session = self._make_vpp_session(key)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = self._make_test_session(
            key, self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        # answer every packet from vpp for twice the detect multiplier
        for _ in range(self.test_session.detect_mult * 2):
            wait_for_bfd_packet(self)
            self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_hold_up_meticulous(self):
        """ hold BFD session up - meticulous auth """
        key = self._add_random_key(BFDAuthType.meticulous_keyed_sha1)
        self.vpp_session = self._make_vpp_session(key)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        # specify sequence number so that it wraps
        self.test_session = self._make_test_session(
            key, self.vpp_session.bfd_key_id,
            our_seq_number=0xFFFFFFFF - 4)
        bfd_session_up(self)
        for _ in range(30):
            wait_for_bfd_packet(self)
            self.test_session.inc_seq_num()
            self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_send_bad_seq_number(self):
        """ session is not kept alive by msgs with bad sequence numbers"""
        key = self._add_random_key(BFDAuthType.meticulous_keyed_sha1)
        self.vpp_session = self._make_vpp_session(key)
        self.vpp_session.add_vpp_config()
        self.test_session = self._make_test_session(
            key, self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        detection_time = self.test_session.detect_mult *\
            self.vpp_session.required_min_rx / USEC_IN_SEC
        deadline = time.time() + 2 * detection_time
        # keep sending, but never advance the sequence number
        while time.time() < deadline:
            self.test_session.send_packet()
            self.sleep(0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
                       "time between bfd packets")
        events = self.vapi.collect_events()
        # session should be down now, because the sequence numbers weren't
        # updated
        self.assert_equal(len(events), 1, "number of bfd events")
        verify_event(self, events[0], expected_state=BFDState.down)

    def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
                                       legitimate_test_session,
                                       rogue_test_session,
                                       rogue_bfd_values=None):
        """ check that a rogue session cannot take a legitimate one down

        1. adopt and configure the given vpp session
        2. bring the legitimate session up
        3. copy the bfd values from legitimate session to rogue session
        4. apply rogue_bfd_values (if any) to rogue session
        5. set rogue session state to down
        6. send a packet from the rogue session
        7. assert that the vpp session is still up
        """

        self.vpp_session = vpp_bfd_udp_session
        self.vpp_session.add_vpp_config()
        self.test_session = legitimate_test_session
        # bring vpp session up
        bfd_session_up(self)
        # make the rogue session mimic the legitimate one
        mimicked_fields = ("my_discriminator", "your_discriminator",
                           "desired_min_tx", "required_min_rx",
                           "detect_mult", "diag", "state", "auth_type")
        mimicked = dict((name, getattr(self.test_session, name))
                        for name in mimicked_fields)
        rogue_test_session.update(**mimicked)
        if rogue_bfd_values:
            rogue_test_session.update(**rogue_bfd_values)
        # send packet from rogue session
        rogue_test_session.update(state=BFDState.down)
        rogue_test_session.send_packet()
        wait_for_bfd_packet(self)
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_mismatch_auth(self):
        """ session is not brought down by unauthenticated msg """
        key = self._add_random_key()
        vpp_session = self._make_vpp_session(key)
        legitimate = self._make_test_session(key, vpp_session.bfd_key_id)
        # the rogue session carries no key at all
        rogue = BFDTestSession(self, self.pg0, AF_INET)
        self.execute_rogue_session_scenario(vpp_session, legitimate, rogue)

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_mismatch_bfd_key_id(self):
        """ session is not brought down by msg with non-existent key-id """
        key = self._add_random_key()
        vpp_session = self._make_vpp_session(key)
        # pick a different random bfd key id
        other_key_id = randint(0, 255)
        while other_key_id == vpp_session.bfd_key_id:
            other_key_id = randint(0, 255)
        legitimate = self._make_test_session(key, vpp_session.bfd_key_id)
        rogue = self._make_test_session(key, other_key_id)
        self.execute_rogue_session_scenario(vpp_session, legitimate, rogue)

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_mismatched_auth_type(self):
        """ session is not brought down by msg with wrong auth type """
        key = self._add_random_key()
        vpp_session = self._make_vpp_session(key)
        legitimate = self._make_test_session(key, vpp_session.bfd_key_id)
        rogue = self._make_test_session(key, vpp_session.bfd_key_id)
        # same key and key id, but the rogue claims a different auth type
        self.execute_rogue_session_scenario(
            vpp_session, legitimate, rogue,
            {'auth_type': BFDAuthType.keyed_md5})

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_restart(self):
        """ simulate remote peer restart and resynchronization """
        key = self._add_random_key(BFDAuthType.meticulous_keyed_sha1)
        self.vpp_session = self._make_vpp_session(key)
        self.vpp_session.add_vpp_config()
        self.test_session = self._make_test_session(
            key, self.vpp_session.bfd_key_id, our_seq_number=0)
        bfd_session_up(self)
        # don't send any packets for 2*detection_time
        detection_time = self.test_session.detect_mult *\
            self.vpp_session.required_min_rx / USEC_IN_SEC
        self.sleep(2 * detection_time, "simulating peer restart")
        events = self.vapi.collect_events()
        self.assert_equal(len(events), 1, "number of bfd events")
        verify_event(self, events[0], expected_state=BFDState.down)
        self.test_session.update(state=BFDState.down)
        # reset sequence number
        self.test_session.our_seq_number = 0
        self.test_session.vpp_seq_number = None
        # now throw away any pending packets
        self.pg0.enable_capture()
        bfd_session_up(self)
1968
1969
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
class BFDAuthOnOffTestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (changing auth) """

    pg0 = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """ enable BFD debug logging and prepare pg0 with IPv4 """
        super(BFDAuthOnOffTestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip4()
            cls.pg0.admin_up()
            cls.pg0.resolve_arp()

        except Exception:
            # undo the base class setup before propagating the failure
            super(BFDAuthOnOffTestCase, cls).tearDownClass()
            raise

    def setUp(self):
        """ fresh key factory, BFD event subscription and capture """
        super(BFDAuthOnOffTestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

    def tearDown(self):
        """ unsubscribe from BFD events (if VPP is alive) and drain them """
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFDAuthOnOffTestCase, self).tearDown()

    def _bring_session_up(self, key=None, admin_up=False):
        """ create the VPP and test sessions and bring the session up

        :param key: auth key used by both sessions, None for no auth
        :param admin_up: call admin_up() on the VPP session after adding it
        """
        if key is None:
            self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                                self.pg0.remote_ip4)
        else:
            self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                                self.pg0.remote_ip4,
                                                sha1_key=key)
        self.vpp_session.add_vpp_config()
        if admin_up:
            self.vpp_session.admin_up()
        if key is None:
            self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        else:
            self.test_session = BFDTestSession(
                self, self.pg0, AF_INET, sha1_key=key,
                bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)

    def _exchange_packets(self, inc_seq_num=False):
        """ exchange 2 * detect_mult packets, VPP must report up each time

        :param inc_seq_num: call inc_seq_num() on the test session before
            each packet sent
        """
        for _ in range(self.test_session.detect_mult * 2):
            p = wait_for_bfd_packet(self)
            self.assert_equal(p[BFD].state, BFDState.up, BFDState)
            if inc_seq_num:
                self.test_session.inc_seq_num()
            self.test_session.send_packet()

    def _set_test_session_key(self, key):
        """ switch the test session to `key`, None turns auth off """
        if key is None:
            self.test_session.bfd_key_id = None
        else:
            self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key

    def _assert_session_undisturbed(self):
        """ the VPP session is up and no BFD event was generated """
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")

    def test_auth_on_immediate(self):
        """ turn auth on without disturbing session state (immediate) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self._bring_session_up()
        self._exchange_packets()
        self.vpp_session.activate_auth(key)
        self._set_test_session_key(key)
        self._exchange_packets()
        self._assert_session_undisturbed()

    def test_auth_off_immediate(self):
        """ turn auth off without disturbing session state (immediate) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self._bring_session_up(key)
        self._exchange_packets(inc_seq_num=True)
        self.vpp_session.deactivate_auth()
        self._set_test_session_key(None)
        self._exchange_packets(inc_seq_num=True)
        self._assert_session_undisturbed()

    def test_auth_change_key_immediate(self):
        """ change auth key without disturbing session state (immediate) """
        key1 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2 = self.factory.create_random_key(self)
        key2.add_vpp_config()
        self._bring_session_up(key1)
        self._exchange_packets()
        self.vpp_session.activate_auth(key2)
        self._set_test_session_key(key2)
        self._exchange_packets()
        self._assert_session_undisturbed()

    def test_auth_on_delayed(self):
        """ turn auth on without disturbing session state (delayed) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self._bring_session_up()
        self._exchange_packets()
        self.vpp_session.activate_auth(key, delayed=True)
        # the peer keeps sending unauthenticated packets for a while ...
        self._exchange_packets()
        # ... and only then starts using the key
        self._set_test_session_key(key)
        self.test_session.send_packet()
        self._exchange_packets()
        self._assert_session_undisturbed()

    def test_auth_off_delayed(self):
        """ turn auth off without disturbing session state (delayed) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self._bring_session_up(key)
        self._exchange_packets()
        self.vpp_session.deactivate_auth(delayed=True)
        # the peer keeps sending authenticated packets for a while ...
        self._exchange_packets()
        # ... and only then stops using the key
        self._set_test_session_key(None)
        self.test_session.send_packet()
        self._exchange_packets()
        self._assert_session_undisturbed()

    def test_auth_change_key_delayed(self):
        """ change auth key without disturbing session state (delayed) """
        key1 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2 = self.factory.create_random_key(self)
        key2.add_vpp_config()
        self._bring_session_up(key1, admin_up=True)
        self._exchange_packets()
        self.vpp_session.activate_auth(key2, delayed=True)
        # the peer keeps using the old key for a while ...
        self._exchange_packets()
        # ... and only then switches to the new one
        self._set_test_session_key(key2)
        self.test_session.send_packet()
        self._exchange_packets()
        self._assert_session_undisturbed()
2177
2178
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
class BFDCLITestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (CLI) """
    pg0 = None

    @classmethod
    def setUpClass(cls):
        """ enable BFD debug logging and prepare pg0 with IPv4 and IPv6 """
        super(BFDCLITestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces((0,))
            cls.pg0.config_ip4()
            cls.pg0.config_ip6()
            cls.pg0.resolve_arp()
            cls.pg0.resolve_ndp()

        except Exception:
            # undo the base class setup before propagating the failure
            super(BFDCLITestCase, cls).tearDownClass()
            raise

    def setUp(self):
        """ fresh key factory and packet capture for every test """
        super(BFDCLITestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.pg0.enable_capture()

    def tearDown(self):
        """ best-effort unsubscribe from BFD events and drain the queue """
        try:
            self.vapi.want_bfd_events(enable_disable=0)
        except UnexpectedApiReturnValueError:
            # some tests aren't subscribed, so this is not an issue
            pass
        self.vapi.collect_events()  # clear the event queue
        super(BFDCLITestCase, self).tearDown()

    def cli_verify_no_response(self, cli):
        """ execute a CLI, asserting that the response is empty """
        self.assert_equal(self.vapi.cli(cli),
                          "",
                          "CLI command response")

    def cli_verify_response(self, cli, expected):
        """ execute a CLI, asserting that the response matches expectation """
        self.assert_equal(self.vapi.cli(cli).strip(),
                          expected,
                          "CLI command response")

    # ---- private helpers shared by the tests below ----

    def _cli_addrs(self, af):
        """ return (local, peer) address strings of pg0 for family `af` """
        if af == AF_INET6:
            return self.pg0.local_ip6, self.pg0.remote_ip6
        return self.pg0.local_ip4, self.pg0.remote_ip4

    @staticmethod
    def _cli_key_set(conf_key_id, cli_type, key):
        """ build the 'bfd key set' CLI using the hex-encoded secret of key

        :param conf_key_id: conf key ID to set
        :param cli_type: key type keyword as spelled on the CLI
        :param key: auth key object whose `key` attribute holds the secret
        """
        secret = "".join("{:02x}".format(ord(c)) for c in key.key)
        return "bfd key set conf-key-id %s type %s secret %s" % (
            conf_key_id, cli_type, secret)

    def _verify_set_del_key(self, auth_type, cli_type, af, admin_up=False):
        """ set a key via CLI, use it in a session, then delete it via CLI

        :param auth_type: BFDAuthType of the key under test
        :param cli_type: the same type as spelled on the CLI
        :param af: address family of the session using the key
        :param admin_up: call admin_up() on the VPP session after adding it
        """
        k = self.factory.create_random_key(self, auth_type=auth_type)
        self.registry.register(k, self.logger)
        self.cli_verify_no_response(
            self._cli_key_set(k.conf_key_id, cli_type, k))
        self.assertTrue(k.query_vpp_config())
        _, peer_addr = self._cli_addrs(af)
        # pass 'af' only for IPv6 so IPv4 keeps using the class default
        session_kwargs = {'sha1_key': k}
        if af == AF_INET6:
            session_kwargs['af'] = AF_INET6
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, peer_addr, **session_kwargs)
        self.vpp_session.add_vpp_config()
        if admin_up:
            self.vpp_session.admin_up()
        self.test_session = \
            BFDTestSession(self, self.pg0, af, sha1_key=k,
                           bfd_key_id=self.vpp_session.bfd_key_id)
        self.vapi.want_bfd_events()
        bfd_session_up(self)
        bfd_session_down(self)
        # try to replace the secret for the key - should fail because the key
        # is in-use
        k2 = self.factory.create_random_key(self)
        self.cli_verify_response(
            self._cli_key_set(k.conf_key_id, "keyed-sha1", k2),
            "bfd key set: `bfd_auth_set_key' API call failed, "
            "rv=-103:BFD object in use")
        # manipulating the session using old secret should still work
        bfd_session_up(self)
        bfd_session_down(self)
        self.vpp_session.remove_vpp_config()
        self.cli_verify_no_response(
            "bfd key del conf-key-id %s" % k.conf_key_id)
        self.assertFalse(k.query_vpp_config())

    def _verify_add_mod_del(self, af=AF_INET, key=None):
        """ add, modify and delete a BFD UDP session using the CLI

        Adding twice and deleting twice must fail the second time.

        :param af: address family of the session
        :param key: auth key already configured in VPP, None for no auth
        """
        local_addr, peer_addr = self._cli_addrs(af)
        # keyword arguments common to the reference and modified sessions;
        # only those the scenario needs are passed
        common = {}
        if af == AF_INET6:
            common['af'] = AF_INET6
        if key is not None:
            common['sha1_key'] = key
        vpp_session = VppBFDUDPSession(self, self.pg0, peer_addr, **common)
        self.registry.register(vpp_session, self.logger)
        cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
            "peer-addr %s desired-min-tx %s required-min-rx %s "\
            "detect-mult %s" % (self.pg0.name, local_addr, peer_addr,
                                vpp_session.desired_min_tx,
                                vpp_session.required_min_rx,
                                vpp_session.detect_mult)
        if key is not None:
            cli_add_cmd += " conf-key-id %s bfd-key-id %s" % (
                key.conf_key_id, vpp_session.bfd_key_id)
        self.cli_verify_no_response(cli_add_cmd)
        # 2nd add should fail
        self.cli_verify_response(
            cli_add_cmd,
            "bfd udp session add: `bfd_add_add_session' API call"
            " failed, rv=-101:Duplicate BFD object")
        verify_bfd_session_config(self, vpp_session)
        mod_kwargs = dict(
            common,
            required_min_rx=2 * vpp_session.required_min_rx,
            desired_min_tx=3 * vpp_session.desired_min_tx,
            detect_mult=4 * vpp_session.detect_mult)
        if key is not None:
            # modification must not change the BFD key ID in use
            mod_kwargs['bfd_key_id'] = vpp_session.bfd_key_id
        mod_session = VppBFDUDPSession(
            self, self.pg0, peer_addr, **mod_kwargs)
        self.cli_verify_no_response(
            "bfd udp session mod interface %s local-addr %s peer-addr %s "
            "desired-min-tx %s required-min-rx %s detect-mult %s" %
            (self.pg0.name, local_addr, peer_addr,
             mod_session.desired_min_tx, mod_session.required_min_rx,
             mod_session.detect_mult))
        verify_bfd_session_config(self, mod_session)
        cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
            "peer-addr %s" % (self.pg0.name, local_addr, peer_addr)
        self.cli_verify_no_response(cli_del_cmd)
        # 2nd del is expected to fail
        self.cli_verify_response(
            cli_del_cmd,
            "bfd udp session del: `bfd_udp_del_session' API call"
            " failed, rv=-102:No such BFD object")
        self.assertFalse(vpp_session.query_vpp_config())

    def _verify_auth_on_off(self, delayed):
        """ activate and deactivate authentication using the CLI

        Each command is issued twice; the repetition must succeed and
        leave the session configuration unchanged.

        :param delayed: append 'delayed yes' to both commands
        """
        key = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                        sha1_key=key)
        session.add_vpp_config()
        cli_activate = \
            "bfd udp session auth activate interface %s local-addr %s "\
            "peer-addr %s conf-key-id %s bfd-key-id %s"\
            % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
               key.conf_key_id, auth_session.bfd_key_id)
        if delayed:
            cli_activate += " delayed yes"
        for _ in range(2):
            self.cli_verify_no_response(cli_activate)
            verify_bfd_session_config(self, auth_session)
        # the trailing space is part of the command in the non-delayed case
        cli_deactivate = \
            "bfd udp session auth deactivate interface %s local-addr %s "\
            "peer-addr %s "\
            % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
        if delayed:
            cli_deactivate += "delayed yes"
        for _ in range(2):
            self.cli_verify_no_response(cli_deactivate)
            verify_bfd_session_config(self, session)

    def _verify_echo_source(self, ip4="none", ip6="none"):
        """ verify 'show bfd echo-source' output while loopback0 is set

        :param ip4: expected usable IPv4 echo source address
        :param ip6: expected usable IPv6 echo source address
        """
        self.cli_verify_response("show bfd echo-source",
                                 "UDP echo source is: %s\n"
                                 "IPv4 address usable as echo source: %s\n"
                                 "IPv6 address usable as echo source: %s" %
                                 (self.loopback0.name, ip4, ip6))

    # ---- tests ----

    def test_show(self):
        """ show commands """
        k1 = self.factory.create_random_key(self)
        k1.add_vpp_config()
        k2 = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        k2.add_vpp_config()
        s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        s1.add_vpp_config()
        s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
                              sha1_key=k2)
        s2.add_vpp_config()
        # only the output is logged, nothing is asserted about its content
        for show_cmd in ("show bfd keys", "show bfd sessions", "show bfd"):
            self.logger.info(self.vapi.ppcli(show_cmd))

    def test_set_del_sha1_key(self):
        """ set/delete SHA1 auth key """
        self._verify_set_del_key(BFDAuthType.keyed_sha1, "keyed-sha1",
                                 AF_INET)

    def test_set_del_meticulous_sha1_key(self):
        """ set/delete meticulous SHA1 auth key """
        self._verify_set_del_key(BFDAuthType.meticulous_keyed_sha1,
                                 "meticulous-keyed-sha1", AF_INET6,
                                 admin_up=True)

    def test_add_mod_del_bfd_udp(self):
        """ create/modify/delete IPv4 BFD UDP session """
        self._verify_add_mod_del()

    def test_add_mod_del_bfd_udp6(self):
        """ create/modify/delete IPv6 BFD UDP session """
        self._verify_add_mod_del(af=AF_INET6)

    def test_add_mod_del_bfd_udp_auth(self):
        """ create/modify/delete IPv4 BFD UDP session (authenticated) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self._verify_add_mod_del(key=key)

    def test_add_mod_del_bfd_udp6_auth(self):
        """ create/modify/delete IPv6 BFD UDP session (authenticated) """
        key = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        self._verify_add_mod_del(af=AF_INET6, key=key)

    def test_auth_on_off(self):
        """ turn authentication on and off """
        self._verify_auth_on_off(delayed=False)

    def test_auth_on_off_delayed(self):
        """ turn authentication on and off (delayed) """
        self._verify_auth_on_off(delayed=True)

    def test_admin_up_down(self):
        """ put session admin-up and admin-down """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        cli_down = \
            "bfd udp session set-flags admin down interface %s local-addr %s "\
            "peer-addr %s "\
            % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
        cli_up = \
            "bfd udp session set-flags admin up interface %s local-addr %s "\
            "peer-addr %s "\
            % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.cli_verify_no_response(cli_down)
        verify_bfd_session_config(self, session, state=BFDState.admin_down)
        # admin-up leaves the session in down state
        self.cli_verify_no_response(cli_up)
        verify_bfd_session_config(self, session, state=BFDState.down)

    def test_set_del_udp_echo_source(self):
        """ set/del udp echo source """
        self.create_loopback_interfaces(1)
        self.loopback0 = self.lo_interfaces[0]
        self.loopback0.admin_up()
        self.cli_verify_response("show bfd echo-source",
                                 "UDP echo source is not set.")
        cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
        self.cli_verify_no_response(cli_set)
        # no address configured on the loopback yet
        self._verify_echo_source()
        self.loopback0.config_ip4()
        # expected echo source is the loopback address with the lowest bit
        # flipped
        unpacked = unpack("!L", self.loopback0.local_ip4n)
        echo_ip4 = inet_ntop(AF_INET, pack("!L", unpacked[0] ^ 1))
        self._verify_echo_source(ip4=echo_ip4)
        unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
        echo_ip6 = inet_ntop(AF_INET6, pack("!LLLL", unpacked[0], unpacked[1],
                                            unpacked[2], unpacked[3] ^ 1))
        self.loopback0.config_ip6()
        self._verify_echo_source(ip4=echo_ip4, ip6=echo_ip6)
        cli_del = "bfd udp echo-source del"
        self.cli_verify_no_response(cli_del)
        self.cli_verify_response("show bfd echo-source",
                                 "UDP echo source is not set.")
2594
# executed as a script: run the tests of this module with VppTestRunner
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)