initial: extract altrepo module as standalone library

parents
__pycache__/
*.py[cod]
*.egg-info/
dist/
build/
.venv/
import aiohttp
from .api import ALTRepoAPI
from .appstream import ALTRepoAppStream
from .config import ALTRepoConfig
from .parser import ALTRepoParser
class ALTRepo:
    """Facade bundling the rdb API, AppStream and parser sub-clients."""

    def __init__(self, config: ALTRepoConfig | None = None):
        # Fall back to a default configuration when none is supplied.
        self.config = config or ALTRepoConfig()
        self._session: aiohttp.ClientSession | None = None
        self.api: ALTRepoAPI | None = None
        self.appstream: ALTRepoAppStream | None = None
        self.parser: ALTRepoParser | None = None

    async def init(self, session: aiohttp.ClientSession | None = None):
        """Create (or adopt) the HTTP session and build the sub-clients.

        An externally supplied *session* is adopted; otherwise a new
        aiohttp.ClientSession is created. A session is only created once;
        repeated calls rebuild the sub-clients with the same session.
        """
        if self._session is None:
            self._session = session or aiohttp.ClientSession()
        self.api = ALTRepoAPI(self._session, self.config)
        self.appstream = ALTRepoAppStream(self._session, self.config)
        self.parser = ALTRepoParser(self._session, self.config)

    async def close(self):
        """Close the underlying HTTP session, if one exists.

        Bug fix: previously raised AttributeError when called before
        init(); now a no-op in that case. Safe to call more than once.
        """
        if self._session is not None:
            await self._session.close()
            self._session = None


__all__ = ("ALTRepo", "ALTRepoConfig")
from .methods import ALTRepoAPI
class RequestValidationError(Exception):
    """Request parameter validation failed (HTTP 400)."""
    pass
class DataNotFoundError(Exception):
    """Requested data was not found in the database (HTTP 404)."""
    pass
class TooManyRequests(Exception):
    """Too many requests (HTTP 429)."""
    pass
class UnexpectedResponseError(Exception):
    """Unexpected response from the server (unforeseen HTTP status)."""

    def __init__(self, status_code: int, message: str = ""):
        # Keep the raw status around for programmatic inspection.
        self.status_code = status_code
        if message:
            self.message = message
        else:
            self.message = f"Unexpected response status: {status_code}"
        super().__init__(self.message)
from .methods import ALTRepoAppStream
import os
import aiohttp
from lxml import etree
class AppStreamClient:
    """Thin HTTP client that downloads raw AppStream XML for a branch."""

    def __init__(self, session: aiohttp.ClientSession, appstream_url: str):
        self.session = session
        self._appstream_url = appstream_url

    async def _handle_response(self, resp: aiohttp.ClientResponse):
        # Only successful statuses yield a body; anything else maps to None.
        if resp.status in (200, 201):
            return await resp.text(encoding="utf-8")
        return None

    async def get(self, branch: str) -> str | None:
        """Fetch the AppStream XML for *branch*; None when the request fails."""
        target = self._appstream_url.format(branch=branch)
        async with self.session.get(target) as resp:
            return await self._handle_response(resp)
class DataInfo:
    """Persists downloaded AppStream XML on disk, one file per branch.

    Files are named "<branch>-<version>.xml" inside *appstream_dir*.
    """

    def __init__(self, client: AppStreamClient, appstream_dir: str):
        self.client = client
        self.dir = appstream_dir
        # Optional callback invoked with the branch name after a write.
        self._on_update = None
        os.makedirs(self.dir, exist_ok=True)

    async def load_by_branch(self, branch: str, version: str):
        """Download XML for *branch* and store it as {branch}-{version}.xml."""
        text = await self.client.get(branch)
        if not text:
            return
        # Drop the previous snapshot of this branch, if any.
        stale = self.get_file_path(branch)
        if stale:
            os.remove(stale)
        target = os.path.join(self.dir, f"{branch}-{version}.xml")
        with open(target, "w", encoding="utf-8") as fh:
            fh.write(text)
        if self._on_update:
            self._on_update(branch)

    def get_file_path(self, branch: str) -> str | None:
        """Return the stored XML path for *branch*, or None if absent."""
        prefix = f"{branch}-"
        for entry in os.listdir(self.dir):
            if entry.startswith(prefix) and entry.endswith(".xml"):
                return os.path.join(self.dir, entry)
        return None

    def get_current_version(self, branch: str) -> str | None:
        """Extract the version embedded in the stored file's name."""
        path = self.get_file_path(branch)
        if path is None:
            return None
        # Strip the "<branch>-" prefix and the ".xml" suffix.
        return os.path.basename(path)[len(branch) + 1 : -4]

    def has_file(self, branch: str) -> bool:
        return self.get_file_path(branch) is not None
class PackageInfo:
    """Maps package names to AppStream component ids, cached per branch."""

    def __init__(self, client: AppStreamClient, data: DataInfo):
        self.client = client
        self._data = data
        # branch -> {pkgname: appstream component id}
        self._cache: dict[str, dict[str, str]] = {}

    def rebuild_cache(self, branch: str) -> None:
        """(Re)parse the stored XML for *branch* into the lookup cache."""
        path = self._data.get_file_path(branch)
        if not path:
            # No data on disk: forget any stale cache entry.
            self._cache.pop(branch, None)
            return
        mapping: dict[str, str] = {}
        # Stream-parse to keep memory flat on large catalogs; recover=True
        # tolerates malformed XML fragments.
        for _, node in etree.iterparse(path, events=("end",), recover=True):
            if node.tag == "component" and node.get("type") != "addon":
                name = node.findtext("pkgname")
                component_id = node.findtext("id")
                if name and component_id:
                    mapping[name] = component_id
            node.clear()
        self._cache[branch] = mapping

    def id_by_pkgname(self, pkgname: str, branch: str) -> str | None:
        """Return the AppStream id of *pkgname* on *branch*, if cached."""
        branch_map = self._cache.get(branch)
        if branch_map is None:
            return None
        return branch_map.get(pkgname)
class ALTRepoAppStream:
    """AppStream service facade: wires the HTTP client, on-disk storage
    and the per-branch pkgname -> id cache together."""
    def __init__(self, session: aiohttp.ClientSession, config: "ALTRepoConfig"):
        self.branches = config.appstream_branches
        self._client = AppStreamClient(session, config.appstream_url)
        self.data = DataInfo(self._client, config.appstream_dir)
        self.package = PackageInfo(self._client, self.data)
        # Keep the name->id cache in sync whenever new XML is stored.
        self.data._on_update = self.package.rebuild_cache
        # Warm the cache from any XML files already present on disk.
        for branch in self.branches:
            if self.data.has_file(branch):
                self.package.rebuild_cache(branch)
from dataclasses import dataclass, field
@dataclass
class ALTRepoConfig:
    """Endpoint configuration for all ALT Linux services used by the library."""
    # Base URL of the rdb.altlinux.org REST API.
    api_base_url: str = "https://rdb.altlinux.org/api"
    # Monthly mailing-list archive; "{}" is filled with "<year>-<MonthName>".
    cybertalk_url: str = "https://lists.altlinux.org/pipermail/sisyphus-cybertalk/{}/"
    # Joined FTBFS (fails-to-build-from-source) statistics dump.
    ftbfs_url: str = "https://git.altlinux.org/beehive/stats/Sisyphus-x86_64/ftbfs-joined"
    # Per-maintainer upstream watch file; formatted with {by_acl} and {nickname}.
    watch_url: str = "https://watch.altlinux.org/pub/watch/{by_acl}/{nickname}.txt"
    # Raw AppStream XML from the gears repository; formatted with {branch}.
    appstream_url: str = (
        "https://git.altlinux.org/gears/a/appstream-data-desktop.git?"
        "a=blob_plain;f=xmls/altlinux.xml;hb=refs/heads/{branch}"
    )
    # Branches for which AppStream data is downloaded and cached.
    appstream_branches: list[str] = field(default_factory=lambda: ["sisyphus", "p11"])
    # Local directory where downloaded AppStream XML files are stored.
    appstream_dir: str = "appstream"
