fib: fib api updates
[vpp.git] / test / vpp_ip.py
1 """
2   IP Types
3
4 """
5 import logging
6
7 from ipaddress import ip_address
8 from socket import AF_INET, AF_INET6
9 from vpp_papi import VppEnum
# Pick the text (unicode) string type for the running interpreter:
# Python 2 provides a builtin ``unicode``; on Python 3 that name does not
# exist, the lookup raises NameError, and ``str`` is used instead.
try:
    text_type = unicode
except NameError:
    text_type = str

# Module-level logger, named after this module.
_log = logging.getLogger(__name__)
16
17
class DpoProto:
    """Integer identifiers for DPO protocols.

    The constants are numbered consecutively from 0 in the order listed.
    """
    (DPO_PROTO_IP4,
     DPO_PROTO_IP6,
     DPO_PROTO_MPLS,
     DPO_PROTO_ETHERNET,
     DPO_PROTO_BIER,
     DPO_PROTO_NSH) = range(6)
25
26
# All 32 bits set: the largest value an unsigned 32-bit field can hold.
# NOTE(review): presumably the "no such index" sentinel exchanged with the
# VPP API -- confirm against the callers.
INVALID_INDEX = 0xffffffff
28
29
class VppIpAddressUnion():
    """An IPv4 or IPv6 address, encodable as an ``ip4``/``ip6`` union.

    Keeps both the value the caller supplied (``addr``) and the
    ``ipaddress`` object parsed from it (``ip_addr``).
    """

    def __init__(self, addr):
        """Store *addr* and parse it.

        :param addr: the address; it is converted with ``text_type`` and
            handed to ``ipaddress.ip_address``.
        :raises ValueError: raised by ``ip_address`` if the text is not
            a valid IPv4 or IPv6 address.
        """
        self.addr = addr
        self.ip_addr = ip_address(text_type(self.addr))

    def encode(self):
        """Return ``{'ip6': ip_addr}`` for IPv6, else ``{'ip4': ip_addr}``."""
        if self.version == 6:
            return {'ip6': self.ip_addr}
        else:
            return {'ip4': self.ip_addr}

    @property
    def version(self):
        """IP version of the parsed address (4 or 6)."""
        return self.ip_addr.version

    @property
    def address(self):
        """The address exactly as it was passed to the constructor."""
        return self.addr

    @property
    def length(self):
        """Maximum prefix length for this address family."""
        return self.ip_addr.max_prefixlen

    @property
    def bytes(self):
        """Packed binary form of the address."""
        return self.ip_addr.packed

    def __eq__(self, other):
        """Compare with another instance or an ``ip4``/``ip6`` object.

        Returns ``NotImplemented`` (after logging an error) for any
        other kind of operand.
        """
        if isinstance(other, self.__class__):
            return self.ip_addr == other.ip_addr
        elif hasattr(other, "ip4") and hasattr(other, "ip6"):
            # vl_api_address_union_t
            if 4 == self.version:
                return self.ip_addr.packed == other.ip4
            else:
                return self.ip_addr.packed == other.ip6
        else:
            _log.error("Comparing VppIpAddressUnions:%s"
                       " with incomparable type: %s",
                       self, other)
            return NotImplemented

    def __ne__(self, other):
        """Inverse of ``__eq__``.

        Python 2 does not derive ``!=`` from ``__eq__``, so it has to be
        spelled out. ``NotImplemented`` is passed through untouched so
        the interpreter can still try the reflected operation.
        """
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __str__(self):
        """Canonical text form of the parsed address."""
        return str(self.ip_addr)
