mirror of
https://github.com/velocitatem/cvfs.git
synced 2026-05-31 16:53:38 +00:00
93 lines
2.9 KiB
Python
93 lines
2.9 KiB
Python
from __future__ import annotations
|
|
|
|
import mimetypes
|
|
import os
|
|
from dataclasses import dataclass
|
|
from datetime import timedelta
|
|
from typing import BinaryIO
|
|
from uuid import uuid4
|
|
|
|
import boto3
|
|
from botocore.exceptions import ClientError
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class MinioStorageConfig:
|
|
bucket_name: str
|
|
region_name: str = "us-east-1"
|
|
endpoint_url: str | None = None
|
|
access_key_id: str | None = None
|
|
secret_access_key: str | None = None
|
|
path_prefix: str = "artifacts"
|
|
|
|
|
|
class MinioStorageClient:
|
|
def __init__(self, config: MinioStorageConfig):
|
|
self.config = config
|
|
self._client = boto3.client(
|
|
"s3",
|
|
region_name=config.region_name,
|
|
endpoint_url=config.endpoint_url,
|
|
aws_access_key_id=config.access_key_id or os.getenv("MINIO_ROOT_USER"),
|
|
aws_secret_access_key=config.secret_access_key
|
|
or os.getenv("MINIO_ROOT_PASSWORD"),
|
|
)
|
|
|
|
def build_key(
|
|
self, *, stem: str | None = None, extension: str | None = None
|
|
) -> str:
|
|
suffix = extension or ""
|
|
if suffix and not suffix.startswith("."):
|
|
suffix = f".{suffix}"
|
|
filename = f"{stem or uuid4().hex}{suffix or ''}"
|
|
prefix = self.config.path_prefix.strip("/")
|
|
return f"{prefix}/{filename}" if prefix else filename
|
|
|
|
def upload_bytes(
|
|
self, *, key: str, data: bytes, content_type: str | None = None
|
|
) -> str:
|
|
content_type = (
|
|
content_type or mimetypes.guess_type(key)[0] or "application/octet-stream"
|
|
)
|
|
self._client.put_object(
|
|
Bucket=self.config.bucket_name, Key=key, Body=data, ContentType=content_type
|
|
)
|
|
return key
|
|
|
|
def upload_fileobj(
|
|
self, *, fileobj: BinaryIO, key: str, content_type: str | None = None
|
|
) -> str:
|
|
content_type = (
|
|
content_type or mimetypes.guess_type(key)[0] or "application/octet-stream"
|
|
)
|
|
self._client.upload_fileobj(
|
|
fileobj,
|
|
self.config.bucket_name,
|
|
key,
|
|
ExtraArgs={"ContentType": content_type},
|
|
)
|
|
return key
|
|
|
|
def generate_presigned_url(self, *, key: str, expires_in: int = 900) -> str | None:
|
|
try:
|
|
return self._client.generate_presigned_url(
|
|
"get_object",
|
|
Params={"Bucket": self.config.bucket_name, "Key": key},
|
|
ExpiresIn=int(timedelta(seconds=expires_in).total_seconds()),
|
|
)
|
|
except ClientError:
|
|
return None
|
|
|
|
def delete_object(self, *, key: str) -> None:
|
|
try:
|
|
self._client.delete_object(Bucket=self.config.bucket_name, Key=key)
|
|
except ClientError:
|
|
pass
|
|
|
|
def download_bytes(self, *, key: str) -> bytes:
|
|
response = self._client.get_object(Bucket=self.config.bucket_name, Key=key)
|
|
body = response.get("Body")
|
|
if body:
|
|
return body.read()
|
|
return b""
|