from .methods import ALTRepoParser
import aiohttp
from datetime import date
from typing import List, Literal
from . import models
from .news import urls_parser, urls_for_range, packages_parser, bugs_parser
from .packages import ftbfs_parser, watch_parser
class BaseParser:
    """Shared HTTP helper: GETs pages as decoded text or raw bytes."""
    def __init__(self, session: aiohttp.ClientSession):
        self.session = session
    async def get(self, url: str, encoding: str = "utf-8"):
        """GET *url* and decode the body with *encoding*; raises on HTTP errors."""
        async with self.session.get(url) as resp:
            resp.raise_for_status()
            return await resp.text(encoding=encoding)
    async def get_bytes(self, url: str) -> bytes:
        """GET *url* and return the raw body; raises on HTTP errors."""
        async with self.session.get(url) as resp:
            resp.raise_for_status()
            return await resp.read()
class NewsInfo:
    """Parses Sisyphus news digests (packages and bugs) from the mailing list."""

    def __init__(self, client: BaseParser, cybertalk_url: str):
        self.client = client
        self._cybertalk_url = cybertalk_url

    async def news_urls(self) -> models.NewsURL:
        """Discover today's digest URLs from the monthly archive index."""
        return await urls_parser(self.client, self._cybertalk_url)

    async def _get_packages(self, branch: str) -> models.PackagesModel | None:
        """Fetch and parse the packages digest for *branch*, if published."""
        branch_url = getattr(await self.news_urls(), branch, None)
        if not branch_url:
            return None
        # Archive pages are served in KOI8-R.
        page = await self.client.get(branch_url, "koi8-r")
        return await packages_parser(page, branch_url, self.client)

    async def bugs(self) -> models.BugsModel | None:
        """Fetch and parse today's bugs digest, if published."""
        bugs_url = (await self.news_urls()).bugs
        if not bugs_url:
            return None
        page = await self.client.get(bugs_url, "koi8-r")
        return await bugs_parser(page, bugs_url)

    async def sisyphus(self) -> models.PackagesModel | None:
        return await self._get_packages("sisyphus")

    async def p11(self) -> models.PackagesModel | None:
        return await self._get_packages("p11")

    async def p10(self) -> models.PackagesModel | None:
        return await self._get_packages("p10")

    async def bugs_by_range(
        self, date_from: date, date_to: date
    ) -> dict[str, int] | None:
        """Sum bug counts per category over all digests in [date_from, date_to]."""
        digest_urls = await urls_for_range(
            self.client, date_from, date_to, self._cybertalk_url, "bugs"
        )
        if not digest_urls:
            return None
        categories = (
            "quickly_resolved", "new", "old",
            "resolved", "reopened", "random",
        )
        totals = dict.fromkeys(categories, 0)
        for _, digest_url in digest_urls:
            page = await self.client.get(digest_url, "koi8-r")
            parsed = await bugs_parser(page, digest_url)
            if parsed and isinstance(parsed, models.BugsModel):
                for category in categories:
                    entries = getattr(parsed, category, None)
                    if entries:
                        totals[category] += len(entries)
        # All zeros means no digest actually contained bug data.
        if not any(totals.values()):
            return None
        return totals

    async def packages_by_range(
        self, date_from: date, date_to: date
    ) -> models.PackagesModel | None:
        """Merge the package digests over [date_from, date_to] into one model."""
        digest_urls = await urls_for_range(
            self.client, date_from, date_to, self._cybertalk_url
        )
        if not digest_urls:
            return None
        parsed_digests = []
        for _, digest_url in digest_urls:
            page = await self.client.get(digest_url, "koi8-r")
            parsed = await packages_parser(page, digest_url, self.client)
            if parsed and isinstance(parsed, models.PackagesModel):
                parsed_digests.append(parsed)
        if not parsed_digests:
            return None
        # A single digest needs no aggregation.
        if len(parsed_digests) == 1:
            return parsed_digests[0]
        return _aggregate_packages(parsed_digests)
def _aggregate_packages(
    packages_list: list[models.PackagesModel],
) -> models.PackagesModel:
    """Collapse several daily package digests into one net summary.

    Later digests win on name conflicts. A package both added and removed
    within the range cancels out entirely; added + updated is reported as
    "added" with the latest data; removed + updated keeps only the removal.
    The url/total of the last digest are carried over.
    """
    by_name_added: dict = {}
    by_name_removed: dict = {}
    by_name_updated: dict = {}
    for digest in packages_list:
        for pkg in digest.added or []:
            by_name_added[pkg.name] = pkg
        for pkg in digest.removed or []:
            by_name_removed[pkg.name] = pkg
        for pkg in digest.updated or []:
            by_name_updated[pkg.name] = pkg

    # Added + removed within the range -> net no-op, drop from both.
    for name in set(by_name_added) & set(by_name_removed):
        by_name_added.pop(name)
        by_name_removed.pop(name)

    # Added + updated -> stays "added", carrying the latest data;
    # removed + updated -> the removal wins.
    for name in list(by_name_updated):
        if name in by_name_added:
            by_name_added[name] = by_name_updated.pop(name)
        elif name in by_name_removed:
            by_name_updated.pop(name)

    return models.PackagesModel(
        url=packages_list[-1].url,
        total=packages_list[-1].total,
        added=list(by_name_added.values()) or None,
        removed=list(by_name_removed.values()) or None,
        updated=list(by_name_updated.values()) or None,
    )
