import subprocess
import sys
import tempfile
-from mimetypes import MimeTypes
import boto3
from botocore.exceptions import ClientError
logging.getLogger(u"botocore").setLevel(logging.INFO)
+FILE_TYPE = {
+ u"xml": u"application/xml",
+ u"html": u"text/html",
+ u"txt": u"text/plain",
+ u"log": u"text/plain",
+ u"css": u"text/css",
+ u"md": u"text/markdown",
+ u"rst": u"text/x-rst",
+ u"csv": u"text/csv",
+ u"svg": u"image/svg+xml",
+ u"jpg": u"image/jpeg",
+ u"png": u"image/png",
+ u"gif": u"image/gif",
+ u"js": u"application/javascript",
+ u"pdf": u"application/pdf",
+ u"json": u"application/json",
+ u"otf": u"font/otf",
+ u"ttf": u"font/ttf",
+ u"woff": u"font/woff",
+ u"woff2": u"font/woff2"
+}
+
+
def compress_text(src_dpath):
"""Compress all text files in directory.
raise RuntimeError(u"Not a directory.")
else:
logging.debug("Archives dir {} does exist.".format(archives_dir))
- for file_or_dir in os.listdir(archives_dir):
- f = os.path.join(archives_dir, file_or_dir)
+ for item in os.listdir(archives_dir):
+ src = os.path.join(archives_dir, item)
+ dst = os.path.join(dest_dir, item)
try:
- logging.debug(u"Copying " + f)
- shutil.copy(f, dest_dir)
+ if os.path.isdir(src):
+ shutil.copytree(src, dst, symlinks=False, ignore=None)
+ else:
+ shutil.copy2(src, dst)
except shutil.Error as e:
logging.error(e)
- raise RuntimeError(u"Could not copy " + f)
+ raise RuntimeError(u"Could not copy " + src)
else:
logging.error(u"Archives dir does not exist.")
raise RuntimeError(u"Missing directory " + archives_dir)
:type src_fpath: str
:type s3_path: str
"""
- extra_args = {
- u"ContentType": u"text/plain"
- }
- text_html_extra_args = {
- u"ContentType": u"text/html",
- u"ContentEncoding": MimeTypes().guess_type(src_fpath)[1]
- }
- text_plain_extra_args = {
- u"ContentType": u"text/plain",
- u"ContentEncoding": MimeTypes().guess_type(src_fpath)[1]
- }
- app_xml_extra_args = {
- u"ContentType": u"application/xml",
- u"ContentEncoding": MimeTypes().guess_type(src_fpath)[1]
- }
-
- mime = MimeTypes().guess_type(src_fpath)[0]
- encoding = MimeTypes().guess_type(src_fpath)[1]
-
- if mime is None and encoding is None:
- extra_args = extra_args
- elif mime is None or mime in u"text/plain":
- extra_args = text_plain_extra_args
- elif mime in u"text/html":
- extra_args = text_html_extra_args
- elif mime in u"application/xml":
- extra_args = app_xml_extra_args
- else:
- extra_args = extra_args
+ def is_gzip_file(filepath):
+ with open(filepath, u"rb") as test_f:
+ return test_f.read(2) == b"\x1f\x8b"
+
+ if os.path.isdir(src_fpath):
+ return
+ if os.path.isfile(src_fpath):
+ file_name, file_extension = os.path.splitext(src_fpath)
+ content_encoding = u""
+ content_type = u"application/octet-stream"
+ if is_gzip_file(src_fpath):
+ file_name, file_extension = os.path.splitext(file_name)
+ content_encoding = "gzip"
+ content_type = FILE_TYPE.get(
+ file_extension.strip("."),
+ u"application/octet-stream"
+ )
+
+ extra_args = dict()
+ extra_args[u"ContentType"] = content_type
+ if content_encoding:
+ extra_args[u"ContentEncoding"] = content_encoding
try:
s3_resource.Bucket(s3_bucket).upload_file(
except KeyError:
s3_resource = boto3.resource(
u"s3"
- )
+ )
previous_dir = os.getcwd()
work_dir = tempfile.mkdtemp(prefix="backup-s3.")