74
75
class VppIpAddress():
    """An IP address tagged with its address family.

    Wraps a ``VppIpAddressUnion`` and forwards most properties to it.
    """

    def __init__(self, addr):
        """Wrap *addr* in a ``VppIpAddressUnion``."""
        self.addr = VppIpAddressUnion(addr)

    def _api_family(self):
        """Return the ``vl_api_address_family_t`` member for this address."""
        if self.addr.version == 6:
            return VppEnum.vl_api_address_family_t.ADDRESS_IP6
        return VppEnum.vl_api_address_family_t.ADDRESS_IP4

    def encode(self):
        """Return ``{'af': <address family>, 'un': <encoded union>}``."""
        return {
            'af': self._api_family(),
            'un': self.addr.encode()
        }

    def __eq__(self, other):
        """Compare with another instance or an object with ``af``/``un``.

        Returns ``NotImplemented`` (after logging an error) for any
        other kind of operand.
        """
        if isinstance(other, self.__class__):
            return self.addr == other.addr
        elif hasattr(other, "af") and hasattr(other, "un"):
            # a vp_api_address_t: the family must match before the
            # union itself is compared
            return other.af == self._api_family() and \
                other.un == self.addr
        else:
            _log.error(
                "Comparing VppIpAddress:<%s> %s with incomparable "
                "type: <%s> %s",
                self.__class__.__name__, self,
                other.__class__.__name__, other)
            return NotImplemented

    def __ne__(self, other):
        """Inverse of ``==``."""
        return not (self == other)

    def __str__(self):
        """Text form of the address as supplied by the caller."""
        # ``address`` is the raw constructor argument and need not be a
        # ``str``; ``__str__`` must return one or ``str()`` raises
        # TypeError.
        return str(self.address)

    @property
    def bytes(self):
        """Packed binary form, forwarded from the wrapped union."""
        return self.addr.bytes

    @property
    def address(self):
        """The address value, forwarded from the wrapped union."""
        return self.addr.address

    @property
    def length(self):
        """The ``length`` of the wrapped union."""
        return self.addr.length

    @property
    def version(self):
        """IP version, forwarded from the wrapped union."""
        return self.addr.version

    @property
    def is_ip6(self):
        """True when the address is IPv6."""
        return (self.version == 6)

    @property
    def af(self):
        """Socket address family: ``AF_INET6`` or ``AF_INET``."""
        if self.version == 6:
            return AF_INET6
        else:
            return AF_INET

    @property
    def dpo_proto(self):
        """Matching ``DpoProto`` constant (IP6 or IP4)."""
        if self.version == 6:
            return DpoProto.DPO_PROTO_IP6
        else:
            return DpoProto.DPO_PROTO_IP4
