lab WIP: add new 3n-alt Arm testbed
[csit.git] / resources / tools / storage / storage.py
1 #!/usr/bin/env/env python3
2
3 # Copyright (c) 2021 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 """Storage Backend Class."""
17
18 from json import loads
19 from struct import unpack
20 from gzip import GzipFile
21
22 from boto3 import Session
23 from botocore import exceptions
24
25 S3_API_LIMIT = 1048576
26
27
28 class Storage:
29     """Class implementing storage object retrieval.
30     S3 Select API allows us to retrieve a subset of data by using simple SQL
31     expressions. By using Select API to retrieve only the data needed by the
32     application, drastic performance improvements can be achieved.
33     """
34     def __init__(self, endpoint_url, bucket, profile_name):
35         """Class init function to create S3 client object.
36
37         :param endpoint_url: S3 storage endpoint url.
38         :param bucket: S3 parent bucket.
39         :param profile_name: S3 storage configuration.
40         :type endpoint_url: str
41         :type bucket: str
42         :type profile_name: str
43         """
44         self.endpoint_url = endpoint_url
45         self.bucket = bucket
46         self.profile_name = profile_name
47
48         self.session = Session(profile_name=self.profile_name)
49         self.client = self.session.client(
50             service_name=u"s3", endpoint_url=self.endpoint_url
51         )
52         self.resource = self.session.resource(
53             service_name=u"s3", endpoint_url=self.endpoint_url
54         )
55
56     def __repr__(self):
57         """Return a string executable as Python constructor call.
58
59         :returns: Executable constructor call.
60         :rtype: str
61         """
62         return (
63             f"Storage(endpoint_url={self.endpoint_url!r}, "
64             f"bucket={self.bucket!r}, "
65             f"profile_name={self.profile_name!r})"
66         )
67
68     def _get_matching_s3_keys(
69             self, bucket, prefix=u"", suffix=u""):
70         """This function generates the keys in an S3 bucket. Function act as
71         a Python generator object.
72
73         :param bucket: Name of the S3 bucket.
74         :param prefix: Only fetch keys that start with this prefix (optional).
75         :param suffix: Only fetch keys that end with this suffix (optional).
76         :type bucket: str
77         :type prefix: str
78         :type suffix: str
79         :raises RuntimeError: If connection to storage fails.
80         """
81         kwargs = {
82             u"Bucket": bucket
83         }
84
85         prefixes = (prefix, ) if isinstance(prefix, str) else prefix
86
87         for key_prefix in prefixes:
88             kwargs[u"Prefix"] = key_prefix
89             try:
90                 paginator = self.client.get_paginator(u"list_objects_v2")
91                 for page in paginator.paginate(**kwargs):
92                     try:
93                         contents = page[u"Contents"]
94                     except KeyError:
95                         break
96
97                     for obj in contents:
98                         key = obj[u"Key"]
99                         if key.endswith(suffix):
100                             yield obj
101             except exceptions.EndpointConnectionError:
102                 raise RuntimeError(
103                     u"Connection Error!"
104                 )
105
106     def _get_matching_s3_content(
107             self, key, expression):
108         """This function filters the contents of an S3 object based on a simple
109         structured query language (SQL) statement. In the request, along with
110         the SQL expression, we are specifying JSON serialization of the object.
111         S3 uses this format to parse object data into records, and returns only
112         records that match the specified SQL expression. Data serialization
113         format for the response is set to JSON.
114
115         :param key: S3 Key (file path).
116         :param expression: S3 compatible SQL query.
117         :type key: str
118         :type expression: str
119         :returns: JSON content of interest.
120         :rtype: str
121         :raises RuntimeError: If connection to storage fails.
122         :raises ValueError: If JSON reading fails.
123         """
124         try:
125             content = self.client.select_object_content(
126                 Bucket=self.bucket,
127                 Key=key,
128                 ExpressionType=u"SQL",
129                 Expression=expression,
130                 InputSerialization={
131                     u"JSON": {
132                         u"Type": u"Document"
133                     },
134                     u"CompressionType": u"GZIP"
135                 },
136                 OutputSerialization={
137                     u"JSON": {
138                         u"RecordDelimiter": u""
139                     }
140                 }
141             )
142             records = u""
143             for event in content[u"Payload"]:
144                 if u"Records" in event:
145                     records = event[u"Records"][u"Payload"].decode(u"utf-8")
146             return records
147         except exceptions.EndpointConnectionError:
148             raise RuntimeError(
149                 u"Connection Error!"
150             )
151         except exceptions.EventStreamError:
152             raise ValueError(
153                 u"Malformed JSON content!"
154             )
155
156     def _get_matching_s3_object(
157             self, key):
158         """Gets full S3 object. If the file is gzip'd it will be unpacked.
159
160         :param key: Name of the S3 key (file).
161         :type key: str
162         :returns: JSON file of interest.
163         :rtype: str
164         :raises RuntimeError: If connection to storage fails.
165         """
166         try:
167             streaming_object = self.client.get_object(
168                 Bucket=self.bucket,
169                 Key=key
170             )[u"Body"]
171             with GzipFile(fileobj=streaming_object) as gzipfile:
172                 content = gzipfile.read()
173             return content
174         except exceptions.EndpointConnectionError:
175             raise RuntimeError(
176                 u"Connection Error!"
177             )
178
179     def _get_matching_s3_length(
180             self, key):
181         """Gets the file size of S3 object. If the file is gzip'd the packed
182         size is reported.
183
184         :param key: Name of the S3 key (file).
185         :type key: str
186         :returns: File size in bytes. Defaults to 0 if any error.
187         :rtype: int
188         :raises RuntimeError: If connection to storage fails.
189         """
190         try:
191             compressed_size = self.client.get_object(
192                 Bucket=self.bucket,
193                 Key=key
194             )[u"ContentLength"]
195             last_four_bytes = self.client.get_object(
196                 Bucket=self.bucket,
197                 Key=key,
198                 Range=f"bytes={compressed_size-4}-{compressed_size}"
199             )[u"Body"]
200             return unpack(u"I", last_four_bytes.read(4))[0]
201         except exceptions.EndpointConnectionError:
202             return 0
203
204     def is_large_file(
205             self, key):
206         """Returns True if file is larger then 1MB that S3 select allows.
207
208         :param key: Name of the S3 key (file).
209         :type key: str
210         :returns: Returns True if file is large then 1MB that S3 select allows.
211         :rtype: bool
212         """
213         return bool(
214             self._get_matching_s3_length(key=key[u"Key"]) > S3_API_LIMIT
215         )
216
217     def s3_file_processing(
218             self, prefix=u"", suffix=u"json.gz",
219             expression=u"select * from s3object s"):
220         """Batch S3 key processing. Function retrieves list of files and use
221         S3 Select API to query content.
222
223         :param prefix: Only fetch keys that start with this prefix (optional).
224         :param suffix: Only fetch keys that end with this suffix (optional).
225         :param expression: S3 compatible SQL query (optional).
226         :type prefix: str
227         :type suffix: str
228         :type expression: str
229         """
230         key_iterator = self._get_matching_s3_keys(
231             bucket=self.bucket,
232             prefix=prefix,
233             suffix=suffix
234         )
235
236         for key in key_iterator:
237             try:
238                 yield key[u"Key"], loads(
239                     self._get_matching_s3_content(
240                         key=key[u"Key"], expression=expression
241                     )
242                 )
243             except ValueError:
244                 return
245
246     def s3_dump_file_processing(
247             self, prefix=u"", suffix=u"json.gz"):
248         """Batch S3 key processing. Function retrieves list of files and use
249         S3 Get Object API to query content.
250
251         :param prefix: Only fetch keys that start with this prefix (optional).
252         :param suffix: Only fetch keys that end with this suffix (optional).
253         :type prefix: str
254         :type suffix: str
255         """
256         key_iterator = self._get_matching_s3_keys(
257             bucket=self.bucket,
258             prefix=prefix,
259             suffix=suffix
260         )
261
262         for key in key_iterator:
263             try:
264                 yield loads(
265                     self._get_matching_s3_object(
266                         key=key[u"Key"]
267                     )
268                 )
269             except ValueError:
270                 return