Initial commit

This commit is contained in:
Daniel Alves Rösel
2026-04-02 18:47:14 +02:00
committed by GitHub
commit 90ad5e0260
94 changed files with 7797 additions and 0 deletions

4
alveslib/__init__.py Normal file
View File

@@ -0,0 +1,4 @@
from .logger import get_logger
from .agent import ask, stream, ask_async, stream_async, Agent
__all__ = ["get_logger", "ask", "stream", "ask_async", "stream_async", "Agent"]

139
alveslib/agent.py Normal file
View File

@@ -0,0 +1,139 @@
"""
Thin async + sync wrappers over the Anthropic SDK for quick scripting and agent
patterns. Use this when you want direct API access with streaming; for full
agentic loops with file tools use `claude-agent-sdk` (pip install claude-agent-sdk).
Usage:
from alveslib.agent import ask, stream, Agent
# One-shot
reply = ask("Summarize this data: ...")
# Streaming to stdout
stream("Write a FastAPI endpoint that ...")
# Multi-turn agent
agent = Agent(system="You are an expert Python dev.")
reply = agent.chat("Generate a Celery task that processes CSV files")
follow = agent.chat("Now add error handling and retries")
"""
import os
import asyncio
from typing import Iterator, AsyncIterator
try:
import anthropic
_client: anthropic.Anthropic | None = anthropic.Anthropic(
api_key=os.environ.get("ANTHROPIC_API_KEY")
)
_async_client: anthropic.AsyncAnthropic | None = anthropic.AsyncAnthropic(
api_key=os.environ.get("ANTHROPIC_API_KEY")
)
except ImportError:
_client = None
_async_client = None
DEFAULT_MODEL = "claude-sonnet-4-5"
def _require_client() -> "anthropic.Anthropic":
if _client is None:
raise ImportError("pip install anthropic")
if not os.environ.get("ANTHROPIC_API_KEY"):
raise RuntimeError("ANTHROPIC_API_KEY not set")
return _client
def ask(prompt: str, system: str = "", model: str = DEFAULT_MODEL) -> str:
"""One-shot blocking request; returns full text."""
client = _require_client()
msg = client.messages.create(
model=model,
max_tokens=8096,
system=system or anthropic.NOT_GIVEN,
messages=[{"role": "user", "content": prompt}],
)
return msg.content[0].text
def stream(prompt: str, system: str = "", model: str = DEFAULT_MODEL) -> Iterator[str]:
"""Streaming generator; yields text deltas. Print as they arrive."""
client = _require_client()
with client.messages.stream(
model=model,
max_tokens=8096,
system=system or anthropic.NOT_GIVEN,
messages=[{"role": "user", "content": prompt}],
) as s:
yield from s.text_stream
async def ask_async(prompt: str, system: str = "", model: str = DEFAULT_MODEL) -> str:
"""Async one-shot request."""
if _async_client is None:
raise ImportError("pip install anthropic")
msg = await _async_client.messages.create(
model=model,
max_tokens=8096,
system=system or anthropic.NOT_GIVEN,
messages=[{"role": "user", "content": prompt}],
)
return msg.content[0].text
async def stream_async(
prompt: str, system: str = "", model: str = DEFAULT_MODEL
) -> AsyncIterator[str]:
"""Async streaming generator."""
if _async_client is None:
raise ImportError("pip install anthropic")
async with _async_client.messages.stream(
model=model,
max_tokens=8096,
system=system or anthropic.NOT_GIVEN,
messages=[{"role": "user", "content": prompt}],
) as s:
async for text in s.text_stream:
yield text
class Agent:
"""Stateful multi-turn conversation agent with optional system prompt."""
def __init__(self, system: str = "", model: str = DEFAULT_MODEL):
self.system = system
self.model = model
self.history: list[dict] = []
def chat(self, prompt: str) -> str:
client = _require_client()
self.history.append({"role": "user", "content": prompt})
msg = client.messages.create(
model=self.model,
max_tokens=8096,
system=self.system or anthropic.NOT_GIVEN,
messages=self.history,
)
reply = msg.content[0].text
self.history.append({"role": "assistant", "content": reply})
return reply
async def chat_async(self, prompt: str) -> str:
if _async_client is None:
raise ImportError("pip install anthropic")
self.history.append({"role": "user", "content": prompt})
msg = await _async_client.messages.create(
model=self.model,
max_tokens=8096,
system=self.system or anthropic.NOT_GIVEN,
messages=self.history,
)
reply = msg.content[0].text
self.history.append({"role": "assistant", "content": reply})
return reply
def reset(self) -> None:
self.history.clear()

83
alveslib/logger.py Normal file
View File