class PackagesInfo:
    """Package-level reports: FTBFS statistics and per-maintainer watch files."""

    def __init__(self, client: BaseParser, ftbfs_url: str, watch_url: str):
        self.client = client
        self._ftbfs_url = ftbfs_url
        self._watch_url = watch_url

    async def ftbfs(self) -> List[models.FTBFSModel]:
        """Download and parse the joined FTBFS statistics dump."""
        text = await self.client.get(self._ftbfs_url)
        return ftbfs_parser(text)

    async def watch_by_maintainer(
        self,
        maintainer_nickname: str,
        by_acl: Literal["by-acl", "by-expanded-acl", "by-expanded-leader", "by-leader"],
    ) -> List[models.WatchByMaintainerModel]:
        """Return upstream-watch entries for a maintainer; [] when unavailable.

        Best-effort by design: a missing watch file (HTTP 404) or a parse
        failure yields an empty list rather than raising.
        """
        url = self._watch_url.format(by_acl=by_acl, nickname=maintainer_nickname)
        try:
            text = await self.client.get(url)
            return watch_parser(text)
        # Bug fix: the previous bare `except:` also swallowed
        # asyncio.CancelledError / KeyboardInterrupt, which broke task
        # cancellation. Only ordinary errors are treated as "no data".
        except Exception:
            return []
class ALTRepoParser:
    """Entry point for the HTML/text parsers: news digests and package reports."""
    def __init__(self, session: aiohttp.ClientSession, config: "ALTRepoConfig"):
        # One shared HTTP helper is reused by both sub-parsers.
        self._client = BaseParser(session)
        self.news = NewsInfo(self._client, config.cybertalk_url)
        self.packages = PackagesInfo(self._client, config.ftbfs_url, config.watch_url)
from pydantic import BaseModel
from typing import List
class NewsURL(BaseModel):
    """Digest URLs discovered for the current day; missing digests stay None."""
    sisyphus: str | None = None
    p11: str | None = None
    p10: str | None = None
    bugs: str | None = None
class BugsElementModel(BaseModel):
    """A single bug entry from a bugs digest; status is None when the
    digest shows '---'."""
    id: int
    component: str
    priority: str
    status: str | None = None
    summary: str
class BugsModel(BaseModel):
    """Parsed bugs digest, grouped by section; absent sections stay None."""
    url: str
    quickly_resolved: List[BugsElementModel] | None = None
    new: List[BugsElementModel] | None = None
    old: List[BugsElementModel] | None = None
    resolved: List[BugsElementModel] | None = None
    reopened: List[BugsElementModel] | None = None
    random: List[BugsElementModel] | None = None
class RemovedPackageElementModel(BaseModel):
    """A package listed in the REMOVED section of a digest."""
    name: str
    version: str
class PackageElementModel(BaseModel):
    """A package listed in the ADDED or UPDATED section of a digest."""
    name: str
    description: str
    maintainer_name: str
    maintainer_nick: str
class PackagesModel(BaseModel):
    """Parsed packages digest; empty sections stay None."""
    url: str
    total: int
    added: List[PackageElementModel] | None = None
    removed: List[RemovedPackageElementModel] | None = None
    updated: List[PackageElementModel] | None = None
class FTBFSModel(BaseModel):
    """One row of the FTBFS (fails-to-build-from-source) statistics dump."""
    name: str
    version: str
    ftbfs_weeks: int
    maintainers: List[str]
class WatchByMaintainerModel(BaseModel):
    """One row of a per-maintainer upstream watch file."""
    pkg_name: str
    old_version: str
    new_version: str
    url: str
