Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
T
telegram_bot_antispammer
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Registry
Registry
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Иванова Мария Кирилловна
telegram_bot_antispammer
Commits
81ea7ce2
Commit
81ea7ce2
authored
2 months ago
by
Иванова Мария Кирилловна
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Upload New File
parent
ac17fece
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
326 additions
and
0 deletions
+326
-0
bot.py
bot.py
+326
-0
No files found.
bot.py
0 → 100644
View file @
81ea7ce2
import
asyncio
from
aiogram
import
Bot
,
Dispatcher
,
Router
from
aiogram.types
import
ChatMemberUpdated
,
Message
from
aiogram.exceptions
import
AiogramError
from
pymorphy2
import
MorphAnalyzer
import
random
import
time
import
emoji
import
re
import
os
from
dotenv
import
load_dotenv
import
unicodedata
# Загрузка переменных из файла .env
load_dotenv
()
# Получение значения токена
bot_token
=
os
.
getenv
(
"BOT_TOKEN"
)
# Инициализация бота и диспетчера
bot
=
Bot
(
token
=
bot_token
)
dp
=
Dispatcher
()
router
=
Router
()
dp
.
include_router
(
router
)
# Для хранения информации о новых участниках
active_users
=
{}
# Список спамных слов и фраз
SPAM_KEYWORDS
=
[
# Коммерческий спам
"скидка"
,
"акция"
,
"бонус"
,
"бесплатно"
,
"купить"
,
"продать"
,
"продажа"
,
"дешево"
,
"доставка"
,
"магазин"
,
"реклама"
,
"распродажа"
,
"экономия"
,
"дешевле"
,
"недорого"
,
"спецпредложение"
,
"уникальное предложение"
,
"только сегодня"
,
# Работа
"удалённая работа"
,
"бакс"
,
"доллар"
,
"$"
,
"нужен человек"
,
"писать в лс"
,
# Мошенничество
"лотерея"
,
"быстрый выигрыш"
,
"выигрыш"
,
"приз"
,
"деньги"
,
"заработать"
,
"заработок"
,
"зп"
,
"подработка"
,
"легкий заработок"
,
"мгновенно"
,
"доход"
,
"доход онлайн"
,
"как заработать"
,
"схема заработка"
,
"пассивный доход"
,
"быстро заработать"
,
"богатство"
,
"инвестировать"
,
"инвестиция"
,
"биткоин"
,
"криптовалюта"
,
"forex"
,
"деньги на карта"
,
"дивиденд"
,
"процент"
,
"депозит"
,
"выплата"
,
"невероятный доход"
,
# Продажа услуг и мошеннические предложения
"гадалка"
,
"приворот"
,
"заговор"
,
"магия"
,
"ритуал"
,
"расклад таро"
,
"ясновидение"
,
"экстрасенс"
,
"предсказание"
,
"любовь"
,
"вернуть парня"
,
"снять порчу"
,
"порча"
,
# Фишинг и подозрительные ссылки
"кликнуть сюда"
,
"нажать сюда"
,
"писать сюда"
,
"переходить здесь"
,
"переходить по ссылка"
,
"перейти по ссылка"
,
"узнать больше"
,
"узнать тут"
,
"суперпредложение"
,
"уникальный"
,
"карта"
,
# Иностранный спам
"discount"
,
"free"
,
"sale"
,
"limited offer"
,
"bitcoin"
,
"crypto"
,
"click here"
,
"earn money"
,
"adult content"
,
"sex"
,
"xxx"
,
"viagra"
,
"pills"
,
"casino"
,
"porn"
,
"win"
,
"lottery"
,
"low price"
,
"buy now"
,
"sale today"
,
"shipping free"
,
# Продвижение подписок, лайков и сервисов
"подписка"
,
"раскрутка"
,
"продвижение"
,
"лайк"
,
"подписчик"
,
"просмотр"
,
"отзыв"
,
"рейтинг"
,
"рейтинг магазин"
,
"работа"
,
"вакансия"
,
"раскрутка аккаунт"
,
"реферальная ссылка"
,
"реферал"
,
# Политический спам
"митинг"
,
"партия"
,
"агитация"
,
"голосовать"
,
"выборы"
,
"кандидат"
,
"поддержка"
,
"голосование"
,
"поддерживать мы"
,
# Вредоносные URL
".tinyurl"
,
".bitly"
,
".ly"
,
".click"
,
".top"
,
".app"
,
".info"
,
".xyz"
,
".vip"
,
".tk"
,
".pw"
,
".cc"
,
".link"
,
# Общее (агрессивные призывы, массовая рассылка)
"добрый время сутки"
,
"поделиться"
,
"отправить"
,
"рассказать все"
,
"срочно"
,
"обязательно прочитать"
,
"пересылать"
,
"передать друг"
,
# Контент, связанный с азартными играми
"казино"
,
"ставка"
,
"азартная игра"
,
"рулетка"
,
"выигрыш в казино"
,
"выигрывать"
,
"промокод"
,
"слоты"
,
"подарок"
,
"забрать подарок"
,
"получить подарок"
,
"казино онлайн"
,
"бесплатные фишки"
,
"ставки на спорт"
,
"играть"
,
"выигрывать"
,
"фриспин"
,
"получить выигрыш"
,
"casino"
,
# Эмодзи, часто используемые в спаме
"🎁"
,
"🔥"
,
"💸"
,
"💰"
,
"💎"
,
"🤑"
,
"🤩"
,
"⚡"
,
"⭐"
,
"💥"
]
def
has_mixed_layout
(
text
):
"""
Проверяет, содержит ли текст слова, написанные на смешанной раскладке.
Например: "пр1в3т", "teст", "нeт".
"""
cyrillic_letters
=
"абвгдеёжзийклмнопрстуфхцчшщъыьэюя"
latin_letters
=
"abcdefghijklmnopqrstuvwxyz"
words
=
text
.
split
()
for
word
in
words
:
has_cyrillic
=
any
(
char
in
cyrillic_letters
for
char
in
word
.
lower
())
has_latin
=
any
(
char
in
latin_letters
for
char
in
word
.
lower
())
if
has_cyrillic
and
has_latin
:
return
True
# Найдено слово со смешанной раскладкой
return
False
morph
=
MorphAnalyzer
()
def
normalize_text_to_infinitive
(
text
):
"""
Приводит текст к нижнему регистру, нормализует слова до начальной формы,
удаляет лишние символы и обрабатывает смешанные раскладки.
"""
if
not
text
:
# Проверяем, что текст не None или пустой
return
[]
# Приведение текста к единой форме Unicode (NFKC)
text
=
unicodedata
.
normalize
(
"NFKC"
,
text
)
text
=
re
.
sub
(
r'(\w)\.(\w)'
,
r'\1 \2'
,
text
)
# Точка внутри слова
text
=
''
.
join
(
ch
for
ch
in
text
if
ch
.
isalnum
()
or
ch
.
isspace
()
or
emoji
.
is_emoji
(
ch
))
text
=
re
.
sub
(
r'([.,!?])'
,
r' \1 '
,
text
)
# Пробел вокруг пунктуации
# Убираем лишние пробелы
text
=
re
.
sub
(
r'\s+'
,
' '
,
text
)
.
strip
()
# Приведение к нижнему регистру
words
=
text
.
lower
()
.
split
()
# Нормализация слов до начальной формы
normalized_words
=
[]
for
word
in
words
:
try
:
normalized_words
.
append
(
morph
.
parse
(
word
)[
0
]
.
normal_form
)
except
Exception
as
e
:
# Если слово не удалось обработать, добавляем его как есть
normalized_words
.
append
(
word
)
return
normalized_words
def
preprocess_spam_keywords
(
keywords
):
"""Приводит ключевые слова к нормализованной форме."""
return
set
(
' '
.
join
(
normalize_text_to_infinitive
(
keyword
))
for
keyword
in
keywords
)
# Нормализуем ключевые слова
SPAM_KEYWORDS
=
preprocess_spam_keywords
(
SPAM_KEYWORDS
)
def
extract_emojis
(
text
):
return
text
.
join
(
c
for
c
in
text
if
unicodedata
.
category
(
c
)
.
startswith
(
'So'
))
def
is_spam
(
text
):
"""Проверяет, содержит ли текст спам."""
# Нормализуем текст
normalized_words
=
normalize_text_to_infinitive
(
text
)
emojis
=
extract_emojis
(
text
)
emoji_count
=
len
(
emojis
)
normalized_words
+=
emojis
# Объединяем нормализованные слова в выражения длиной 1, 2 и 3
phrases
=
set
()
for
n
in
range
(
1
,
4
):
phrases
.
update
(
" "
.
join
(
normalized_words
[
i
:
i
+
n
])
for
i
in
range
(
len
(
normalized_words
)
-
n
+
1
))
# Подсчитываем количество совпадений с ключевыми словами/выражениями
spam_count
=
sum
(
keyword
in
phrases
for
keyword
in
SPAM_KEYWORDS
)
spam_count
=
spam_count
+
emoji_count
# Возвращаем True, если найдено два или более совпадений
return
spam_count
>=
2
def
generate_math_problem
():
"""Генерация простого математического примера."""
a
=
random
.
randint
(
1
,
10
)
b
=
random
.
randint
(
1
,
10
)
return
f
"{a} + {b}"
,
a
+
b
@router.chat_member
()
async
def
welcome_new_user
(
event
:
ChatMemberUpdated
):
"""Обработчик новых участников группы."""
if
event
.
new_chat_member
.
status
==
"member"
and
event
.
old_chat_member
.
status
not
in
[
"member"
,
"administrator"
,
"creator"
]:
new_user
=
event
.
new_chat_member
.
user
math_problem
,
correct_answer
=
generate_math_problem
()
try
:
# Отправляем приветственное сообщение
welcome_message
=
await
bot
.
send_message
(
chat_id
=
event
.
chat
.
id
,
text
=
f
"Здравствуйте, {new_user.first_name}! Добро пожаловать в нашу группу!
\n\n
"
f
"Нам необходимо удостовериться, что вы человек. Вот математический пример: {math_problem}
\n
"
f
"Решите его, пожалуйста, в течение 1 минуты! В качестве ответа отправьте одно число."
)
# Сохраняем данные о пользователе
active_users
[
new_user
.
id
]
=
{
"message_ids"
:
[
welcome_message
.
message_id
],
# Список для хранения всех сообщений бота
"correct_answer"
:
correct_answer
,
"timestamp"
:
time
.
time
(),
"user_messages"
:
[],
# Сообщения пользователя
"user_answered"
:
False
,
"chat_id"
:
event
.
chat
.
id
,
}
print
(
f
"Приветственное сообщение отправлено пользователю {new_user.id} с примером {math_problem}"
)
except
Exception
as
e
:
print
(
f
"Не удалось отправить сообщение в группу {event.chat.id}. Ошибка: {e}"
)
# Ожидание ответа пользователя
await
check_answer
(
new_user
.
id
)
async
def
check_answer
(
user_id
):
"""Проверка ответа пользователя через 1 минуту."""
await
asyncio
.
sleep
(
60
)
user_data
=
active_users
.
get
(
user_id
)
if
user_data
and
not
user_data
[
"user_answered"
]:
try
:
# Удаляем все сообщения
await
cleanup_messages
(
user_data
)
# Удаляем пользователя из группы
await
bot
.
ban_chat_member
(
chat_id
=
user_data
[
"chat_id"
],
user_id
=
user_id
)
print
(
f
"Пользователь {user_id} не ответил вовремя. Удален из группы."
)
except
AiogramError
as
e
:
print
(
f
"Ошибка при удалении пользователя {user_id}. Ошибка: {e}"
)
except
Exception
as
e
:
print
(
f
"Ошибка при удалении пользователя {user_id}. Ошибка: {e}"
)
async
def
cleanup_messages
(
user_data
):
"""Удаление всех сообщений пользователя и бота."""
try
:
# Удаляем все сообщения бота
for
message_id
in
user_data
[
"message_ids"
]:
try
:
print
(
f
"Удаляю сообщение бота: chat_id={user_data['chat_id']}, message_id={message_id}"
)
await
bot
.
delete_message
(
chat_id
=
user_data
[
"chat_id"
],
message_id
=
message_id
)
print
(
f
"Сообщение бота для пользователя {user_data['chat_id']} удалено."
)
except
Exception
as
e
:
print
(
f
"Ошибка при удалении сообщения бота. Ошибка: {e}"
)
# Удаляем сообщения пользователя
for
user_message
in
user_data
[
"user_messages"
]:
try
:
if
user_message
:
await
bot
.
delete_message
(
chat_id
=
user_data
[
"chat_id"
],
message_id
=
user_message
.
message_id
)
print
(
f
"Сообщение пользователя {user_message.message_id} удалено."
)
except
Exception
as
e
:
print
(
f
"Ошибка при удалении сообщения пользователя. Ошибка: {e}"
)
except
Exception
as
e
:
print
(
f
"Ошибка при удалении сообщений. Ошибка: {e}"
)
@router.message
()
async
def
handle_message
(
message
:
Message
):
"""Обработчик сообщений от пользователей для проверки ответа."""
text
=
message
.
text
or
message
.
caption
or
""
# Проверка на смешанную раскладку
if
text
and
has_mixed_layout
(
text
):
try
:
await
message
.
delete
()
print
(
f
"Удалено сообщение с смешанной раскладкой от пользователя {message.from_user.id}: {message.text}"
)
return
except
Exception
as
e
:
print
(
f
"Ошибка при удалении сообщения с смешанной раскладкой. Ошибка: {e}"
)
# Проверка на мультимедиа (фото/видео)
if
message
.
photo
or
message
.
video
or
message
.
document
:
try
:
if
is_spam
(
text
):
# Проверяем подпись к мультимедиа
await
message
.
delete
()
print
(
f
"Удалено спамное мультимедиа сообщение от пользователя {message.from_user.id}."
)
return
except
Exception
as
e
:
print
(
f
"Ошибка при удалении мультимедиа сообщения. Ошибка: {e}"
)
# Проверка на кнопки/ссылки
if
message
.
reply_markup
and
hasattr
(
message
.
reply_markup
,
'inline_keyboard'
):
for
row
in
message
.
reply_markup
.
inline_keyboard
:
for
button
in
row
:
if
button
.
url
and
any
(
spam_word
in
button
.
url
.
lower
()
for
spam_word
in
SPAM_KEYWORDS
):
try
:
await
message
.
delete
()
print
(
f
"Удалено сообщение с подозрительной кнопкой от пользователя {message.from_user.id}."
)
return
except
Exception
as
e
:
print
(
f
"Ошибка при удалении сообщения с кнопкой. Ошибка: {e}"
)
# Проверка на спам
if
is_spam
(
message
.
text
):
try
:
await
message
.
delete
()
print
(
f
"Удалено спамное сообщение от пользователя {message.from_user.id}: {message.text}"
)
return
except
Exception
as
e
:
print
(
f
"Ошибка при удалении спамного сообщения. Ошибка: {e}"
)
user_data
=
active_users
.
get
(
message
.
from_user
.
id
)
if
user_data
and
not
user_data
[
"user_answered"
]:
# Сохраняем сообщение пользователя
user_data
[
"user_messages"
]
.
append
(
message
)
if
message
.
text
.
isdigit
():
user_answer
=
int
(
message
.
text
)
if
user_answer
==
user_data
[
"correct_answer"
]:
user_data
[
"user_answered"
]
=
True
# Ответ правильный — сохраняем сообщение и удаляем все
correct_message
=
await
message
.
reply
(
"Благодарим!"
)
user_data
[
"message_ids"
]
.
append
(
correct_message
.
message_id
)
# Добавляем в список ID сообщений
print
(
f
"Пользователь {message.from_user.id} ответил правильно."
)
# Удаляем сообщения после правильного ответа
await
cleanup_messages
(
user_data
)
else
:
# Ответ неправильный, добавляем новое сообщение бота
incorrect_message
=
await
message
.
reply
(
"Неправильный ответ. Попробуйте еще раз."
)
user_data
[
"message_ids"
]
.
append
(
incorrect_message
.
message_id
)
print
(
f
"Отправлено сообщение о неправильном ответе для пользователя {message.from_user.id}"
)
else
:
# Если ответ не число
invalid_message
=
await
message
.
reply
(
"Пожалуйста, отправьте числовой ответ."
)
user_data
[
"message_ids"
]
.
append
(
invalid_message
.
message_id
)
print
(
f
"Отправлено сообщение с просьбой ввести числовой ответ для пользователя {message.from_user.id}"
)
async
def
main
():
"""Запуск бота."""
await
dp
.
start_polling
(
bot
)
if
__name__
==
"__main__"
:
asyncio
.
run
(
main
())
This diff is collapsed.
Click to expand it.
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment