Files
session_redis_public/attachment_s3/models/ir_attachment.py
T

180 lines
6.7 KiB
Python

# Copyright 2016-2019 Camptocamp SA
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html)
import io
import logging
import os
from urllib.parse import urlsplit
from typing import Optional
from odoo import api, models
from odoo.exceptions import UserError
from ..s3uri import S3Uri
_logger = logging.getLogger(__name__)
try:
import boto3
from botocore.exceptions import ClientError, EndpointConnectionError
except ImportError:
boto3 = None # noqa
ClientError = None # noqa
EndpointConnectionError = None # noqa
_logger.debug("Cannot 'import boto3'.")
class IrAttachment(models.Model):
    """Add an ``s3`` store to ``ir.attachment``.

    Files written to that store are referenced by a file name of the form
    ``s3://<bucket>/<key>``; any other file name is delegated to the parent
    implementation.
    """

    _inherit = "ir.attachment"

    def _get_stores(self):
        """Return the available stores, with ``s3`` prepended to the parent list."""
        return ["s3"] + super()._get_stores()

    @api.model
    def _get_s3_bucket_name(self, name: Optional[str] = None) -> str:
        """Return the effective bucket name.

        ``name`` is used when given, otherwise the ``AWS_BUCKETNAME``
        environment variable (empty string when unset). A ``{db}`` placeholder
        in the name is replaced by the current database name.
        """
        raw_name = name or os.getenv("AWS_BUCKETNAME", "")
        return raw_name.format(db=self.env.cr.dbname)

    @api.model
    def _get_s3_bucket(self, name: Optional[str] = None):
        """Connect to S3 and return the bucket.

        The following environment variables can be set:

        * ``AWS_HOST``
        * ``AWS_REGION``
        * ``AWS_ACCESS_KEY_ID``
        * ``AWS_SECRET_ACCESS_KEY``
        * ``AWS_BUCKETNAME``

        If a name is provided, we'll read this bucket, otherwise, the bucket
        from the environment variable ``AWS_BUCKETNAME`` will be read.

        When ``head_bucket`` answers with a 404 error code, the bucket is
        created and the result of the creation is returned instead.

        :param name: optional bucket name taking precedence over
            ``AWS_BUCKETNAME``
        :return: the boto3 bucket resource
        :raises UserError: if boto3 is not installed, if the access key, the
            secret key or the bucket name is missing, or if S3 cannot be
            reached
        """
        if not boto3:
            raise UserError(
                self.env._("boto3 library is required for S3 integration.")
            )

        # AWS_HOST is optional: default to "" so ``strip`` is never called on
        # ``None`` when the variable is not set.
        host = os.getenv("AWS_HOST", "").strip()
        # Ensure ``host`` is prefixed with a scheme (use https as default)
        if host and not urlsplit(host).scheme:
            host = f"https://{host}"

        region_name = os.getenv("AWS_REGION")
        access_key = os.getenv("AWS_ACCESS_KEY_ID")
        secret_key = os.getenv("AWS_SECRET_ACCESS_KEY")
        bucket_name = self._get_s3_bucket_name(name)

        if not all([access_key, secret_key, bucket_name]):
            # The message uses %-style named placeholders, so it has to be
            # interpolated with ``%``; ``str.format`` would leave them as is.
            msg = self.env._(
                "Missing AWS credentials. "
                "If you want to read from the %(bucket_name)s S3 bucket, the following "
                "environment variables must be set:\n"
                "* AWS_ACCESS_KEY_ID\n"
                "* AWS_SECRET_ACCESS_KEY\n"
                "If you want to write in the %(bucket_name)s S3 bucket, this variable "
                "must be set as well:\n"
                "* AWS_BUCKETNAME\n"
                "Optionally, the S3 host can be changed with:\n"
                "* AWS_HOST\n"
            ) % {"bucket_name": bucket_name}
            raise UserError(msg)

        s3_params = {
            "aws_access_key_id": access_key,
            "aws_secret_access_key": secret_key,
        }
        if host:
            s3_params["endpoint_url"] = host
        if region_name:
            s3_params["region_name"] = region_name

        s3 = boto3.resource("s3", **s3_params)
        bucket = s3.Bucket(bucket_name)
        try:
            s3.meta.client.head_bucket(Bucket=bucket_name)
        except ClientError as e:
            # If a client error is thrown, then check that it was a 404 error.
            # If it was a 404 error, then the bucket does not exist.
            if e.response.get("Error", {}).get("Code") == "404":
                _logger.warning("S3 bucket '%s' does not exist.", bucket_name)
                return self._create_s3_bucket(s3, bucket_name, region_name)
            raise UserError(f"Failed to connect to S3 bucket: {str(e)}") from None
        except EndpointConnectionError as error:
            # log verbose error from s3, return short message for user
            _logger.exception("Error during S3 connection.")
            raise UserError(str(error)) from None
        return bucket

    def _create_s3_bucket(self, s3, bucket_name: str, region_name: Optional[str]):
        """Create an S3 bucket if it does not exist.

        :param s3: boto3 S3 resource used to create the bucket
        :param bucket_name: name of the bucket to create
        :param region_name: region used as location constraint, if any
        :return: the result of ``s3.create_bucket``
        :raises UserError: if S3 refuses the creation
        """
        params = {"Bucket": bucket_name}
        # ``us-east-1`` is the default location and S3 rejects it as an
        # explicit LocationConstraint, so it must be left out.
        if region_name and region_name != "us-east-1":
            params["CreateBucketConfiguration"] = {
                "LocationConstraint": region_name,
            }
        try:
            return s3.create_bucket(**params)
        except ClientError as e:
            _logger.exception("Failed to create S3 bucket '%s'", bucket_name)
            raise UserError(f"Bucket creation failed: {str(e)}") from None

    @api.model
    def _store_file_read(self, fname: str):
        """Read the content of ``fname``.

        File names starting with ``s3://`` are downloaded from the bucket
        named in the URI; an empty string is returned when the bucket cannot
        be obtained or the object cannot be fetched. Other file names are
        delegated to the parent implementation.
        """
        if not fname.startswith("s3://"):
            return super()._store_file_read(fname)

        s3uri = S3Uri(fname)
        try:
            # ``bucket`` is a method of S3Uri: it must be called, passing the
            # bound method itself would break the bucket name resolution.
            bucket = self._get_s3_bucket(name=s3uri.bucket())
        except UserError:
            _logger.exception("Error reading attachment '%s' from S3.", fname)
            return ""

        key = s3uri.item()
        try:
            # Raises ClientError when the object is not on the bucket.
            bucket.meta.client.head_object(Bucket=bucket.name, Key=key)
            with io.BytesIO() as res:
                bucket.download_fileobj(key, res)
                res.seek(0)
                return res.read()
        except ClientError:
            _logger.info("Attachment '%s' missing on S3.", fname)
            return ""

    @api.model
    def _store_file_write(self, key: str, bin_data: bytes) -> str:
        """Store ``bin_data`` under ``key`` and return the stored file name.

        The storage location is read from the ``storage_location`` context key
        or, failing that, from ``self._storage()``. For the ``s3`` location
        the data is uploaded to the configured bucket and
        ``s3://<bucket>/<key>`` is returned; any other location is delegated
        to the parent implementation.

        :raises UserError: if the upload to S3 fails
        """
        location = self.env.context.get("storage_location") or self._storage()
        if location != "s3":
            return super()._store_file_write(key, bin_data)

        bucket = self._get_s3_bucket()
        obj = bucket.Object(key=key)
        filename = f"s3://{bucket.name}/{key}"
        try:
            with io.BytesIO(bin_data) as file:
                obj.upload_fileobj(file)
        except ClientError as error:
            # log verbose error from s3, return short message for user
            _logger.exception("Error storing file %s on S3.", filename)
            raise UserError(
                self.env._("The file could not be stored: %s") % str(error)
            ) from None
        return filename

    @api.model
    def _store_file_delete(self, fname: str):
        """Delete ``fname`` from its store.

        A ``s3://`` file is deleted only when it lives on the currently
        configured bucket; deletion errors reported by S3 are logged, not
        raised. Other file names are delegated to the parent implementation.
        """
        if not fname.startswith("s3://"):
            return super()._store_file_delete(fname)

        s3uri = S3Uri(fname)
        bucket_name = s3uri.bucket()
        item_name = s3uri.item()
        # delete the file only if it is on the current configured bucket
        # otherwise, we might delete files used on a different environment.
        # The configured name goes through the same ``{db}`` substitution as
        # on write, otherwise a templated AWS_BUCKETNAME would never match.
        configured_bucket = self._get_s3_bucket_name()
        if configured_bucket and bucket_name == configured_bucket:
            bucket = self._get_s3_bucket()
            obj = bucket.Object(key=item_name)
            try:
                bucket.meta.client.head_object(Bucket=bucket.name, Key=item_name)
                obj.delete()
                _logger.info("File %s deleted from S3.", fname)
            except ClientError:
                # log verbose error from s3, return short message for user
                _logger.exception("Error deleting file %s from S3.", fname)
        return None