152
153
class VppIpPrefix():
    """An IP prefix: a ``VppIpAddress`` plus a prefix length."""

    # NOTE: the parameter is called ``len`` (shadowing the builtin inside
    # __init__); the name is kept so keyword callers keep working.
    def __init__(self, addr, len):
        """Wrap *addr* in a ``VppIpAddress`` and store the length *len*."""
        self.addr = VppIpAddress(addr)
        self.len = len

    def encode(self):
        """Return ``{'address': <encoded address>, 'address_length': len}``."""
        return {'address': self.addr.encode(),
                'address_length': self.len}

    @property
    def version(self):
        """IP version, forwarded from the wrapped address."""
        return self.addr.version

    @property
    def address(self):
        """The address value, forwarded from the wrapped address."""
        return self.addr.address

    @property
    def bytes(self):
        """Packed binary form, forwarded from the wrapped address."""
        return self.addr.bytes

    @property
    def length(self):
        """The prefix length given to the constructor."""
        return self.len

    @property
    def is_ip6(self):
        """True when the wrapped address is IPv6."""
        return self.addr.is_ip6

    def __str__(self):
        """Return ``"<address>/<length>"``."""
        return "%s/%d" % (self.address, self.length)

    # The class used to carry two __eq__ definitions; only the later one
    # was ever bound, so that is the single one kept here.
    def __eq__(self, other):
        """Compare with another instance or a prefix-shaped object.

        A prefix-shaped object has ``address`` and ``address_length``
        attributes. Returns ``NotImplemented`` (after logging an error)
        for any other kind of operand.
        """
        if isinstance(other, self.__class__):
            return (self.len == other.len and self.addr == other.addr)
        elif hasattr(other, "address") and hasattr(other, "address_length"):
            # vl_api_prefix_t
            return self.len == other.address_length and \
                   self.addr == other.address
        else:
            _log.error(
                "Comparing VppIpPrefix:%s with incomparable type: %s",
                self, other)
            return NotImplemented

    def __ne__(self, other):
        """Inverse of ``__eq__``.

        Needed on Python 2, which does not derive ``!=`` from
        ``__eq__``. ``NotImplemented`` is passed through untouched.
        """
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result
203
204
class VppIpMPrefix():
    """A multicast (source, group) prefix with a group prefix length."""

    def __init__(self, saddr, gaddr, glen):
        """Store and parse the source and group addresses.

        :param saddr: source address.
        :param gaddr: group address.
        :param glen: group prefix length.
        :raises ValueError: if the two addresses are not of the same
            IP version.
        """
        self.saddr = saddr
        self.gaddr = gaddr
        self.glen = glen
        self.ip_saddr = VppIpAddressUnion(text_type(self.saddr))
        self.ip_gaddr = VppIpAddressUnion(text_type(self.gaddr))
        if self.ip_saddr.version != self.ip_gaddr.version:
            raise ValueError('Source and group addresses must be of the '
                             'same address family.')

    def encode(self):
        """Return the prefix as a dict.

        The keys are ``af``, ``grp_address``, ``src_address`` and
        ``grp_address_length``; the two address entries are one-entry
        dicts keyed ``'ip6'`` or ``'ip4'`` holding the addresses as
        given to the constructor.
        """
        if 6 == self.ip_saddr.version:
            family = VppEnum.vl_api_address_family_t.ADDRESS_IP6
            key = 'ip6'
        else:
            family = VppEnum.vl_api_address_family_t.ADDRESS_IP4
            key = 'ip4'
        return {
            'af': family,
            'grp_address': {
                key: self.gaddr
            },
            'src_address': {
                key: self.saddr
            },
            'grp_address_length': self.glen,
        }

    @property
    def length(self):
        """The group prefix length."""
        return self.glen

    @property
    def version(self):
        """IP version of the group address."""
        return self.ip_gaddr.version

    def __str__(self):
        """Return ``"(<source>,<group>)/<group length>"``."""
        return "(%s,%s)/%d" % (self.saddr, self.gaddr, self.glen)

    def __eq__(self, other):
        """Compare with another instance or an mprefix-shaped object.

        An mprefix-shaped object has ``grp_address_length``,
        ``grp_address`` and ``src_address`` attributes.

        :raises Exception: if *other* is neither of those.
        """
        if isinstance(other, self.__class__):
            # Group is compared with group and source with source; the
            # earlier code compared our source against the other's group,
            # so equal (S,G) prefixes with S != G never matched.
            return (self.glen == other.glen and
                    self.ip_gaddr == other.ip_gaddr and
                    self.ip_saddr == other.ip_saddr)
        elif (hasattr(other, "grp_address_length") and
              hasattr(other, "grp_address") and
              hasattr(other, "src_address")):
            # vl_api_mprefix_t
            if 4 == self.ip_saddr.version:
                return bool(self.glen == other.grp_address_length and
                            self.gaddr == str(other.grp_address.ip4) and
                            self.saddr == str(other.src_address.ip4))
            else:
                # NOTE(review): unlike the IPv4 branch, the ip6 members
                # are compared without str() -- confirm this is intended.
                return (self.glen == other.grp_address_length and
                        self.gaddr == other.grp_address.ip6 and
                        self.saddr == other.src_address.ip6)
        else:
            raise Exception("Comparing VppIpMPrefix:%s with unknown type: %s" %
                            (self, other))

    def __ne__(self, other):
        """Inverse of ``==`` (Python 2 does not derive it from ``__eq__``)."""
        return not (self == other)