from .urls import urls_parser, urls_for_range
from .packages import packages_parser
from .bugs import bugs_parser
from bs4 import BeautifulSoup
import re
from .. import models
async def bugs_parser(html: str, url: str):
    """Parse a bugs digest HTML page into a BugsModel.

    The digest is a single <pre> block where section headers such as
    "12 NEW bugs" are followed by "#<id> <component> <priority> <status>"
    rows, each with a free-form summary possibly continued on the
    following lines.

    Returns None when the page has no <pre> block (bug fix: previously a
    bare {} was returned, violating the BugsModel | None contract of
    callers; both values are falsy, so existing truthiness checks are
    unaffected).
    """
    soup = BeautifulSoup(html, "html.parser")
    pre_tag = soup.find("pre")
    if not pre_tag:
        return None
    text = pre_tag.get_text()
    lines = text.strip().splitlines()
    data = {}
    section_name = None
    bug_pattern = re.compile(r"#(\d+)\s+([^\t]+)\s+([^\t]+)\s+([^\t]+)")
    description_buffer = ""
    current_bug = None
    for line in lines:
        # Section header: flush the bug collected so far, then switch.
        header_match = re.match(
            r"^\s*(\d+)\s+(NEW|RESOLVED|REOPENED|RANDOM|OLD)\s+bugs?.*", line)
        if header_match:
            if current_bug and section_name:
                current_bug["summary"] = description_buffer.strip()
                data.setdefault(section_name, []).append(current_bug)
                current_bug = None
                description_buffer = ""
            section_name = _get_bug_section_name(line)
            continue
        bug_match = bug_pattern.match(line)
        if bug_match:
            # New bug row: flush the previous one first.
            if current_bug and section_name:
                current_bug["summary"] = description_buffer.strip()
                data.setdefault(section_name, []).append(current_bug)
            status = bug_match.group(4).strip()
            # "---" marks an absent status in the digest.
            if status == "---":
                status = None
            current_bug = {
                "id": int(bug_match.group(1)),
                "component": bug_match.group(2).strip(),
                "priority": bug_match.group(3).strip(),
                "status": status
            }
            description_buffer = ""
        elif current_bug:
            if not line.strip():
                continue
            # The totals footer ends the digest body.
            if re.match(r"^Total\s+\d+\s+pending bugs", line.strip(), re.IGNORECASE):
                break
            description_buffer += line.strip() + " "
    # Flush the last collected bug.
    if current_bug and section_name:
        current_bug["summary"] = description_buffer.strip()
        data.setdefault(section_name, []).append(current_bug)
    data["url"] = url
    return models.BugsModel(**data)