@@ -0,0 +1,83 @@
import json
import logging
import os
from datetime import datetime
from pathlib import Path
try:
from dotenv import load_dotenv
load_dotenv()
except ImportError:
pass
try:
from logging_loki import LokiHandler
LOKI_AVAILABLE = True
except ImportError:
LOKI_AVAILABLE = False
def get_logger(service_name: str, level: str = "INFO") -> logging.Logger:
"""
Get a configured logger for UltiPlate services.
Args:
service_name: Name of the service/module
level: Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
Returns:
Configured logger instance
"""
logger = logging.getLogger(service_name)
logger.setLevel(getattr(logging, level.upper()))
if not logger.handlers:
# Console handler with JSON formatting
handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter(service_name))
logger.addHandler(handler)
# File handler - writes to logs directory
logs_dir = Path(os.getenv("LOGDIR", "./logs"))
logs_dir.mkdir(parents=True, exist_ok=True)
file_handler = logging.FileHandler(logs_dir / f"{service_name}.log")
file_handler.setFormatter(JsonFormatter(service_name))
logger.addHandler(file_handler)
# Loki handler - sends logs directly to Loki
if LOKI_AVAILABLE:
loki_port = os.getenv("LOKI_PORT", "3100")
loki_url = f"http://localhost:{loki_port}/loki/api/v1/push"
try:
loki_handler = LokiHandler(
url=loki_url,
tags={"service": service_name},
version="1"
)
logger.addHandler(loki_handler)
except Exception as e:
# If Loki is not available, just continue with file/console logging
pass
return logger
class JsonFormatter(logging.Formatter):
def __init__(self, service_name: str):
super().__init__()
self.service_name = service_name
def format(self, record):
log_entry = {
"timestamp": datetime.utcnow().isoformat() + "Z",
"service": self.service_name,
"level": record.levelname,
"message": record.getMessage(),
"module": record.module,
"function": record.funcName,
"line": record.lineno
}
if record.exc_info:
log_entry["exception"] = self.formatException(record.exc_info)
return json.dumps(log_entry)

32
alveslib/project.json Normal file
View File

@@ -0,0 +1,32 @@
{
"name": "alveslib",
"root": "alveslib",
"sourceRoot": "alveslib",
"projectType": "library",
"targets": {
"build": {
"executor": "nx:run-commands",
"options": {
"command": "python3.12 -m compileall alveslib"
}
},
"lint": {
"executor": "nx:run-commands",
"options": {
"command": ".venv/bin/ruff check alveslib"
}
},
"typecheck": {
"executor": "nx:run-commands",
"options": {
"command": ".venv/bin/mypy alveslib"
}
},
"test": {
"executor": "nx:run-commands",
"options": {
"command": ".venv/bin/pytest alveslib -v"
}
}
}
}

70
alveslib/scraper.py Normal file
View File

@@ -0,0 +1,70 @@
import hashlib
import pickle
import os
from pathlib import Path
from seleniumbase import SB
from bs4 import BeautifulSoup
from typing import Optional
class ScraperCache:
def __init__(self, cache_dir: str = ".scraper_cache"):
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(exist_ok=True)
def _get_cache_key(self, url: str) -> str:
return hashlib.md5(url.encode()).hexdigest()
def _get_cache_path(self, cache_key: str) -> Path:
return self.cache_dir / f"{cache_key}.pkl"
def get(self, url: str) -> Optional[BeautifulSoup]:
cache_key = self._get_cache_key(url)
cache_path = self._get_cache_path(cache_key)
if cache_path.exists():
try:
with open(cache_path, 'rb') as f:
return pickle.load(f)
except:
pass
return None
def set(self, url: str, soup: BeautifulSoup) -> None:
cache_key = self._get_cache_key(url)
cache_path = self._get_cache_path(cache_key)
try:
with open(cache_path, 'wb') as f:
pickle.dump(soup, f)
except:
pass
_cache = ScraperCache() # glob
def scrape_url(url: str, use_cache: bool = True) -> BeautifulSoup:
if use_cache:
cached_soup = _cache.get(url)
if cached_soup:
return cached_soup
with SB(test=True, uc=True) as sb:
sb.open(url)
html = sb.get_page_source()
soup = BeautifulSoup(html, 'html.parser')
if use_cache:
_cache.set(url, soup)
return soup
if __name__ == "__main__":
url = "https://httpbin.org/html"
print("Testing scraper...")
soup = scrape_url(url)
print(f"Title: {soup.title.text if soup.title else 'No title'}")
print(f"Found {len(soup.find_all('p'))} paragraphs")
print("\nTesting cache...")
soup2 = scrape_url(url)
print("Cache test completed")