BFD: basic asynchronous session up/down
[vpp.git] / test / template_bd.py
1 #!/usr/bin/env python
2
3 from abc import abstractmethod, ABCMeta
4
5 from scapy.layers.l2 import Ether, Raw
6 from scapy.layers.inet import IP, UDP
7
8
class BridgeDomain(object):
    """ Bridge domain abstraction

    Template holding two test cases (test_decap, test_encap) that pass a
    fixed Ether/IP/UDP frame between the pg0 and pg1 interfaces.

    Subclasses implement encapsulate(), decapsulate() and
    check_encapsulation().  This class does not define pg0, pg1, pg_start()
    or assertEqual() itself; the concrete test class it is combined with has
    to provide them.
    """
    # Python 2 style metaclass declaration.
    # NOTE(review): Python 3 ignores the __metaclass__ attribute, so the
    # abstract methods below are not enforced when run under Python 3.
    __metaclass__ = ABCMeta

    @property
    def frame_pg0_to_pg1(self):
        """ Ethernet frame sent from pg0 and expected to arrive at pg1

        A new frame object is built on every access.
        """
        return (Ether(src='00:00:00:00:00:01', dst='00:00:00:00:00:02') /
                IP(src='1.2.3.4', dst='4.3.2.1') /
                UDP(sport=10000, dport=20000) /
                Raw('\xa5' * 100))

    @property
    def frame_pg1_to_pg0(self):
        """ Ethernet frame sent from pg1 and expected to arrive at pg0

        A new frame object is built on every access.
        """
        return (Ether(src='00:00:00:00:00:02', dst='00:00:00:00:00:01') /
                IP(src='4.3.2.1', dst='1.2.3.4') /
                UDP(sport=20000, dport=10000) /
                Raw('\xa5' * 100))

    @abstractmethod
    def encapsulate(self, pkt):
        """ Encapsulate packet

        :param pkt: frame to wrap; test_decap passes frame_pg0_to_pg1
        :returns: the frame that test_decap queues on pg0
        """
        pass

    @abstractmethod
    def decapsulate(self, pkt):
        """ Decapsulate packet

        :param pkt: frame captured on pg0 by test_encap
        :returns: the inner frame, which test_encap compares against
                  frame_pg1_to_pg0
        """
        pass

    @abstractmethod
    def check_encapsulation(self, pkt):
        """ Verify the encapsulation

        :param pkt: frame captured on pg0 by test_encap
        """
        pass

    def _assert_frames_match(self, received, expected):
        """ Assert that two frames agree on Ether/IP/UDP fields and payload

        Compares Ether src/dst, IP src/dst, UDP sport/dport and the Raw
        layer, in that order, stopping at the first mismatch.

        :param received: frame under test
        :param expected: reference frame it has to match
        """
        compared_fields = (
            (Ether, 'src'),
            (Ether, 'dst'),
            (IP, 'src'),
            (IP, 'dst'),
            (UDP, 'sport'),
            (UDP, 'dport'),
        )
        for layer, field in compared_fields:
            actual = getattr(received[layer], field)
            wanted = getattr(expected[layer], field)
            # Both values go into the message because a custom msg may
            # replace unittest's default "a != b" text.
            self.assertEqual(
                actual, wanted,
                '{}.{} mismatch: got {!r}, expected {!r}'.format(
                    layer.__name__, field, actual, wanted))

        actual_payload = received[Raw]
        wanted_payload = expected[Raw]
        self.assertEqual(
            actual_payload, wanted_payload,
            'Raw payload mismatch: got {!r}, expected {!r}'.format(
                actual_payload, wanted_payload))

    def test_decap(self):
        """ Decapsulation test
        Send encapsulated frames from pg0
        Verify receipt of decapsulated frames on pg1
        """
        # Read the property once; every access builds a new frame.
        expected = self.frame_pg0_to_pg1

        encapsulated_pkt = self.encapsulate(expected)

        self.pg0.add_stream([encapsulated_pkt, ])

        self.pg1.enable_capture()

        self.pg_start()

        # Pick first received frame and check if it's the non-encapsulated
        # frame
        out = self.pg1.get_capture()
        self.assertEqual(len(out), 1,
                         'Invalid number of packets on '
                         'output: {}'.format(len(out)))
        pkt = out[0]

        self._assert_frames_match(pkt, expected)

    def test_encap(self):
        """ Encapsulation test
        Send frames from pg1
        Verify receipt of encapsulated frames on pg0
        """
        # Read the property once; every access builds a new frame.
        expected = self.frame_pg1_to_pg0

        self.pg1.add_stream([expected])

        self.pg0.enable_capture()

        self.pg_start()

        # Pick first received frame and check if it's correctly encapsulated.
        out = self.pg0.get_capture()
        self.assertEqual(len(out), 1,
                         'Invalid number of packets on '
                         'output: {}'.format(len(out)))
        pkt = out[0]
        self.check_encapsulation(pkt)

        # The inner frame has to be the one that was sent from pg1.
        payload = self.decapsulate(pkt)
        self._assert_frames_match(payload, expected)