def _get_bug_section_name(line: str) -> str:
line = line.lower()
if "new" in line and "resolved" in line:
return "quickly_resolved"
elif "new" in line:
return "new"
elif "old" in line:
return "old"
elif "resolved" in line:
return "resolved"
elif "reopened" in line:
return "reopened"
elif "random" in line:
return "random"
from bs4 import BeautifulSoup
import re
import io
import gzip
from .. import models
async def packages_parser(html: str, url: str, client=None):
    """Parse a packages digest HTML page into a PackagesModel.

    The digest body is a <pre> block with ADDED/REMOVED/UPDATED sections.
    When the mailing-list archiver has replaced the body with a gzipped
    attachment, the attachment is downloaded through *client* and
    decompressed instead.

    Bug fixes relative to the previous version:
    - a missing <pre> block now returns None (was a bare {}), matching
      the PackagesModel | None contract of callers; both are falsy;
    - the UPDATED section header regex was "packages[s]?", which failed
      to match a singular "1 UPDATED package" header; now "package[s]?"
      consistent with ADDED/REMOVED;
    - the fallback model and the sections dict now carry the required
      "total" field (previously pydantic raised ValidationError when the
      "Total N source packages." footer was absent).
    """
    soup = BeautifulSoup(html, "html.parser")
    pre_tag = soup.find("pre")
    if not pre_tag:
        return None
    pre_text = pre_tag.get_text(strip=True)
    # Archiver notice (Russian: "an attachment was scrubbed") means the
    # real digest is behind a gzipped attachment link.
    if "Было удалено вложение" in pre_text and "attachment" in pre_text:
        attachment_link = pre_tag.find("a", href=True)
        if attachment_link and client:
            attachment_url = attachment_link["href"]
            compressed_data = await client.get_bytes(attachment_url)
            with gzip.GzipFile(fileobj=io.BytesIO(compressed_data)) as gz:
                text = gz.read().decode('utf-8')
        else:
            # No way to fetch the attachment: return an empty digest.
            return models.PackagesModel(
                **{"added": [], "removed": [], "updated": [], "total": 0, "url": "none"}
            )
    else:
        text = pre_tag.get_text()
    lines = text.strip().splitlines()
    # "total" defaults to 0 in case the footer line is missing.
    sections = {"added": [], "removed": [], "updated": [], "total": 0}
    current_section = None
    current_package = {}
    seen_changelog = False
    for line in lines:
        # Section switch headers, e.g. "12 ADDED packages".
        if re.search(r"^\s*\d+\s+ADDED package[s]?", line):
            if current_package and current_section:
                sections[current_section].append(current_package)
            current_section = "added"
            current_package = {}
            seen_changelog = False
            continue
        elif re.search(r"^\s*\d+\s+REMOVED package[s]?", line):
            if current_package and current_section:
                sections[current_section].append(current_package)
            current_section = "removed"
            current_package = {}
            seen_changelog = False
            continue
        elif re.search(r"^\s*\d+\s+UPDATED package[s]?", line):
            if current_package and current_section:
                sections[current_section].append(current_package)
            current_section = "updated"
            current_package = {}
            seen_changelog = False
            continue
        # REMOVED rows are simple "name<TAB>version" pairs.
        if current_section == "removed":
            if line.strip():
                parts = line.strip().split("\t")
                if len(parts) == 2:
                    name, version = parts
                    sections["removed"].append({
                        "name": name.strip(),
                        "version": version.strip()
                    })
            continue
        # "name - description" starts a new ADDED/UPDATED entry.
        match = re.match(r"^(\S+)\s+-\s+(.*)", line)
        if match and not line.startswith("- "):
            if current_package and current_section:
                sections[current_section].append(current_package)
            current_package = {
                "name": match.group(1),
                "description": _clean_description(match.group(2)),
            }
            seen_changelog = False
            continue
        # The first changelog line carries the maintainer's name and nick.
        if (
            current_package
            and current_section != "removed"
            and not seen_changelog
        ):
            match = re.match(r"^\*\s+\w+\s+\w+\s+\d+\s+\d+\s+(.+?) <([^@\s>]+)(?:@altlinux| на altlinux)>", line)
            if match:
                current_package["maintainer_name"] = match.group(1).strip()
                current_package["maintainer_nick"] = match.group(2).strip()
                seen_changelog = True
                continue
        match = re.search(r'^Total (\d+) source packages\.$', line)
        if match:
            sections["total"] = int(match.group(1))
    # Flush the last collected package.
    if current_package and current_section:
        sections[current_section].append(current_package)
    sections["url"] = url
    return models.PackagesModel(**sections)
def _clean_description(desc: str):
desc = desc.strip()
desc = re.sub(r'\s+', ' ', desc)
desc = re.sub(r'\[\d+[KMG]?\]', '', desc).strip()
return desc
from bs4 import BeautifulSoup
import re
from datetime import datetime, date
from .. import models
async def urls_parser(client, cybertalk_url: str):
    """Locate today's digest URLs in the current month's archive index.

    Returns a NewsURL with whichever of sisyphus/bugs/p11/p10 were found.
    """
    now = datetime.now()
    # Archive directories are named "<year>-<MonthName>".
    # NOTE(review): %B is locale-dependent — this assumes an English locale.
    year_month = f"{now.year}-{now:%B}"
    today = f"{now:%Y%m%d}"
    base_url = cybertalk_url.format(year_month)
    # The archive index is served in KOI8-R.
    html = await client.get(f"{base_url}date.html", "koi8-r")
    soup = BeautifulSoup(html, "html.parser")
    found = {"sisyphus": None, "bugs": None, "p11": None}
    for item in soup.find_all("li"):
        anchor = item.find("a")
        if not anchor or not anchor.get("href"):
            continue
        label = anchor.get_text(strip=True)
        link = base_url + anchor["href"]
        if f"Sisyphus-{today} packages" in label:
            found["sisyphus"] = link
        elif f"Sisyphus-{today} bugs" in label:
            found["bugs"] = link
        # Branch digests carry no date in the title, so the linked page's
        # own date line is checked instead.
        elif "p11/branch packages" in label and await _check_date(link, client):
            found["p11"] = link
        elif "p10/branch packages" in label and await _check_date(link, client):
            found["p10"] = link
    return models.NewsURL(**found)
