mirror of
https://github.com/velocitatem/cvfs.git
synced 2026-05-31 08:43:37 +00:00
Finish MVP and dockerize
This commit is contained in:
5
dlib/__init__.py
Normal file
5
dlib/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
"""Domain-specific helpers for Resume Branches."""
|
||||
|
||||
from . import cv, ai, storage, auth # noqa: F401
|
||||
|
||||
__all__ = ["cv", "ai", "storage", "auth"]
|
||||
3
dlib/ai/__init__.py
Normal file
3
dlib/ai/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
from .tailoring import generate_tailoring_suggestions, TailoringContext
|
||||
|
||||
__all__ = ["generate_tailoring_suggestions", "TailoringContext"]
|
||||
134
dlib/ai/tailoring.py
Normal file
134
dlib/ai/tailoring.py
Normal file
@@ -0,0 +1,134 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import textwrap
|
||||
from typing import Sequence
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from alveslib import ask
|
||||
|
||||
from dlib.cv.schema import (
|
||||
PatchOperation,
|
||||
PatchSuggestion,
|
||||
StructuredBlock,
|
||||
StructuredDocument,
|
||||
)
|
||||
|
||||
|
||||
class TailoringContext(BaseModel):
|
||||
job_description: str
|
||||
focus_keywords: list[str] = Field(default_factory=list)
|
||||
prohibited_terms: list[str] = Field(default_factory=list)
|
||||
|
||||
|
||||
def generate_tailoring_suggestions(
|
||||
context: TailoringContext,
|
||||
document: StructuredDocument,
|
||||
*,
|
||||
max_changes: int = 12,
|
||||
) -> list[PatchSuggestion]:
|
||||
if not document.blocks:
|
||||
return []
|
||||
if not os.getenv("ANTHROPIC_API_KEY"):
|
||||
return _rule_based_suggestions(context, document, max_changes)
|
||||
|
||||
prompt = _build_prompt(context, document, max_changes)
|
||||
raw = ask(prompt)
|
||||
try:
|
||||
payload = json.loads(raw)
|
||||
candidates = payload.get("patches", payload)
|
||||
except json.JSONDecodeError:
|
||||
return _rule_based_suggestions(context, document, max_changes)
|
||||
|
||||
suggestions: list[PatchSuggestion] = []
|
||||
for candidate in candidates[:max_changes]:
|
||||
try:
|
||||
suggestions.append(PatchSuggestion.model_validate(candidate))
|
||||
except Exception:
|
||||
continue
|
||||
return suggestions or _rule_based_suggestions(context, document, max_changes)
|
||||
|
||||
|
||||
def _rule_based_suggestions(
|
||||
context: TailoringContext,
|
||||
document: StructuredDocument,
|
||||
max_changes: int,
|
||||
) -> list[PatchSuggestion]:
|
||||
keywords = set([kw.lower() for kw in context.focus_keywords])
|
||||
if not keywords:
|
||||
keywords = set(_extract_keywords(context.job_description))
|
||||
suggestions: list[PatchSuggestion] = []
|
||||
for block in document.blocks:
|
||||
overlap = keywords.intersection({kw.lower() for kw in block.keywords})
|
||||
if not overlap and len(suggestions) < max_changes:
|
||||
keyword = next(iter(keywords), None)
|
||||
if keyword:
|
||||
suggestions.append(
|
||||
PatchSuggestion(
|
||||
target_path=block.path,
|
||||
operation=PatchOperation.BOOST_KEYWORD,
|
||||
new_value=keyword,
|
||||
rationale="Surface JD keyword in existing bullet",
|
||||
keywords=[keyword],
|
||||
confidence=0.4,
|
||||
)
|
||||
)
|
||||
elif overlap and len(suggestions) < max_changes:
|
||||
keyword = next(iter(overlap))
|
||||
suggestions.append(
|
||||
PatchSuggestion(
|
||||
target_path=block.path,
|
||||
operation=PatchOperation.REPLACE_TEXT,
|
||||
new_value=_strengthen_sentence(block, keyword),
|
||||
old_value=block.text,
|
||||
rationale=f"Highlight {keyword}",
|
||||
keywords=[keyword],
|
||||
confidence=0.55,
|
||||
)
|
||||
)
|
||||
return suggestions[:max_changes]
|
||||
|
||||
|
||||
def _strengthen_sentence(block: StructuredBlock, keyword: str) -> str:
|
||||
text = block.text.strip()
|
||||
if keyword.lower() not in text.lower():
|
||||
return f"{text} — emphasized {keyword} impact"
|
||||
return re.sub(keyword, keyword.upper(), text, flags=re.IGNORECASE)
|
||||
|
||||
|
||||
def _extract_keywords(job_description: str, limit: int = 8) -> list[str]:
|
||||
tokens = {}
|
||||
for token in re.findall(r"[A-Za-z][A-Za-z0-9+./-]{2,}", job_description):
|
||||
t = token.lower()
|
||||
tokens[t] = tokens.get(t, 0) + 1
|
||||
return [
|
||||
token
|
||||
for token, _ in sorted(tokens.items(), key=lambda kv: kv[1], reverse=True)[
|
||||
:limit
|
||||
]
|
||||
]
|
||||
|
||||
|
||||
def _build_prompt(
|
||||
context: TailoringContext, document: StructuredDocument, max_changes: int
|
||||
) -> str:
|
||||
lines = [f"{block.path}: {block.text}" for block in document.blocks]
|
||||
doc_preview = "\n".join(lines[:40])
|
||||
focus = ", ".join(context.focus_keywords) or "n/a"
|
||||
prohibited = ", ".join(context.prohibited_terms) or "n/a"
|
||||
return textwrap.dedent(
|
||||
f"""
|
||||
You are an ATS-preserving copy editor. Job description:\n{context.job_description}\n---\n
|
||||
Existing resume snippets:\n{doc_preview}
|
||||
|
||||
Provide at most {max_changes} JSON patch objects with fields
|
||||
target_path, operation, new_value, rationale, keywords, confidence.
|
||||
Allowed operations: replace_text, boost_keyword, suppress_block.
|
||||
Focus keywords: {focus}. Forbidden topics: {prohibited}.
|
||||
Ensure every change is truthful and preserves formatting.
|
||||
Respond with JSON: {{"patches": [{{...}}]}} only.
|
||||
"""
|
||||
).strip()
|
||||
3
dlib/auth/__init__.py
Normal file
3
dlib/auth/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
from .oidc import AuthenticatedUser, OidcTokenValidator, build_validator
|
||||
|
||||
__all__ = ["AuthenticatedUser", "OidcTokenValidator", "build_validator"]
|
||||
92
dlib/auth/oidc.py
Normal file
92
dlib/auth/oidc.py
Normal file
@@ -0,0 +1,92 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from functools import cached_property
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
from jose import JWTError, jwt
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
class AuthenticatedUser(BaseModel):
|
||||
sub: str
|
||||
email: str | None = None
|
||||
name: str | None = None
|
||||
picture: str | None = None
|
||||
roles: list[str] = Field(default_factory=list)
|
||||
|
||||
|
||||
class TokenValidationError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class OidcTokenValidator:
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
issuer: str | None,
|
||||
audience: str | None,
|
||||
jwks_url: str | None = None,
|
||||
disable: bool = False,
|
||||
) -> None:
|
||||
self.issuer = issuer
|
||||
self.audience = audience
|
||||
self.jwks_url = jwks_url or (
|
||||
f"{issuer.rstrip('/')}/.well-known/jwks.json" if issuer else None
|
||||
)
|
||||
self.disable = disable or not issuer
|
||||
self._jwks: dict[str, Any] | None = None
|
||||
self._jwks_expiry: float = 0
|
||||
|
||||
async def validate(self, token: str) -> AuthenticatedUser:
|
||||
if self.disable or not token:
|
||||
return AuthenticatedUser(
|
||||
sub="dev-user", email="dev@example.com", name="Developer"
|
||||
)
|
||||
header = jwt.get_unverified_header(token)
|
||||
key = await self._get_key(header.get("kid"))
|
||||
if not key:
|
||||
raise TokenValidationError("Unable to resolve signing key")
|
||||
try:
|
||||
claims = jwt.decode(
|
||||
token,
|
||||
key,
|
||||
algorithms=[key.get("alg", "RS256")],
|
||||
audience=self.audience,
|
||||
issuer=self.issuer,
|
||||
)
|
||||
except JWTError as exc:
|
||||
raise TokenValidationError(str(exc)) from exc
|
||||
roles = claims.get("roles") or claims.get("app_metadata", {}).get("roles") or []
|
||||
if isinstance(roles, str):
|
||||
roles = [roles]
|
||||
return AuthenticatedUser(
|
||||
sub=str(claims.get("sub")),
|
||||
email=claims.get("email"),
|
||||
name=claims.get("name"),
|
||||
picture=claims.get("picture"),
|
||||
roles=roles,
|
||||
)
|
||||
|
||||
async def _get_key(self, kid: str | None) -> dict[str, Any] | None:
|
||||
if not self.jwks_url:
|
||||
return None
|
||||
if not self._jwks or time.time() > self._jwks_expiry:
|
||||
async with httpx.AsyncClient(timeout=10) as client:
|
||||
response = await client.get(self.jwks_url)
|
||||
response.raise_for_status()
|
||||
self._jwks = response.json()
|
||||
self._jwks_expiry = time.time() + 3600
|
||||
keys = self._jwks.get("keys", []) if isinstance(self._jwks, dict) else []
|
||||
if kid:
|
||||
for key in keys:
|
||||
if key.get("kid") == kid:
|
||||
return key
|
||||
return keys[0] if keys else None
|
||||
|
||||
|
||||
def build_validator(
|
||||
*, issuer: str | None, audience: str | None, disable: bool
|
||||
) -> OidcTokenValidator:
|
||||
return OidcTokenValidator(issuer=issuer, audience=audience, disable=disable)
|
||||
22
dlib/cv/__init__.py
Normal file
22
dlib/cv/__init__.py
Normal file
@@ -0,0 +1,22 @@
|
||||
from .schema import (
|
||||
StructuredBlock,
|
||||
StructuredDocument,
|
||||
PatchPayload,
|
||||
PatchSuggestion,
|
||||
PatchOperation,
|
||||
)
|
||||
from .parser import parse_docx_bytes, summarize_keywords
|
||||
from .patcher import apply_patchset
|
||||
from .ats_guard import validate_patchset
|
||||
|
||||
__all__ = [
|
||||
"StructuredBlock",
|
||||
"StructuredDocument",
|
||||
"PatchPayload",
|
||||
"PatchSuggestion",
|
||||
"PatchOperation",
|
||||
"parse_docx_bytes",
|
||||
"summarize_keywords",
|
||||
"apply_patchset",
|
||||
"validate_patchset",
|
||||
]
|
||||
44
dlib/cv/ats_guard.py
Normal file
44
dlib/cv/ats_guard.py
Normal file
@@ -0,0 +1,44 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Iterable
|
||||
|
||||
from .schema import PatchPayload, PatchOperation, StructuredDocument
|
||||
|
||||
|
||||
class PatchValidationError(ValueError):
|
||||
pass
|
||||
|
||||
|
||||
def validate_patchset(
|
||||
document: StructuredDocument,
|
||||
patches: Iterable[PatchPayload],
|
||||
*,
|
||||
max_changes: int = 12,
|
||||
max_growth_ratio: float = 1.45,
|
||||
) -> None:
|
||||
patch_list = list(patches)
|
||||
if len(patch_list) > max_changes:
|
||||
raise PatchValidationError(
|
||||
f"Patchset exceeds max changes ({len(patch_list)} > {max_changes})"
|
||||
)
|
||||
block_map = {block.path: block for block in document.blocks}
|
||||
for patch in patch_list:
|
||||
block = block_map.get(patch.target_path)
|
||||
if not block:
|
||||
raise PatchValidationError(
|
||||
f"Target path {patch.target_path} does not exist in base document"
|
||||
)
|
||||
if patch.operation == PatchOperation.REPLACE_TEXT:
|
||||
if not patch.new_value:
|
||||
raise PatchValidationError("replace_text requires new_value")
|
||||
baseline = len(block.text.strip()) or 1
|
||||
if len(patch.new_value.strip()) / baseline > max_growth_ratio:
|
||||
raise PatchValidationError("Patch grows text beyond ATS safe threshold")
|
||||
if (
|
||||
patch.operation
|
||||
in {PatchOperation.REMOVE_BLOCK, PatchOperation.SUPPRESS_BLOCK}
|
||||
and block.block_type == "heading"
|
||||
):
|
||||
raise PatchValidationError(
|
||||
"Headings cannot be removed without manual confirmation"
|
||||
)
|
||||
104
dlib/cv/parser.py
Normal file
104
dlib/cv/parser.py
Normal file
@@ -0,0 +1,104 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from collections import defaultdict
|
||||
from io import BytesIO
|
||||
from typing import Iterable
|
||||
|
||||
from docx import Document
|
||||
|
||||
from .schema import StructuredBlock, StructuredDocument
|
||||
|
||||
|
||||
def _detect_block_type(style_name: str | None, paragraph) -> str:
|
||||
style = (style_name or "").lower()
|
||||
if style.startswith("heading"):
|
||||
return "heading"
|
||||
if (
|
||||
"bullet" in style
|
||||
or "list" in style
|
||||
or getattr(paragraph, "style", None)
|
||||
and getattr(paragraph.style, "name", "").lower().startswith("list")
|
||||
):
|
||||
return "bullet"
|
||||
return "text"
|
||||
|
||||
|
||||
def _build_path(block_type: str, counter: int, extra: str | None = None) -> str:
|
||||
suffix = f"{block_type}[{counter}]"
|
||||
if extra:
|
||||
return f"{suffix}.{extra}"
|
||||
return suffix
|
||||
|
||||
|
||||
def parse_docx_bytes(
|
||||
file_bytes: bytes, *, version_label: str | None = None
|
||||
) -> StructuredDocument:
|
||||
document = Document(BytesIO(file_bytes))
|
||||
counters: defaultdict[str, int] = defaultdict(int)
|
||||
blocks: list[StructuredBlock] = []
|
||||
|
||||
for paragraph in document.paragraphs:
|
||||
text = paragraph.text.strip()
|
||||
if not text:
|
||||
continue
|
||||
block_type = _detect_block_type(
|
||||
getattr(paragraph.style, "name", None), paragraph
|
||||
)
|
||||
counters[block_type] += 1
|
||||
keywords = summarize_keywords([text])
|
||||
blocks.append(
|
||||
StructuredBlock(
|
||||
path=_build_path(block_type, counters[block_type]),
|
||||
block_type="heading"
|
||||
if block_type == "heading"
|
||||
else ("bullet" if block_type == "bullet" else "text"),
|
||||
text=text,
|
||||
keywords=keywords,
|
||||
metadata={
|
||||
"style": getattr(getattr(paragraph, "style", None), "name", "")
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
for table_index, table in enumerate(document.tables):
|
||||
for row_index, row in enumerate(table.rows):
|
||||
for cell_index, cell in enumerate(row.cells):
|
||||
text = cell.text.strip()
|
||||
if not text:
|
||||
continue
|
||||
counters["table"] += 1
|
||||
blocks.append(
|
||||
StructuredBlock(
|
||||
path=_build_path(
|
||||
"table",
|
||||
counters["table"],
|
||||
extra=f"{row_index}-{cell_index}",
|
||||
),
|
||||
block_type="table",
|
||||
text=text,
|
||||
keywords=summarize_keywords([text]),
|
||||
metadata={
|
||||
"table_index": table_index,
|
||||
"row": row_index,
|
||||
"cell": cell_index,
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
return StructuredDocument(version_label=version_label, blocks=blocks)
|
||||
|
||||
|
||||
def summarize_keywords(lines: Iterable[str], *, max_keywords: int = 6) -> list[str]:
|
||||
terms: dict[str, int] = {}
|
||||
for line in lines:
|
||||
for raw in line.split():
|
||||
cleaned = raw.strip().strip(",.;:()[]").lower()
|
||||
if len(cleaned) <= 2:
|
||||
continue
|
||||
terms[cleaned] = terms.get(cleaned, 0) + 1
|
||||
return [
|
||||
term
|
||||
for term, _ in sorted(terms.items(), key=lambda kv: kv[1], reverse=True)[
|
||||
:max_keywords
|
||||
]
|
||||
]
|
||||
53
dlib/cv/patcher.py
Normal file
53
dlib/cv/patcher.py
Normal file
@@ -0,0 +1,53 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from copy import deepcopy
|
||||
|
||||
from .schema import PatchOperation, PatchPayload, StructuredDocument
|
||||
|
||||
|
||||
def apply_patchset(
|
||||
document: StructuredDocument, patches: list[PatchPayload]
|
||||
) -> StructuredDocument:
|
||||
working = StructuredDocument.model_validate(deepcopy(document.model_dump()))
|
||||
for patch in patches:
|
||||
block = working.get_block(patch.target_path)
|
||||
if not block:
|
||||
continue
|
||||
if patch.operation == PatchOperation.REPLACE_TEXT:
|
||||
block.metadata["previous_text"] = block.text
|
||||
if patch.new_value:
|
||||
block.text = patch.new_value
|
||||
elif patch.operation == PatchOperation.REMOVE_BLOCK:
|
||||
working.blocks = [
|
||||
candidate
|
||||
for candidate in working.blocks
|
||||
if candidate.path != patch.target_path
|
||||
]
|
||||
elif patch.operation == PatchOperation.REORDER_SECTION:
|
||||
target_index = (
|
||||
patch.metadata.get("target_index") if patch.metadata else None
|
||||
)
|
||||
if target_index is None:
|
||||
continue
|
||||
to_move = next(
|
||||
(
|
||||
candidate
|
||||
for candidate in working.blocks
|
||||
if candidate.path == patch.target_path
|
||||
),
|
||||
None,
|
||||
)
|
||||
if not to_move:
|
||||
continue
|
||||
working.blocks = [
|
||||
candidate
|
||||
for candidate in working.blocks
|
||||
if candidate.path != patch.target_path
|
||||
]
|
||||
working.blocks.insert(int(target_index), to_move)
|
||||
elif patch.operation == PatchOperation.BOOST_KEYWORD and patch.new_value:
|
||||
if patch.new_value not in block.keywords:
|
||||
block.keywords.insert(0, patch.new_value)
|
||||
elif patch.operation == PatchOperation.SUPPRESS_BLOCK:
|
||||
block.metadata["suppressed"] = True
|
||||
return working
|
||||
54
dlib/cv/schema.py
Normal file
54
dlib/cv/schema.py
Normal file
@@ -0,0 +1,54 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from enum import Enum
|
||||
from typing import Any, Literal
|
||||
|
||||
from pydantic import BaseModel, Field, ConfigDict
|
||||
|
||||
|
||||
class StructuredBlock(BaseModel):
|
||||
"""Editable slice of a DOCX document."""
|
||||
|
||||
model_config = ConfigDict(extra="allow")
|
||||
|
||||
path: str
|
||||
block_type: Literal[
|
||||
"heading", "summary", "bullet", "skills", "table", "meta", "text"
|
||||
]
|
||||
text: str
|
||||
keywords: list[str] = Field(default_factory=list)
|
||||
metadata: dict[str, Any] = Field(default_factory=dict)
|
||||
|
||||
|
||||
class StructuredDocument(BaseModel):
|
||||
model_config = ConfigDict(extra="allow")
|
||||
|
||||
version_label: str | None = None
|
||||
blocks: list[StructuredBlock] = Field(default_factory=list)
|
||||
|
||||
def get_block(self, path: str) -> StructuredBlock | None:
|
||||
return next((block for block in self.blocks if block.path == path), None)
|
||||
|
||||
|
||||
class PatchOperation(str, Enum):
|
||||
REPLACE_TEXT = "replace_text"
|
||||
REMOVE_BLOCK = "remove_block"
|
||||
REORDER_SECTION = "reorder_section"
|
||||
BOOST_KEYWORD = "boost_keyword"
|
||||
SUPPRESS_BLOCK = "suppress_block"
|
||||
|
||||
|
||||
class PatchPayload(BaseModel):
|
||||
model_config = ConfigDict(extra="allow")
|
||||
|
||||
target_path: str
|
||||
operation: PatchOperation
|
||||
new_value: str | None = None
|
||||
old_value: str | None = None
|
||||
rationale: str | None = None
|
||||
metadata: dict[str, Any] = Field(default_factory=dict)
|
||||
|
||||
|
||||
class PatchSuggestion(PatchPayload):
|
||||
confidence: float | None = None
|
||||
keywords: list[str] = Field(default_factory=list)
|
||||
3
dlib/storage/__init__.py
Normal file
3
dlib/storage/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
from .s3 import S3StorageClient, S3StorageConfig
|
||||
|
||||
__all__ = ["S3StorageClient", "S3StorageConfig"]
|
||||
92
dlib/storage/minio.py
Normal file
92
dlib/storage/minio.py
Normal file
@@ -0,0 +1,92 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import mimetypes
|
||||
import os
|
||||
from dataclasses import dataclass
|
||||
from datetime import timedelta
|
||||
from typing import BinaryIO
|
||||
from uuid import uuid4
|
||||
|
||||
import boto3
|
||||
from botocore.exceptions import ClientError
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class MinioStorageConfig:
|
||||
bucket_name: str
|
||||
region_name: str = "us-east-1"
|
||||
endpoint_url: str | None = None
|
||||
access_key_id: str | None = None
|
||||
secret_access_key: str | None = None
|
||||
path_prefix: str = "artifacts"
|
||||
|
||||
|
||||
class MinioStorageClient:
|
||||
def __init__(self, config: MinioStorageConfig):
|
||||
self.config = config
|
||||
self._client = boto3.client(
|
||||
"s3",
|
||||
region_name=config.region_name,
|
||||
endpoint_url=config.endpoint_url,
|
||||
aws_access_key_id=config.access_key_id or os.getenv("MINIO_ROOT_USER"),
|
||||
aws_secret_access_key=config.secret_access_key
|
||||
or os.getenv("MINIO_ROOT_PASSWORD"),
|
||||
)
|
||||
|
||||
def build_key(
|
||||
self, *, stem: str | None = None, extension: str | None = None
|
||||
) -> str:
|
||||
suffix = extension or ""
|
||||
if suffix and not suffix.startswith("."):
|
||||
suffix = f".{suffix}"
|
||||
filename = f"{stem or uuid4().hex}{suffix or ''}"
|
||||
prefix = self.config.path_prefix.strip("/")
|
||||
return f"{prefix}/{filename}" if prefix else filename
|
||||
|
||||
def upload_bytes(
|
||||
self, *, key: str, data: bytes, content_type: str | None = None
|
||||
) -> str:
|
||||
content_type = (
|
||||
content_type or mimetypes.guess_type(key)[0] or "application/octet-stream"
|
||||
)
|
||||
self._client.put_object(
|
||||
Bucket=self.config.bucket_name, Key=key, Body=data, ContentType=content_type
|
||||
)
|
||||
return key
|
||||
|
||||
def upload_fileobj(
|
||||
self, *, fileobj: BinaryIO, key: str, content_type: str | None = None
|
||||
) -> str:
|
||||
content_type = (
|
||||
content_type or mimetypes.guess_type(key)[0] or "application/octet-stream"
|
||||
)
|
||||
self._client.upload_fileobj(
|
||||
fileobj,
|
||||
self.config.bucket_name,
|
||||
key,
|
||||
ExtraArgs={"ContentType": content_type},
|
||||
)
|
||||
return key
|
||||
|
||||
def generate_presigned_url(self, *, key: str, expires_in: int = 900) -> str | None:
|
||||
try:
|
||||
return self._client.generate_presigned_url(
|
||||
"get_object",
|
||||
Params={"Bucket": self.config.bucket_name, "Key": key},
|
||||
ExpiresIn=int(timedelta(seconds=expires_in).total_seconds()),
|
||||
)
|
||||
except ClientError:
|
||||
return None
|
||||
|
||||
def delete_object(self, *, key: str) -> None:
|
||||
try:
|
||||
self._client.delete_object(Bucket=self.config.bucket_name, Key=key)
|
||||
except ClientError:
|
||||
pass
|
||||
|
||||
def download_bytes(self, *, key: str) -> bytes:
|
||||
response = self._client.get_object(Bucket=self.config.bucket_name, Key=key)
|
||||
body = response.get("Body")
|
||||
if body:
|
||||
return body.read()
|
||||
return b""
|
||||
Reference in New Issue
Block a user