#!/usr/bin/env python3
3 # Copyright (c) 2021 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 """Storage Backend Class."""
18 from json import loads
19 from struct import unpack
20 from gzip import GzipFile
22 from boto3 import Session
23 from botocore import exceptions
# 1048576 B = 1 MiB: the file-size threshold for S3 Select usage; files
# above it are considered "large" (see the large-file check below).
S3_API_LIMIT = 1048576
class Storage:
    """Class implementing storage object retrieval.

    S3 Select API allows us to retrieve a subset of data by using simple SQL
    expressions. By using Select API to retrieve only the data needed by the
    application, drastic performance improvements can be achieved.
    """
34 def __init__(self, endpoint_url, bucket, profile_name):
35 """Class init function to create S3 client object.
37 :param endpoint_url: S3 storage endpoint url.
38 :param bucket: S3 parent bucket.
39 :param profile_name: S3 storage configuration.
40 :type endpoint_url: str
42 :type profile_name: str
44 self.endpoint_url = endpoint_url
46 self.profile_name = profile_name
48 self.session = Session(profile_name=self.profile_name)
49 self.client = self.session.client(
50 service_name=u"s3", endpoint_url=self.endpoint_url
52 self.resource = self.session.resource(
53 service_name=u"s3", endpoint_url=self.endpoint_url
57 """Return a string executable as Python constructor call.
59 :returns: Executable constructor call.
63 f"Storage(endpoint_url={self.endpoint_url!r}, "
64 f"bucket={self.bucket!r}, "
65 f"profile_name={self.profile_name!r})"
68 def _get_matching_s3_keys(
69 self, bucket, prefix=u"", suffix=u""):
70 """This function generates the keys in an S3 bucket. Function act as
71 a Python generator object.
73 :param bucket: Name of the S3 bucket.
74 :param prefix: Only fetch keys that start with this prefix (optional).
75 :param suffix: Only fetch keys that end with this suffix (optional).
79 :raises RuntimeError: If connection to storage fails.
85 prefixes = (prefix, ) if isinstance(prefix, str) else prefix
87 for key_prefix in prefixes:
88 kwargs[u"Prefix"] = key_prefix
90 paginator = self.client.get_paginator(u"list_objects_v2")
91 for page in paginator.paginate(**kwargs):
93 contents = page[u"Contents"]
99 if key.endswith(suffix):
101 except exceptions.EndpointConnectionError:
106 def _get_matching_s3_content(
107 self, key, expression):
108 """This function filters the contents of an S3 object based on a simple
109 structured query language (SQL) statement. In the request, along with
110 the SQL expression, we are specifying JSON serialization of the object.
111 S3 uses this format to parse object data into records, and returns only
112 records that match the specified SQL expression. Data serialization
113 format for the response is set to JSON.
115 :param key: S3 Key (file path).
116 :param expression: S3 compatible SQL query.
118 :type expression: str
119 :returns: JSON content of interest.
121 :raises RuntimeError: If connection to storage fails.
122 :raises ValueError: If JSON reading fails.
125 content = self.client.select_object_content(
128 ExpressionType=u"SQL",
129 Expression=expression,
134 u"CompressionType": u"GZIP"
136 OutputSerialization={
138 u"RecordDelimiter": u""
143 for event in content[u"Payload"]:
144 if u"Records" in event:
145 records = event[u"Records"][u"Payload"].decode(u"utf-8")
147 except exceptions.EndpointConnectionError:
151 except exceptions.EventStreamError:
153 u"Malformed JSON content!"
156 def _get_matching_s3_object(
158 """Gets full S3 object. If the file is gzip'd it will be unpacked.
160 :param key: Name of the S3 key (file).
162 :returns: JSON file of interest.
164 :raises RuntimeError: If connection to storage fails.
167 streaming_object = self.client.get_object(
171 with GzipFile(fileobj=streaming_object) as gzipfile:
172 content = gzipfile.read()
174 except exceptions.EndpointConnectionError:
179 def _get_matching_s3_length(
181 """Gets the file size of S3 object. If the file is gzip'd the packed
184 :param key: Name of the S3 key (file).
186 :returns: File size in bytes. Defaults to 0 if any error.
188 :raises RuntimeError: If connection to storage fails.
191 compressed_size = self.client.get_object(
195 last_four_bytes = self.client.get_object(
198 Range=f"bytes={compressed_size-4}-{compressed_size}"
200 return unpack(u"I", last_four_bytes.read(4))[0]
201 except exceptions.EndpointConnectionError:
206 """Returns True if file is larger then 1MB that S3 select allows.
208 :param key: Name of the S3 key (file).
210 :returns: Returns True if file is large then 1MB that S3 select allows.
214 self._get_matching_s3_length(key=key[u"Key"]) > S3_API_LIMIT
217 def s3_file_processing(
218 self, prefix=u"", suffix=u"json.gz",
219 expression=u"select * from s3object s"):
220 """Batch S3 key processing. Function retrieves list of files and use
221 S3 Select API to query content.
223 :param prefix: Only fetch keys that start with this prefix (optional).
224 :param suffix: Only fetch keys that end with this suffix (optional).
225 :param expression: S3 compatible SQL query (optional).
228 :type expression: str
230 key_iterator = self._get_matching_s3_keys(
236 for key in key_iterator:
238 yield key[u"Key"], loads(
239 self._get_matching_s3_content(
240 key=key[u"Key"], expression=expression
246 def s3_dump_file_processing(
247 self, prefix=u"", suffix=u"json.gz"):
248 """Batch S3 key processing. Function retrieves list of files and use
249 S3 Get Object API to query content.
251 :param prefix: Only fetch keys that start with this prefix (optional).
252 :param suffix: Only fetch keys that end with this suffix (optional).
256 key_iterator = self._get_matching_s3_keys(
262 for key in key_iterator:
265 self._get_matching_s3_object(