async def urls_for_range(
    client, date_from: date, date_to: date,
    cybertalk_url: str, news_type: str = "packages",
) -> list[tuple[date, str]]:
    """Collect digest URLs of *news_type* dated within [date_from, date_to].

    Walks the monthly archive index pages from date_from's month through
    date_to's month; a month whose index cannot be fetched is skipped.
    Returns (date, url) pairs sorted ascending by date.
    """
    results = []
    pattern = re.compile(rf"Sisyphus-(\d{{8}}) {re.escape(news_type)}")
    current = date_from.replace(day=1)
    while current <= date_to:
        year_month = f"{current.year}-{current.strftime('%B')}"
        base_url = cybertalk_url.format(year_month)
        try:
            html = await client.get(f"{base_url}date.html", "koi8-r")
        # Bug fix: the previous bare `except:` also swallowed
        # asyncio.CancelledError / KeyboardInterrupt; only ordinary
        # fetch failures (e.g. a missing month) skip the month.
        except Exception:
            current = _next_month(current)
            continue
        soup = BeautifulSoup(html, "html.parser")
        for li in soup.find_all("li"):
            a = li.find("a")
            if not a or not a.get("href"):
                continue
            text = a.get_text(strip=True)
            match = pattern.search(text)
            if match:
                news_date = datetime.strptime(match.group(1), "%Y%m%d").date()
                if date_from <= news_date <= date_to:
                    results.append((news_date, base_url + a["href"]))
        current = _next_month(current)
    return sorted(results)
def _next_month(d: date) -> date:
if d.month == 12:
return d.replace(year=d.year + 1, month=1)
return d.replace(month=d.month + 1)
async def _check_date(url, client):
    """Return True if the digest page's <i> date line refers to today."""
    html = await client.get(url, "koi8-r")
    soup = BeautifulSoup(html, "html.parser")
    date_tag = soup.find("i")
    if date_tag is None:
        return False
    tokens = date_tag.text.strip().split()
    # The day of month is expected as the third whitespace token.
    if len(tokens) < 3:
        return False
    try:
        return int(tokens[2]) == datetime.now().day
    except ValueError:
        return False
from .ftbfs import ftbfs_parser
from .watch import watch_parser
from .. import models
def ftbfs_parser(text: str):
    """Parse the tab-separated FTBFS statistics dump into model objects.

    Expected columns: name, version, weeks-failing, comma-separated
    maintainers. Rows without exactly four fields are skipped.
    """
    results = []
    for raw_line in text.strip().splitlines():
        fields = raw_line.split('\t')
        if len(fields) != 4:
            continue
        name, version, weeks, maintainers = fields
        results.append(models.FTBFSModel(
            name=name,
            version=version,
            ftbfs_weeks=int(weeks),
            maintainers=maintainers.split(','),
        ))
    return results
from .. import models
def watch_parser(text: str):
    """Parse a per-maintainer upstream watch file.

    Expected tab-separated columns: package name, current version,
    upstream version, url. Malformed rows are skipped.
    """
    entries = []
    for line in text.strip().splitlines():
        columns = line.split("\t")
        if len(columns) != 4:
            continue
        pkg_name, old_version, new_version, url = columns
        entries.append(models.WatchByMaintainerModel(
            pkg_name=pkg_name,
            old_version=old_version,
            new_version=new_version,
            url=url,
        ))
    return entries
[project]
name = "altrepo"
version = "0.1.0"
description = "Async client for ALT Linux repository services (rdb API, packages parser, AppStream)"
authors = [
{name = "Kirill Unitsaev",email = "fiersik@altlinux.org"}
]
license = {text = "AGPL-3.0-or-later"}
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
"aiohttp>=3.9.0",
"pydantic>=2.0.0",
"beautifulsoup4>=4.12.0",
"lxml>=5.0.0",
]
[tool.poetry]
packages = [{include = "altrepo"}]
[build-system]
requires = ["poetry-core>=2.0.0,<3.0.0"]
build-backend = "poetry.core.masonry.api"
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment