base: add roles system

parent 3f2f4a62
from . import admin as admin_keyboards
from . import start as start_keyboards
from . import menu as menu_keyboards
from . import profile as profile_keyboards
......
from telegrinder import InlineKeyboard, InlineButton
set_rule_kb = (
InlineKeyboard()
.add(InlineButton("Продолжить", callback_data="command/menu"))
).get_markup()
from telegrinder import Button, Keyboard
menu_kb = (
Keyboard()
.add(Button("Профиль"))
.row()
.add(Button("Отслеживание"))
.add(Button("Ошибки"))
.row()
.add(Button("Статистика"))
).get_markup()
from database.func import DB
def menu_kb(user_id: int):
user_rules = DB.user.get_roles(user_id)
kb = Keyboard()
kb.add(Button("Профиль"))
kb.row()
kb.add(Button("Отслеживание"))
kb.add(Button("Ошибки"))
kb.row()
if "news" in user_rules:
kb.add(Button("Новости"))
kb.row()
kb.add(Button("Статистика"))
return kb.get_markup()
import json
from .models import User
......@@ -17,6 +18,20 @@ class UserMethod:
user.save()
return True
return False
@classmethod
def add_role(cls, user_id: int, role: str):
"""добавление ролей пользователя"""
user = cls.get(user_id)
roles = cls.get_roles(user_id)
if user is None:
return False
if role not in roles:
roles.append(role)
user.roles = json.dumps(roles)
user.save()
return True
return False
@classmethod
def get(cls, user_id: int) -> User | None:
......@@ -28,6 +43,14 @@ class UserMethod:
def get_all(cls):
"""Получение записей всех пользователей"""
return [user for user in User.select()]
@classmethod
def get_roles(cls, user_id: int):
"""получение ролей пользователя"""
user = cls.get(user_id)
if user is None:
return []
return json.loads(user.roles)
@classmethod
def delete(cls, user_id: int):
......@@ -37,6 +60,20 @@ class UserMethod:
return False
user.delete_instance()
return True
@classmethod
def remove_role(cls, user_id: int, role: str):
"""удаление ролей пользователя"""
user = cls.get(user_id)
roles = cls.get_roles(user_id)
if user is None:
return False
if role in roles:
roles.remove(role)
user.roles = json.dumps(roles)
user.save()
return True
return False
@classmethod
def change_maintainer(cls, user_id: int, maintainer: str):
......
......@@ -26,6 +26,7 @@ class User(BaseModel):
default_branch = TextField( # Репозитоий по умолчанию
default="sisyphus"
)
roles = TextField(default="[]")
class Meta:
table_name = "users"
from telegrinder import Dispatch, Message
from telegrinder.rules import Command, Argument
from altrepo import altrepo
from data.keyboards import admin_keyboards
from database.func import DB
from rules import BotAdmin
from config import tg_api
dp = Dispatch()
dp.message.auto_rules = BotAdmin()
@dp.message(Command("altrepo_users"))
async def alrtrepo_users(m: Message):
users_data = DB.user.get_all()
user_message = ""
for user in users_data:
user_message += f"{user.user_id} | {user.maintainer} | {user.default_branch}\n"
await m.answer(user_message)
@dp.message(Command("altrepo_user", Argument("user_id")))
async def alrtrepo_users(m: Message, user_id: int):
if not user_id:
return
db_user = DB.user.get(user_id)
user_roles = DB.user.get_roles(user_id)
user = (await tg_api.get_chat(chat_id=user_id)).unwrap()
username = user.username.unwrap_or_none()
username = f"(@{username})\n" if username else "\n"
await m.answer(
f"{user.first_name.unwrap()} {user.last_name.unwrap()} {username}"
f"{user.first_name.unwrap_or("")} {user.last_name.unwrap_or("")} {username}"
f" Сопровождающий: {db_user.maintainer}\n"
f" Репозиторий: {db_user.default_branch}\n"
f" Роли: {", ".join(user_roles) or "пользователь"}\n"
)
@dp.message(
Command(
"altrepo_add_user",
Argument("user_id"),
Argument("maintainer"),
Argument("branch", optional=True),
)
)
async def alrtrepo_users(
m: Message, user_id: int, maintainer: str, branch: str | None = None
):
if not user_id or not maintainer:
return
if not branch:
branch = "sisyphus"
if DB.user.add(user_id, maintainer, branch):
await m.answer("Пользователь добавлен")
@dp.message(Command("altrepo_add_role", Argument("user_id"), Argument("role")))
async def alrtrepo_users(m: Message, user_id: int, role: str):
if not user_id or not role:
return
if DB.user.add_role(user_id, role):
await m.answer("Роль добавлена")
await tg_api.send_message(
chat_id=user_id,
text=f"Вам была выдана роль {role}",
reply_markup=admin_keyboards.set_rule_kb,
)
@dp.message(Command("altrepo_remove_role", Argument("user_id"), Argument("role")))
async def alrtrepo_users(m: Message, user_id: int, role: str):
if not user_id or not role:
return
print(1)
if DB.user.remove_role(user_id, role):
print(2)
await m.answer("Роль удалена")
await tg_api.send_message(
chat_id=user_id,
text=f"C вас была снята роль {role}",
reply_markup=admin_keyboards.set_rule_kb,
)
......@@ -22,6 +22,8 @@ async def profile_handler(m: Message, user: User | None) -> None:
maintainer_data = await altrepo.api.site.maintainer_info(user.default_branch, user.maintainer)
maintainer = maintainer_data.information
roles = DB.user.get_roles(m.from_user.id)
bugs_data = await altrepo.api.bug.bugzilla_by_maintainer(user.maintainer)
unresolved_bugs = [bug for bug in bugs_data.bugs if bug.status not in ["RESOLVED", "CLOSED"]]
......@@ -31,7 +33,8 @@ async def profile_handler(m: Message, user: User | None) -> None:
f"Исходные пакеты: {maintainer.count_source_pkg}\n"
"\n"
f"Всего багов: {bugs_data.length}\n"
f"Открытых багов: {len(unresolved_bugs)}\n",
f"Открытых багов: {len(unresolved_bugs)}\n\n"
f"Роли в боте: {", ".join(roles) or "пользователь"}",
reply_markup=profile_keyboards.profile_kb
)
......@@ -99,8 +102,8 @@ async def callback_confirm_handler(cb: CallbackQuery, branch: str) -> None:
@dp.callback_query(PayloadEqRule("command/menu"))
async def menu_handler(cb: CallbackQuery):
send_menu(cb)
await send_menu(cb=cb)
@dp.message(Command(["menu", "меню"]) | Text(["меню", "menu"]), IsPrivate())
async def menu_handler(m: Message):
await send_menu(m)
await send_menu(m=m)
......@@ -9,7 +9,7 @@ from altrepo import altrepo
from services.menu import send_menu
from config import DEFAUIL_BRANCHES
from config import config, DEFAUIL_BRANCHES
dp = Dispatch()
wm = WaiterMachine(dp)
......@@ -66,4 +66,8 @@ async def start_handler(m: Message, user: User | None) -> None:
maintainer,
branch
)
await send_menu(m)
if m.from_user.id in config.admins:
DB.user.add_role(m.from_user.id, "admin")
await send_menu(m=m)
from telegrinder import Message
from telegrinder.bot.rules.abc import ABCRule
from database.func import DB
from config import config
class BotAdmin(ABCRule):
async def check(self, m: Message) -> bool:
return m.from_user.id in config.admins
user_rules = DB.user.get_roles(m.from_user.id)
return m.from_user.id in config.admins or "admin" in user_rules
from telegrinder import Message
from telegrinder import Message, CallbackQuery
from data.keyboards import menu_keyboards
async def send_menu(m: Message) -> None:
await m.answer(
"menu:",
reply_markup=menu_keyboards.menu_kb
)
\ No newline at end of file
async def send_menu(m: Message | None= None, cb: CallbackQuery | None = None) -> None:
if m:
await m.answer(
"menu:",
reply_markup=menu_keyboards.menu_kb(m.from_user.id)
)
elif cb:
await cb.ctx_api.send_message(
chat_id=cb.chat_id.unwrap(),
text="menu:",
reply_markup=menu_keyboards.menu_kb(cb.from_user.id)
)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment