BFD: SHA1 authentication
[vpp.git] / test / test_bfd.py
1 #!/usr/bin/env python
2
3 import unittest
4 import hashlib
5 import binascii
6 import time
7 from random import randint
8 from bfd import *
9 from framework import *
10 from util import ppp
11
12 us_in_sec = 1000000
13
14
class AuthKeyFactory(object):
    """Build random BFD auth keys, never reusing a conf key ID"""

    def __init__(self):
        # conf key IDs already handed out by this factory instance
        self._conf_key_ids = {}

    def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
        """Create an auth key with a fresh conf key ID and random secret.

        :param test: test case passed through to VppBFDAuthKey
        :param auth_type: authentication type passed to VppBFDAuthKey

        :returns: new VppBFDAuthKey holding a 1-20 byte random key
        """
        # draw IDs until one turns up which this factory has not used yet
        while True:
            candidate_id = randint(0, 0xFFFFFFFF)
            if candidate_id not in self._conf_key_ids:
                break
        self._conf_key_ids[candidate_id] = 1
        key_length = randint(1, 20)
        key_bytes = [randint(0, 255) for _ in range(key_length)]
        return VppBFDAuthKey(test=test, auth_type=auth_type,
                             conf_key_id=candidate_id,
                             key=str(bytearray(key_bytes)))
29
30
class BFDAPITestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) - API"""

    # The tests in this class only add, query and remove BFD sessions and
    # authentication keys; none of them sends or captures packets.

    @classmethod
    def setUpClass(cls):
        """ create two pg interfaces with IPv4 and IPv6 configured """
        super(BFDAPITestCase, cls).setUpClass()

        try:
            cls.create_pg_interfaces(range(2))
            for i in cls.pg_interfaces:
                i.config_ip4()
                i.config_ip6()
                i.resolve_arp()

        except Exception:
            # undo the parent class setup before propagating the failure
            super(BFDAPITestCase, cls).tearDownClass()
            raise

    def setUp(self):
        """ give every test its own auth key factory """
        super(BFDAPITestCase, self).setUp()
        self.factory = AuthKeyFactory()

    def test_add_bfd(self):
        """ create a BFD session """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        # add/remove twice to check that a removed session can be re-added
        session.add_vpp_config()
        self.logger.debug("Session state is %s" % str(session.state))
        session.remove_vpp_config()
        session.add_vpp_config()
        self.logger.debug("Session state is %s" % str(session.state))
        session.remove_vpp_config()

    def test_double_add(self):
        """ create the same BFD session twice (negative case) """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()

        with self.vapi.expect_negative_api_retval():
            session.add_vpp_config()

        session.remove_vpp_config()

    def test_add_bfd6(self):
        """ create IPv6 BFD session """
        session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
        # add/remove twice to check that a removed session can be re-added
        session.add_vpp_config()
        self.logger.debug("Session state is %s" % str(session.state))
        session.remove_vpp_config()
        session.add_vpp_config()
        self.logger.debug("Session state is %s" % str(session.state))
        session.remove_vpp_config()

    def test_add_sha1_keys(self):
        """ add SHA1 keys """
        # imported locally: the module header only imports `randint`, so the
        # `random` module itself is not guaranteed to be in scope here
        import random

        key_count = 10
        keys = [self.factory.create_random_key(
            self) for i in range(0, key_count)]
        for key in keys:
            self.assertFalse(key.query_vpp_config())
        for key in keys:
            key.add_vpp_config()
        for key in keys:
            self.assertTrue(key.query_vpp_config())
        # remove randomly
        # shuffle() works in place, so it needs a real list, not a range
        indexes = list(range(key_count))
        random.shuffle(indexes)
        removed = set()
        for i in indexes:
            keys[i].remove_vpp_config()
            removed.add(i)
            # after each removal, exactly the removed keys must be gone
            for j, key in enumerate(keys):
                if j in removed:
                    self.assertFalse(key.query_vpp_config())
                else:
                    self.assertTrue(key.query_vpp_config())
        # should be removed now
        for key in keys:
            self.assertFalse(key.query_vpp_config())
        # add back and remove again
        for key in keys:
            key.add_vpp_config()
        for key in keys:
            self.assertTrue(key.query_vpp_config())
        for key in keys:
            key.remove_vpp_config()
        for key in keys:
            self.assertFalse(key.query_vpp_config())

    def test_add_bfd_sha1(self):
        """ create a BFD session (SHA1) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=key)
        # add/remove twice to check that a removed session can be re-added
        session.add_vpp_config()
        self.logger.debug("Session state is %s" % str(session.state))
        session.remove_vpp_config()
        session.add_vpp_config()
        self.logger.debug("Session state is %s" % str(session.state))
        session.remove_vpp_config()

    def test_double_add_sha1(self):
        """ create the same BFD session twice (negative case) (SHA1) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=key)
        session.add_vpp_config()
        with self.assertRaises(Exception):
            session.add_vpp_config()

    def test_add_authenticated_with_nonexistent_key(self):
        """ create BFD session using non-existent SHA1 (negative case) """
        # the key is created on the test side only, never added to vpp
        session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4,
            sha1_key=self.factory.create_random_key(self))
        with self.assertRaises(Exception):
            session.add_vpp_config()

    def test_shared_sha1_key(self):
        """ share single SHA1 key between multiple BFD sessions """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        sessions = [
            VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                             sha1_key=key),
            VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
                             sha1_key=key, af=AF_INET6),
            VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
                             sha1_key=key),
            VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
                             sha1_key=key, af=AF_INET6)]
        for s in sessions:
            s.add_vpp_config()
        # the use count must drop by one with every removed session
        removed = 0
        for s in sessions:
            e = key.get_bfd_auth_keys_dump_entry()
            self.assert_equal(e.use_count, len(sessions) - removed,
                              "Use count for shared key")
            s.remove_vpp_config()
            removed += 1
        e = key.get_bfd_auth_keys_dump_entry()
        self.assert_equal(e.use_count, len(sessions) - removed,
                          "Use count for shared key")

    def test_activate_auth(self):
        """ activate SHA1 authentication """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        session.activate_auth(key)

    def test_deactivate_auth(self):
        """ deactivate SHA1 authentication """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        session.activate_auth(key)
        session.deactivate_auth()

    def test_change_key(self):
        """ change SHA1 key """
        key1 = self.factory.create_random_key(self)
        key2 = self.factory.create_random_key(self)
        while key2.conf_key_id == key1.conf_key_id:
            key2 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=key1)
        session.add_vpp_config()
        # switch the session, created with key1, over to key2
        session.activate_auth(key2)
207
208
class BFDTestSession(object):
    """ BFD session as seen from test framework side """

    def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
                 bfd_key_id=None, our_seq_number=0xFFFFFFFF - 4):
        """
        :param test: test case owning the session (used for logging,
            asserts and starting the packet generator)
        :param interface: interface providing the addresses for created
            packets and used for sending them
        :param af: address family, AF_INET6 selects IPv6, anything else IPv4
        :param detect_mult: detect multiplier placed into sent packets
        :param sha1_key: auth key, if set then packets are authenticated
        :param bfd_key_id: key ID placed into the auth section
        :param our_seq_number: initial auth sequence number; the default
            is a few steps below the 32-bit maximum (presumably so that
            wrap-around gets exercised)
        """
        self.test = test
        self.af = af
        self.sha1_key = sha1_key
        self.bfd_key_id = bfd_key_id
        self.interface = interface
        self.udp_sport = 50000
        self.our_seq_number = our_seq_number
        # last auth sequence number seen from vpp, None until first packet
        self.vpp_seq_number = None
        # BFD field values applied to every created packet
        self.bfd_values = {
            'my_discriminator': 0,
            'desired_min_tx_interval': 100000,
            'detect_mult': detect_mult,
            'diag': BFDDiagCode.no_diagnostic,
        }

    def inc_seq_num(self):
        """ Advance our auth sequence number, wrapping at 32 bits. """
        if self.our_seq_number == 0xFFFFFFFF:
            self.our_seq_number = 0
        else:
            self.our_seq_number += 1

    def update(self, **kwargs):
        """ Set/override BFD field values used for created packets. """
        self.bfd_values.update(kwargs)

    def create_packet(self):
        """ Build an Ether/IP(v6)/UDP/BFD packet from the session state.

        :returns: the packet, carrying a SHA1 auth section if the session
            has a key
        """
        if self.sha1_key:
            bfd = BFD(flags="A")
            bfd.auth_type = self.sha1_key.auth_type
            bfd.auth_len = BFD.sha1_auth_len
            bfd.auth_key_id = self.bfd_key_id
            bfd.auth_seq_num = self.our_seq_number
            bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
        else:
            bfd = BFD()
        if self.af == AF_INET6:
            packet = (Ether(src=self.interface.remote_mac,
                            dst=self.interface.local_mac) /
                      IPv6(src=self.interface.remote_ip6,
                           dst=self.interface.local_ip6,
                           hlim=255) /
                      UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
                      bfd)
        else:
            packet = (Ether(src=self.interface.remote_mac,
                            dst=self.interface.local_mac) /
                      IP(src=self.interface.remote_ip4,
                         dst=self.interface.local_ip4,
                         ttl=255) /
                      UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
                      bfd)
        self.test.logger.debug("BFD: Creating packet")
        for name, value in self.bfd_values.items():
            self.test.logger.debug("BFD: setting packet.%s=%s", name, value)
            packet[BFD].setfieldval(name, value)
        if self.sha1_key:
            # hash covers the first 32 bytes of the BFD layer (presumably
            # everything up to the hash field - TODO confirm) followed by
            # the key, zero-padded to 20 bytes
            hash_material = str(packet[BFD])[:32] + self.sha1_key.key + \
                "\0" * (20 - len(self.sha1_key.key))
            sha1 = hashlib.sha1(hash_material)
            self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
                                   sha1.hexdigest())
            packet[BFD].auth_key_hash = sha1.digest()
        return packet

    def send_packet(self):
        """ Create a packet and send it out of the session's interface. """
        p = self.create_packet()
        self.test.logger.debug(ppp("Sending packet:", p))
        # send on the interface the packet was built for, rather than on a
        # hard-coded one
        self.interface.add_stream([p])
        self.test.pg_start()

    def verify_sha1_auth(self, packet):
        """ Verify correctness of authentication in BFD layer. """
        bfd = packet[BFD]
        self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
        self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
                               BFDAuthType)
        self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
        self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
        if self.vpp_seq_number is None:
            self.vpp_seq_number = bfd.auth_seq_num
            self.test.logger.debug("Received initial sequence number: %s" %
                                   self.vpp_seq_number)
        else:
            recvd_seq_num = bfd.auth_seq_num
            self.test.logger.debug("Received followup sequence number: %s" %
                                   recvd_seq_num)
            # meticulous auth must advance by exactly one with every packet,
            # otherwise the number may also stay unchanged
            meticulous = (self.sha1_key.auth_type ==
                          BFDAuthType.meticulous_keyed_sha1)
            if self.vpp_seq_number < 0xffffffff:
                if meticulous:
                    self.test.assert_equal(recvd_seq_num,
                                           self.vpp_seq_number + 1,
                                           "BFD sequence number")
                else:
                    self.test.assert_in_range(recvd_seq_num,
                                              self.vpp_seq_number,
                                              self.vpp_seq_number + 1,
                                              "BFD sequence number")
            else:
                # previous number was the 32-bit maximum, next one wraps to 0
                if meticulous:
                    self.test.assert_equal(recvd_seq_num, 0,
                                           "BFD sequence number")
                else:
                    self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
                                       "BFD sequence number not one of "
                                       "(%s, 0)" % self.vpp_seq_number)
            self.vpp_seq_number = recvd_seq_num
        # last 20 bytes represent the hash - so replace them with the key,
        # pad the result with zeros and hash the result
        hash_material = bfd.original[:-20] + self.sha1_key.key + \
            "\0" * (20 - len(self.sha1_key.key))
        expected_hash = hashlib.sha1(hash_material).hexdigest()
        self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
                               expected_hash, "Auth key hash")

    def verify_bfd(self, packet):
        """ Verify correctness of BFD layer. """
        bfd = packet[BFD]
        self.test.assert_equal(bfd.version, 1, "BFD version")
        self.test.assert_equal(bfd.your_discriminator,
                               self.bfd_values['my_discriminator'],
                               "BFD - your discriminator")
        if self.sha1_key:
            self.verify_sha1_auth(packet)
336
337
class BFDCommonCode:
    """Common code used by both IPv4 and IPv6 Test Cases"""

    # The methods below rely on attributes of the class they are mixed
    # into: vapi, logger, pg0, vpp_dead, the assert helpers, and the
    # vpp_session/test_session pair created by the test case.

    def tearDown(self):
        """ Drop pending events and stop BFD event delivery. """
        self.vapi.collect_events()  # clear the event queue
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)

    def bfd_session_up(self):
        """ Bring BFD session up """
        self.pg_enable_capture([self.pg0])
        self.logger.info("BFD: Waiting for slow hello")
        p, timeout = self.wait_for_bfd_packet(2)
        self.logger.info("BFD: Sending Init")
        # answer the hello, echoing the discriminator picked by vpp
        self.test_session.update(my_discriminator=randint(0, 40000000),
                                 your_discriminator=p[BFD].my_discriminator,
                                 state=BFDState.init,
                                 required_min_rx_interval=100000)
        self.test_session.send_packet()
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        self.verify_event(e, expected_state=BFDState.up)
        self.logger.info("BFD: Session is Up")
        self.test_session.update(state=BFDState.up)
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def bfd_session_down(self):
        """ Bring BFD session down """
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
        self.test_session.update(state=BFDState.down)
        self.test_session.send_packet()
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        self.verify_event(e, expected_state=BFDState.down)
        self.logger.info("BFD: Session is Down")
        self.assert_equal(self.vpp_session.state, BFDState.down, BFDState)

    def verify_ip(self, packet):
        """ Verify correctness of IP layer. """
        if self.vpp_session.af == AF_INET6:
            ip = packet[IPv6]
            local_ip = self.pg0.local_ip6
            remote_ip = self.pg0.remote_ip6
            self.assert_equal(ip.hlim, 255, "IPv6 hop limit")
        else:
            ip = packet[IP]
            local_ip = self.pg0.local_ip4
            remote_ip = self.pg0.remote_ip4
            self.assert_equal(ip.ttl, 255, "IPv4 TTL")
        self.assert_equal(ip.src, local_ip, "IP source address")
        self.assert_equal(ip.dst, remote_ip, "IP destination address")

    def verify_udp(self, packet):
        """ Verify correctness of UDP layer. """
        udp = packet[UDP]
        self.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
        self.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
                             "UDP source port")

    def verify_event(self, event, expected_state):
        """ Verify correctness of event values. """
        e = event
        self.logger.debug("BFD: Event: %s" % repr(e))
        self.assert_equal(e.sw_if_index,
                          self.vpp_session.interface.sw_if_index,
                          "BFD interface index")
        is_ipv6 = 0
        if self.vpp_session.af == AF_INET6:
            is_ipv6 = 1
        self.assert_equal(e.is_ipv6, is_ipv6, "is_ipv6")
        if self.vpp_session.af == AF_INET:
            # only the first four bytes of the address fields are compared
            # for IPv4
            self.assert_equal(e.local_addr[:4], self.vpp_session.local_addr_n,
                              "Local IPv4 address")
            self.assert_equal(e.peer_addr[:4], self.vpp_session.peer_addr_n,
                              "Peer IPv4 address")
        else:
            self.assert_equal(e.local_addr, self.vpp_session.local_addr_n,
                              "Local IPv6 address")
            self.assert_equal(e.peer_addr, self.vpp_session.peer_addr_n,
                              "Peer IPv6 address")
        self.assert_equal(e.state, expected_state, BFDState)

    def wait_for_bfd_packet(self, timeout=1):
        """ wait for BFD packet

        :param timeout: how long to wait max

        :returns: tuple (packet, time spent waiting for packet)
        """
        self.logger.info("BFD: Waiting for BFD packet")
        before = time.time()
        p = self.pg0.wait_for_packet(timeout=timeout)
        after = time.time()
        self.logger.debug(ppp("BFD: Got packet:", p))
        # indexing a packet with a missing layer raises instead of returning
        # None, so the presence of the BFD layer is checked explicitly
        if not p.haslayer(BFD):
            raise Exception(ppp("Unexpected or invalid BFD packet:", p))
        bfd = p[BFD]
        if bfd.payload:
            raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
        self.verify_ip(p)
        self.verify_udp(p)
        self.test_session.verify_bfd(p)
        return p, after - before
441
442
class BFD4TestCase(VppTestCase, BFDCommonCode):
    """Bidirectional Forwarding Detection (BFD)"""

    @classmethod
    def setUpClass(cls):
        """ create pg0 with IPv4 configured and neighbors resolved """
        super(BFD4TestCase, cls).setUpClass()
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip4()
            cls.pg0.configure_ipv4_neighbors()
            cls.pg0.admin_up()
            cls.pg0.resolve_arp()

        except Exception:
            # undo the parent class setup before propagating the failure
            super(BFD4TestCase, cls).tearDownClass()
            raise

    def setUp(self):
        """ create the vpp BFD session and its test-side counterpart """
        super(BFD4TestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        try:
            self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                                self.pg0.remote_ip4)
            self.vpp_session.add_vpp_config()
            self.vpp_session.admin_up()
            self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        except:
            # switch event delivery off again, then re-raise whatever failed
            self.vapi.want_bfd_events(enable_disable=0)
            raise

    def tearDown(self):
        """ run the BFD specific cleanup before the generic one """
        BFDCommonCode.tearDown(self)
        VppTestCase.tearDown(self)

    def test_session_up(self):
        """ bring BFD session up """
        self.bfd_session_up()

    def test_session_down(self):
        """ bring BFD session down """
        self.bfd_session_up()
        self.bfd_session_down()

    def test_hold_up(self):
        """ hold BFD session up """
        self.bfd_session_up()
        for i in range(5):
            self.wait_for_bfd_packet()
            self.test_session.send_packet()
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")

    def test_slow_timer(self):
        """ verify slow periodic control frames while session down """
        self.pg_enable_capture([self.pg0])
        expected_packets = 3
        self.logger.info("BFD: Waiting for %d BFD packets" % expected_packets)
        self.wait_for_bfd_packet(2)
        # reference point is the arrival of the previous packet, so that the
        # measured value really is the time between two packets
        before = time.time()
        for i in range(expected_packets):
            self.wait_for_bfd_packet(2)
            after = time.time()
            # spec says the range should be <0.75, 1>, allow extra 0.05 margin
            # to work around timing issues
            self.assert_in_range(
                after - before, 0.70, 1.05, "time between slow packets")
            before = after

    def test_zero_remote_min_rx(self):
        """ no packets when zero BFD RemoteMinRxInterval """
        self.pg_enable_capture([self.pg0])
        p, timeout = self.wait_for_bfd_packet(2)
        self.test_session.update(my_discriminator=randint(0, 40000000),
                                 your_discriminator=p[BFD].my_discriminator,
                                 state=BFDState.init,
                                 required_min_rx_interval=0)
        self.test_session.send_packet()
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        self.verify_event(e, expected_state=BFDState.up)

        # a failure to get a packet is the expected outcome here
        try:
            p = self.pg0.wait_for_packet(timeout=1)
        except Exception:
            return
        raise Exception(ppp("Received unexpected BFD packet:", p))

    def test_conn_down(self):
        """ verify session goes down after inactivity """
        self.bfd_session_up()
        self.wait_for_bfd_packet()
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")
        self.wait_for_bfd_packet()
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        self.verify_event(e, expected_state=BFDState.down)

    def test_large_required_min_rx(self):
        """ large remote RequiredMinRxInterval """
        self.bfd_session_up()
        interval = 3000000
        self.test_session.update(required_min_rx_interval=interval)
        self.test_session.send_packet()
        now = time.time()
        count = 0
        while time.time() < now + interval / us_in_sec:
            try:
                p = self.wait_for_bfd_packet()
            except Exception:
                # nothing (valid) received within the timeout, keep polling
                continue
            count += 1
            # at most one packet is tolerated, log every one beyond that
            if count > 1:
                self.logger.error(ppp("Received unexpected packet:", p))
        self.assert_in_range(count, 0, 1, "number of packets received")

    def test_immediate_remote_min_rx_reduce(self):
        """ immediately honor remote min rx reduction """
        # replace the session made by setUp with one using a short
        # desired min tx interval
        self.vpp_session.remove_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
        self.vpp_session.add_vpp_config()
        self.test_session.update(desired_min_tx_interval=1000000,
                                 required_min_rx_interval=1000000)
        self.bfd_session_up()
        self.wait_for_bfd_packet()
        interval = 100000
        self.test_session.update(required_min_rx_interval=interval)
        self.test_session.send_packet()
        p, ttp = self.wait_for_bfd_packet()
        # allow extra 10% to work around timing issues, first packet is special
        self.assert_in_range(ttp, 0, 1.10 * interval / us_in_sec,
                             "time between BFD packets")
        p, ttp = self.wait_for_bfd_packet()
        self.assert_in_range(ttp, .9 * .75 * interval / us_in_sec,
                             1.10 * interval / us_in_sec,
                             "time between BFD packets")
        p, ttp = self.wait_for_bfd_packet()
        self.assert_in_range(ttp, .9 * .75 * interval / us_in_sec,
                             1.10 * interval / us_in_sec,
                             "time between BFD packets")
585
586
class BFD6TestCase(VppTestCase, BFDCommonCode):
    """Bidirectional Forwarding Detection (BFD) (IPv6) """

    @classmethod
    def setUpClass(cls):
        """ create pg0 with IPv6 configured and neighbors resolved """
        super(BFD6TestCase, cls).setUpClass()
        try:
            cls.create_pg_interfaces([0])
            pg0 = cls.pg0
            pg0.config_ip6()
            pg0.configure_ipv6_neighbors()
            pg0.admin_up()
            pg0.resolve_ndp()
        except Exception:
            # undo the parent class setup before propagating the failure
            super(BFD6TestCase, cls).tearDownClass()
            raise

    def setUp(self):
        """ create the vpp BFD session and its test-side counterpart """
        super(BFD6TestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        try:
            self.vpp_session = VppBFDUDPSession(
                self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
            self.vpp_session.add_vpp_config()
            self.vpp_session.admin_up()
            self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
            neighbors = self.vapi.cli("show adj nbr")
            self.logger.debug(neighbors)
        except:
            # switch event delivery off again, then re-raise whatever failed
            self.vapi.want_bfd_events(enable_disable=0)
            raise

    def tearDown(self):
        """ run the BFD specific cleanup before the generic one """
        BFDCommonCode.tearDown(self)
        VppTestCase.tearDown(self)

    def test_session_up(self):
        """ bring BFD session up """
        self.bfd_session_up()

    def test_hold_up(self):
        """ hold BFD session up """
        self.bfd_session_up()
        # answer five packets from vpp in a row
        for _ in range(5):
            self.wait_for_bfd_packet()
            self.test_session.send_packet()
        pending_events = self.vapi.collect_events()
        self.assert_equal(len(pending_events), 0, "number of bfd events")
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
637
638
639 class BFDSHA1TestCase(VppTestCase, BFDCommonCode):
640     """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
641
642     @classmethod
643     def setUpClass(cls):
644         super(BFDSHA1TestCase, cls).setUpClass()
645         try:
646             cls.create_pg_interfaces([0])
647             cls.pg0.config_ip4()
648             cls.pg0.admin_up()
649             cls.pg0.resolve_arp()
650
651         except Exception:
652             super(BFDSHA1TestCase, cls).tearDownClass()
653             raise
654
655     def setUp(self):
656         super(BFDSHA1TestCase, self).setUp()
657         self.factory = AuthKeyFactory()
658         self.vapi.want_bfd_events()
659
660     def tearDown(self):
661         BFDCommonCode.tearDown(self)
662         VppTestCase.tearDown(self)
663
664     def test_session_up(self):
665         """ bring BFD session up """
666         key = self.factory.create_random_key(self)
667         key.add_vpp_config()
668         self.vpp_session = VppBFDUDPSession(self, self.pg0,
669                                             self.pg0.remote_ip4,
670                                             sha1_key=key)
671         self.vpp_session.add_vpp_config()
672         self.vpp_session.admin_up()
673         self.test_session = BFDTestSession(
674             self, self.pg0, AF_INET, sha1_key=key,
675             bfd_key_id=self.vpp_session.bfd_key_id)
676         self.bfd_session_up()
677
678     def test_hold_up(self):
679         """ hold BFD session up """
680         key = self.factory.create_random_key(self)
681         key.add_vpp_config()
682         self.vpp_session = VppBFDUDPSession(self, self.pg0,
683                                             self.pg0.remote_ip4,
684                                             sha1_key=key)
685         self.vpp_session.add_vpp_config()
686         self.vpp_session.admin_up()
687         self.test_session = BFDTestSession(
688             self, self.pg0, AF_INET, sha1_key=key,
689             bfd_key_id=self.vpp_session.bfd_key_id)
690         self.bfd_session_up()
691         for i in range(5):
692             self.wait_for_bfd_packet()
693             self.test_session.send_packet()
694         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
695
696     def test_hold_up_meticulous(self):
697         """ hold BFD session up - meticulous auth """
698         key = self.factory.create_random_key(
699             self, BFDAuthType.meticulous_keyed_sha1)
700         key.add_vpp_config()
701         self.vpp_session = VppBFDUDPSession(self, self.pg0,
702                                             self.pg0.remote_ip4, sha1_key=key)
703         self.vpp_session.add_vpp_config()
704         self.vpp_session.admin_up()
705         self.test_session = BFDTestSession(
706             self, self.pg0, AF_INET, sha1_key=key,
707             bfd_key_id=self.vpp_session.bfd_key_id)
708         self.bfd_session_up()
709         for i in range(5):
710             self.wait_for_bfd_packet()
711             self.test_session.inc_seq_num()
712             self.test_session.send_packet()
713         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
714
715     def test_send_bad_seq_number(self):
716         """ session is not kept alive by msgs with bad seq numbers"""
717         key = self.factory.create_random_key(
718             self, BFDAuthType.meticulous_keyed_sha1)
719         key.add_vpp_config()
720         self.vpp_session = VppBFDUDPSession(self, self.pg0,
721                                             self.pg0.remote_ip4, sha1_key=key)
722         self.vpp_session.add_vpp_config()
723         self.vpp_session.admin_up()
724         self.test_session = BFDTestSession(
725             self, self.pg0, AF_INET, sha1_key=key,
726             bfd_key_id=self.vpp_session.bfd_key_id)
727         self.bfd_session_up()
728         self.wait_for_bfd_packet()
729         self.test_session.send_packet()
730         self.assert_equal(len(self.vapi.collect_events()), 0,
731                           "number of bfd events")
732         self.wait_for_bfd_packet()
733         self.test_session.send_packet()
734         self.assert_equal(len(self.vapi.collect_events()), 0,
735                           "number of bfd events")
736         self.wait_for_bfd_packet()
737         self.test_session.send_packet()
738         self.wait_for_bfd_packet()
739         self.test_session.send_packet()
740         e = self.vapi.collect_events()
741         # session should be down now, because the sequence numbers weren't
742         # updated
743         self.assert_equal(len(e), 1, "number of bfd events")
744         self.verify_event(e[0], expected_state=BFDState.down)
745
746     def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
747                                        legitimate_test_session,
748                                        rogue_test_session,
749                                        rogue_bfd_values=None):
750         """ execute a rogue session interaction scenario
751
752         1. create vpp session, add config
753         2. bring the legitimate session up
754         3. copy the bfd values from legitimate session to rogue session
755         4. apply rogue_bfd_values to rogue session
756         5. set rogue session state to down
757         6. send message to take the session down from the rogue session
758         7. assert that the legitimate session is unaffected
759         """
760
761         self.vpp_session = vpp_bfd_udp_session
762         self.vpp_session.add_vpp_config()
763         self.vpp_session.admin_up()
764         self.test_session = legitimate_test_session
765         # bring vpp session up
766         self.bfd_session_up()
767         # send packet from rogue session
768         rogue_test_session.bfd_values = self.test_session.bfd_values.copy()
769         if rogue_bfd_values:
770             rogue_test_session.update(**rogue_bfd_values)
771         rogue_test_session.update(state=BFDState.down)
772         rogue_test_session.send_packet()
773         self.wait_for_bfd_packet()
774         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
775
776     def test_mismatch_auth(self):
777         """ session is not brought down by unauthenticated msg """
778         key = self.factory.create_random_key(self)
779         key.add_vpp_config()
780         vpp_session = VppBFDUDPSession(
781             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
782         legitimate_test_session = BFDTestSession(
783             self, self.pg0, AF_INET, sha1_key=key,
784             bfd_key_id=vpp_session.bfd_key_id)
785         rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
786         self.execute_rogue_session_scenario(vpp_session,
787                                             legitimate_test_session,
788                                             rogue_test_session)
789
790     def test_mismatch_bfd_key_id(self):
791         """ session is not brought down by msg with non-existent key-id """
792         key = self.factory.create_random_key(self)
793         key.add_vpp_config()
794         vpp_session = VppBFDUDPSession(
795             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
796         # pick a different random bfd key id
797         x = randint(0, 255)
798         while x == vpp_session.bfd_key_id:
799             x = randint(0, 255)
800         legitimate_test_session = BFDTestSession(
801             self, self.pg0, AF_INET, sha1_key=key,
802             bfd_key_id=vpp_session.bfd_key_id)
803         rogue_test_session = BFDTestSession(
804             self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
805         self.execute_rogue_session_scenario(vpp_session,
806                                             legitimate_test_session,
807                                             rogue_test_session)
808
809     def test_mismatched_auth_type(self):
810         """ session is not brought down by msg with wrong auth type """
811         key = self.factory.create_random_key(self)
812         key.add_vpp_config()
813         vpp_session = VppBFDUDPSession(
814             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
815         legitimate_test_session = BFDTestSession(
816             self, self.pg0, AF_INET, sha1_key=key,
817             bfd_key_id=vpp_session.bfd_key_id)
818         rogue_test_session = BFDTestSession(
819             self, self.pg0, AF_INET, sha1_key=key,
820             bfd_key_id=vpp_session.bfd_key_id)
821         self.execute_rogue_session_scenario(
822             vpp_session, legitimate_test_session, rogue_test_session,
823             {'auth_type': BFDAuthType.keyed_md5})
824
825     def test_restart(self):
826         """ simulate remote peer restart and resynchronization """
827         key = self.factory.create_random_key(
828             self, BFDAuthType.meticulous_keyed_sha1)
829         key.add_vpp_config()
830         self.vpp_session = VppBFDUDPSession(self, self.pg0,
831                                             self.pg0.remote_ip4, sha1_key=key)
832         self.vpp_session.add_vpp_config()
833         self.vpp_session.admin_up()
834         self.test_session = BFDTestSession(
835             self, self.pg0, AF_INET, sha1_key=key,
836             bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
837         self.bfd_session_up()
838         # now we need to not respond for 2*detection_time (4 packets)
839         self.wait_for_bfd_packet()
840         self.assert_equal(len(self.vapi.collect_events()), 0,
841                           "number of bfd events")
842         self.wait_for_bfd_packet()
843         self.assert_equal(len(self.vapi.collect_events()), 0,
844                           "number of bfd events")
845         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
846         self.verify_event(e, expected_state=BFDState.down)
847         self.test_session.update(state=BFDState.down)
848         self.wait_for_bfd_packet()
849         self.assert_equal(len(self.vapi.collect_events()), 0,
850                           "number of bfd events")
851         self.wait_for_bfd_packet()
852         self.assert_equal(len(self.vapi.collect_events()), 0,
853                           "number of bfd events")
854         # reset sequence number
855         self.test_session.our_seq_number = 0
856         self.bfd_session_up()
857
858
class BFDAuthOnOffTestCase(VppTestCase, BFDCommonCode):
    """Bidirectional Forwarding Detection (BFD) (changing auth) """

    @classmethod
    def setUpClass(cls):
        """ create pg0, configure IPv4 on it, set it up and resolve ARP """
        super(BFDAuthOnOffTestCase, cls).setUpClass()
        try:
            cls.create_pg_interfaces([0])
            iface = cls.pg0
            iface.config_ip4()
            iface.admin_up()
            iface.resolve_arp()

        except Exception:
            # undo the class-level setup before propagating the failure
            super(BFDAuthOnOffTestCase, cls).tearDownClass()
            raise

    def setUp(self):
        """ per-test setup: fresh key factory and BFD event subscription """
        super(BFDAuthOnOffTestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()

    def tearDown(self):
        """ run both base-class tearDowns, BFDCommonCode's first """
        for base in (BFDCommonCode, VppTestCase):
            base.tearDown(self)

    # -- private helpers shared by the tests below --------------------------

    def _new_configured_key(self):
        """ create a random auth key, add it to VPP and return it """
        auth_key = self.factory.create_random_key(self)
        auth_key.add_vpp_config()
        return auth_key

    def _start_sessions(self, key=None):
        """ create vpp and test sessions and bring the BFD session up

        :param key: auth key used by both sessions; when None the sessions
                    are created without any auth-related arguments
        """
        vpp_kwargs = {}
        if key is not None:
            vpp_kwargs['sha1_key'] = key
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, **vpp_kwargs)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()

        test_kwargs = {}
        if key is not None:
            test_kwargs['sha1_key'] = key
            test_kwargs['bfd_key_id'] = self.vpp_session.bfd_key_id
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, **test_kwargs)
        self.bfd_session_up()

    def _answer_packets(self, rounds=5):
        """ wait for a packet from VPP and reply to it, `rounds` times """
        for _ in range(rounds):
            self.wait_for_bfd_packet()
            self.test_session.send_packet()

    def _switch_test_session_auth(self, key):
        """ make the test session use `key` (None means no auth) """
        if key is None:
            self.test_session.bfd_key_id = None
        else:
            # the key id is read from the vpp session at switch time
            self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key

    def _assert_session_undisturbed(self):
        """ assert the vpp session is up and no BFD events were generated """
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")

    # -- immediate changes --------------------------------------------------

    def test_auth_on_immediate(self):
        """ turn auth on without disturbing session state (immediate) """
        auth_key = self._new_configured_key()
        self._start_sessions()
        self._answer_packets()
        # enable auth on VPP and switch the test session right away
        self.vpp_session.activate_auth(auth_key)
        self._switch_test_session_auth(auth_key)
        self._answer_packets()
        self._assert_session_undisturbed()

    def test_auth_off_immediate(self):
        """ turn auth off without disturbing session state (immediate) """
        auth_key = self._new_configured_key()
        self._start_sessions(auth_key)
        self._answer_packets()
        # disable auth on VPP and drop it from the test session right away
        self.vpp_session.deactivate_auth()
        self._switch_test_session_auth(None)
        self._answer_packets()
        self._assert_session_undisturbed()

    def test_auth_change_key_immediate(self):
        """ change auth key without disturbing session state (immediate) """
        old_key = self._new_configured_key()
        new_key = self._new_configured_key()
        self._start_sessions(old_key)
        self._answer_packets()
        # swap the key on VPP and on the test session right away
        self.vpp_session.activate_auth(new_key)
        self._switch_test_session_auth(new_key)
        self._answer_packets()
        self._assert_session_undisturbed()

    # -- delayed changes ----------------------------------------------------

    def test_auth_on_delayed(self):
        """ turn auth on without disturbing session state (delayed) """
        auth_key = self._new_configured_key()
        self._start_sessions()
        self._answer_packets()
        self.vpp_session.activate_auth(auth_key, delayed=True)
        # keep replying without auth for a while ...
        self._answer_packets()
        # ... then switch and send one authenticated packet immediately
        self._switch_test_session_auth(auth_key)
        self.test_session.send_packet()
        self._answer_packets()
        self._assert_session_undisturbed()

    def test_auth_off_delayed(self):
        """ turn auth off without disturbing session state (delayed) """
        auth_key = self._new_configured_key()
        self._start_sessions(auth_key)
        self._answer_packets()
        self.vpp_session.deactivate_auth(delayed=True)
        # keep replying with auth for a while ...
        self._answer_packets()
        # ... then drop auth and send one unauthenticated packet immediately
        self._switch_test_session_auth(None)
        self.test_session.send_packet()
        self._answer_packets()
        self._assert_session_undisturbed()

    def test_auth_change_key_delayed(self):
        """ change auth key without disturbing session state (delayed) """
        old_key = self._new_configured_key()
        new_key = self._new_configured_key()
        self._start_sessions(old_key)
        self._answer_packets()
        self.vpp_session.activate_auth(new_key, delayed=True)
        # keep replying with the old key for a while ...
        self._answer_packets()
        # ... then switch and send one packet with the new key immediately
        self._switch_test_session_auth(new_key)
        self.test_session.send_packet()
        self._answer_packets()
        self._assert_session_undisturbed()
1045
if __name__ == "__main__":
    # executed as a script: run the tests with VppTestRunner instead of
    # unittest's default runner
    unittest.main(testRunner=VppTestRunner)