taskoteka: add client for git.altlinux.org/tasks/api

parent 55826f6d
...@@ -9,6 +9,7 @@ ...@@ -9,6 +9,7 @@
- **Watch** — отслеживание устаревших пакетов через [watch.altlinux.org](https://watch.altlinux.org), как по отдельному мейнтейнеру, так и по всему репозиторию целиком - **Watch** — отслеживание устаревших пакетов через [watch.altlinux.org](https://watch.altlinux.org), как по отдельному мейнтейнеру, так и по всему репозиторию целиком
- **FTBFS** — список пакетов с ошибками пересборки - **FTBFS** — список пакетов с ошибками пересборки
- **AppStream** — загрузка, кеширование и поиск по AppStream-метаданным пакетов - **AppStream** — загрузка, кеширование и поиск по AppStream-метаданным пакетов
- **Taskoteka** — клиент для [git.altlinux.org/tasks/api](https://git.altlinux.org/tasks/api): быстрый доступ к задачам girar, фильтрация, полнотекстовый поиск и подписка на изменения через SSE
## Установка ## Установка
...@@ -109,6 +110,39 @@ news = await client.parser.news.packages_by_range( ...@@ -109,6 +110,39 @@ news = await client.parser.news.packages_by_range(
) )
``` ```
### Taskoteka — задачи girar
Быстрый доступ к данным о задачах через [Taskoteka API](https://git.altlinux.org/tasks/api), без необходимости SSH-доступа:
```python
# Список активных задач
tasks = await client.taskoteka.tasks()
for t in tasks[:5]:
print(f" #{t.id} [{t.state}] {t.repo} {t.owner}")
# Фильтрация по состоянию, репозиторию, пользователю
tasks = await client.taskoteka.tasks(state="EPERM", repo="p11")
tasks = await client.taskoteka.tasks(user="rider")
# Полнотекстовый поиск
tasks = await client.taskoteka.tasks(q="rider EPERM php")
for t in tasks:
print(f" #{t.id} matches={t.matches}")
# Только ID задач
ids = await client.taskoteka.tasks(state="BUILDING", brief=True)
# Полная информация о задаче
task = await client.taskoteka.tasks(task_id=389466)
print(f"#{task.taskid} [{task.state}] {task.owner}: {task.message}")
for num, sub in task.subtasks.items():
print(f" {num}: {sub.pkgname} ({sub.type})")
# Подписка на изменения в реальном времени (SSE)
async for event in client.taskoteka.events():
print(f"{event.event}: задача #{event.data.id} {event.data.state}")
```
### FTBFS — ошибки пересборки ### FTBFS — ошибки пересборки
```python ```python
......
...@@ -4,6 +4,7 @@ from .api import ALTRepoAPI ...@@ -4,6 +4,7 @@ from .api import ALTRepoAPI
from .appstream import ALTRepoAppStream from .appstream import ALTRepoAppStream
from .config import ALTRepoConfig from .config import ALTRepoConfig
from .parser import ALTRepoParser from .parser import ALTRepoParser
from .taskoteka import ALTRepoTaskoteka
class ALTRepo: class ALTRepo:
...@@ -14,6 +15,7 @@ class ALTRepo: ...@@ -14,6 +15,7 @@ class ALTRepo:
self.api: ALTRepoAPI | None = None self.api: ALTRepoAPI | None = None
self.appstream: ALTRepoAppStream | None = None self.appstream: ALTRepoAppStream | None = None
self.parser: ALTRepoParser | None = None self.parser: ALTRepoParser | None = None
self.taskoteka: ALTRepoTaskoteka | None = None
async def init(self, session: aiohttp.ClientSession | None = None): async def init(self, session: aiohttp.ClientSession | None = None):
if self._session is None: if self._session is None:
...@@ -21,6 +23,7 @@ class ALTRepo: ...@@ -21,6 +23,7 @@ class ALTRepo:
self.api = ALTRepoAPI(self._session, self.config) self.api = ALTRepoAPI(self._session, self.config)
self.appstream = ALTRepoAppStream(self._session, self.config) self.appstream = ALTRepoAppStream(self._session, self.config)
self.parser = ALTRepoParser(self._session, self.config) self.parser = ALTRepoParser(self._session, self.config)
self.taskoteka = ALTRepoTaskoteka(self._session, self.config)
async def close(self): async def close(self):
await self._session.close() await self._session.close()
......
...@@ -14,3 +14,4 @@ class ALTRepoConfig: ...@@ -14,3 +14,4 @@ class ALTRepoConfig:
) )
appstream_branches: list[str] = field(default_factory=lambda: ["sisyphus", "p11"]) appstream_branches: list[str] = field(default_factory=lambda: ["sisyphus", "p11"])
appstream_dir: str = "appstream" appstream_dir: str = "appstream"
taskoteka_base_url: str = "https://git.altlinux.org/tasks/api"
from .methods import ALTRepoTaskoteka
import asyncio
import json
import ssl
import aiohttp
from .models import (
SSEEventModel,
HeartbeatEventModel,
TaskNewEventModel,
TaskUpdatedEventModel,
TaskRemovedEventModel,
ReconnectEventModel,
ReconnectDataModel,
UnknownEventModel,
TaskEventDataModel,
)
_TASK_EVENT_MODELS = {
"task_new": TaskNewEventModel,
"task_updated": TaskUpdatedEventModel,
"task_removed": TaskRemovedEventModel,
}
class EventStream:
def __init__(
self,
base_url: str,
reconnect_delay: float = 3.0,
max_reconnect_delay: float = 60.0,
):
self._url = f"{base_url}/events"
self._reconnect_delay = reconnect_delay
self._max_reconnect_delay = max_reconnect_delay
self._last_event_id: str | None = None
async def stream(self):
sslctx = ssl.create_default_context()
sslctx.set_alpn_protocols(["http/1.1"])
connector = aiohttp.TCPConnector(ssl=sslctx)
session = aiohttp.ClientSession(connector=connector)
delay = self._reconnect_delay
try:
while True:
try:
headers = {}
if self._last_event_id:
headers["Last-Event-ID"] = self._last_event_id
async with session.get(self._url, headers=headers) as resp:
resp.raise_for_status()
delay = self._reconnect_delay
async for event in self._parse_stream(resp):
if event.event == "reconnect":
break
if event.event == "heartbeat":
continue
yield event
except (aiohttp.ClientError, asyncio.TimeoutError):
await asyncio.sleep(delay)
delay = min(delay * 2, self._max_reconnect_delay)
finally:
await session.close()
async def _parse_stream(self, resp: aiohttp.ClientResponse):
event_type = ""
data_lines = []
event_id = ""
buf = ""
async for chunk in resp.content.iter_any():
buf += chunk.decode("utf-8")
while "\n" in buf:
raw_line, buf = buf.split("\n", 1)
line = raw_line.rstrip("\r")
if line == "":
if data_lines:
data_str = "\n".join(data_lines)
try:
data = json.loads(data_str)
except json.JSONDecodeError:
data = None
if event_id:
self._last_event_id = event_id
yield _build_event(event_type or "message", data)
event_type = ""
data_lines = []
event_id = ""
elif line.startswith("event:"):
event_type = line[6:].strip()
elif line.startswith("data:"):
data_lines.append(line[5:].strip())
elif line.startswith("id:"):
event_id = line[3:].strip()
elif line.startswith("retry:"):
try:
self._reconnect_delay = int(line[6:].strip()) / 1000
except ValueError:
pass
def _build_event(event_type: str, data) -> SSEEventModel:
if event_type == "heartbeat":
return HeartbeatEventModel(event=event_type, data=data)
if event_type == "reconnect":
return ReconnectEventModel(event=event_type, data=ReconnectDataModel(**data))
if event_type in _TASK_EVENT_MODELS:
cls = _TASK_EVENT_MODELS[event_type]
return cls(event=event_type, data=TaskEventDataModel(**data))
return UnknownEventModel(event=event_type, data=data)
import aiohttp
from typing import Literal, overload
from ..api.methods import BaseAPI
from . import models
from .events import EventStream
class ALTRepoTaskoteka:
def __init__(self, session: aiohttp.ClientSession, config: "ALTRepoConfig"):
self._client = BaseAPI(session, config.taskoteka_base_url)
self._events = EventStream(config.taskoteka_base_url)
async def health(self) -> models.HealthModel:
data = await self._client.get("/health")
return models.HealthModel(**data)
async def owners(self) -> list[str]:
return await self._client.get("/owners")
@overload
async def tasks(
self, *, task_id: int, brief: Literal[True],
) -> int: ...
@overload
async def tasks(
self, *, task_id: int, brief: Literal[False] = ...,
) -> models.TaskModel: ...
@overload
async def tasks(
self, *,
state: str | None = ...,
repo: str | None = ...,
user: str | None = ...,
needs_approval: Literal["maint", "tester"] | None = ...,
q: str | None = ...,
brief: Literal[True],
) -> list[int]: ...
@overload
async def tasks(
self, *,
state: str | None = ...,
repo: str | None = ...,
user: str | None = ...,
needs_approval: Literal["maint", "tester"] | None = ...,
q: str | None = ...,
brief: Literal[False] = ...,
) -> list[models.TaskSummaryModel]: ...
async def tasks(
self,
task_id: int | None = None,
state: str | None = None,
repo: str | None = None,
user: str | None = None,
needs_approval: Literal["maint", "tester"] | None = None,
q: str | None = None,
brief: bool = False,
) -> list[models.TaskSummaryModel] | list[int] | models.TaskModel | int:
if task_id is not None:
params = {"brief": "true"} if brief else None
data = await self._client.get(f"/tasks/{task_id}", params)
if brief:
return data
return models.TaskModel(**data)
params = {
k: v
for k, v in {
"state": state,
"repo": repo,
"user": user,
"needs_approval": needs_approval,
"q": q,
"brief": "true" if brief else None,
}.items()
if v is not None
}
data = await self._client.get("/tasks", params or None)
if brief:
return data
return [models.TaskSummaryModel(**item) for item in data]
def events(self):
return self._events.stream()
from pydantic import BaseModel, Field
from typing import Any, Literal
class HealthModel(BaseModel):
status: str
total_tasks: int
active_tasks: int
class ApprovalCommentModel(BaseModel):
date: str
text: str
class SubtaskSummaryModel(BaseModel):
dir: str | None = None
tag_name: str | None = None
type: str | None = None
userid: str | None = None
pkgname: str | None = None
class SubtaskModel(BaseModel):
dir: str | None = None
tag_name: str | None = None
tag_id: str | None = None
tag_author: str | None = None
spec_type: str | None = None
fetched: str | None = None
rebuild_from: str | None = None
type: str | None = None
srpm: str | None = None
copy_repo: str | None = None
pkgname: str | None = None
approved_by: list[str] = Field(default_factory=list)
disapproved_by: list[str] = Field(default_factory=list)
approval_comments: dict[str, ApprovalCommentModel] | None = None
disapproval_comments: dict[str, ApprovalCommentModel] | None = None
relax_git_inheritance_check_for_commit: str | None = None
relax_lastchange_inheritance_check_for_evr: str | None = None
userid: str | None = None
epoch: str | None = None
version: str | None = None
release: str | None = None
build: dict[str, str] = Field(default_factory=dict)
class TaskSummaryModel(BaseModel):
model_config = {"populate_by_name": True}
id: int
state: str
repo: str
owner: str
build_time: str | None = None
age: int
created: str | None = None
test_only: bool
fail_early: bool
try_: int = Field(alias="try")
iter: int
message: str | None = None
subtasks: dict[str, SubtaskSummaryModel] = Field(default_factory=dict)
matches: list[str] | None = None
class TaskModel(BaseModel):
model_config = {"populate_by_name": True}
updated: int
taskid: int
shared: bool
fail_early: bool
test_only: bool
swift: bool | None = None
aborted_by: str | None = None
repo: str
state: str
try_: int = Field(alias="try")
iter: int
depends: list[int] = Field(default_factory=list)
age: int
created: str | None = None
build_time: str | None = None
message: str | None = None
owner: str
subtasks: dict[str, SubtaskModel] = Field(default_factory=dict)
class TaskEventDataModel(BaseModel):
id: int
state: str | None = None
prev_state: str | None = None
owner: str | None = None
class ReconnectDataModel(BaseModel):
reason: str
class HeartbeatEventModel(BaseModel):
event: Literal["heartbeat"]
data: int
class TaskNewEventModel(BaseModel):
event: Literal["task_new"]
data: TaskEventDataModel
class TaskUpdatedEventModel(BaseModel):
event: Literal["task_updated"]
data: TaskEventDataModel
class TaskRemovedEventModel(BaseModel):
event: Literal["task_removed"]
data: TaskEventDataModel
class ReconnectEventModel(BaseModel):
event: Literal["reconnect"]
data: ReconnectDataModel
class UnknownEventModel(BaseModel):
event: str
data: Any = None
SSEEventModel = (
HeartbeatEventModel
| TaskNewEventModel
| TaskUpdatedEventModel
| TaskRemovedEventModel
| ReconnectEventModel
| UnknownEventModel
)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment