OpenAI и MCP‑инструменты
Этот раздел описывает, как Telegram‑бот обращается к OpenAI Responses API и MCP‑серверу, как определяется пользователь (tg_id → user_id) и как используются MCP‑инструменты.
Определение пользователя через MCP
Файл: telegram-bot/services/identity.py
Класс MCPIdentityResolver отвечает за сопоставление tg_id → user_id. Он использует библиотеку fastmcp и транспорт StreamableHttpTransport, подключается к MCP‑серверу по MCP_LOCAL_URL и вызывает MCP‑инструмент verify_user.
get_identity_resolver() лениво создаёт и кэширует экземпляр MCPIdentityResolver и читает из окружения:
MCP_LOCAL_URL— обязателен (иначе выбрасываетсяValueError);MCP_AUTH_TOKEN— опциональный токен (если MCP‑сервер защищён JWT), передаётся вStreamableHttpTransport(..., auth=...).
Метод resolve_user_id(tg_id, phone=None) выполняет несколько последовательных шагов:
- Преобразует
tg_idк целому числу. - Проверяет локальный кэш и, если
user_idуже известен и телефон не передан, сразу возвращает его. - Делает до двух попыток вызова MCP‑инструмента
verify_user(при ошибке сбрасывает клиент и пробует ещё раз). - Извлекает
user_idизresult.dataилиresult.structured_content(ожидаются ключиuser_idилиid). - Сохраняет
user_idв кэше и возвращает его вызывающему коду.
Note
В клиенте есть проверка info.get("conflict") и исключение VerifyUserConflictError, но текущая реализация verify_user на MCP‑сервере конфликт не возвращает: при несовпадении tg_id и phone tg_id перепривязывается к пользователю по телефону.
Warning
Это поведение полезно в проде (помогает «узнать» пользователя по телефону), но в тестах может удивить.
Если вы используете один и тот же номер телефона для разных tg_id, verify_user может перепривязать tg_id к существующей записи — и история/роль будут связаны уже с этой записью. Для тестов проще держать отдельную базу и уникальные номера.
Обработка бизнес‑запроса и телефон
Файл: telegram-bot/utils/ai_processor.py
Функция process_ai_request(...) отвечает за полный цикл обработки текстового бизнес‑сообщения. Обработчик (handlers/messages.py) передаёт в неё не объект Message, а примитивные поля — это упрощает тестирование и не связывает бизнес‑логику с aiogram:
async def process_ai_request(
tg_id: int,
request_text: str,
bot,
message_id: int,
business_connection_id: str | None = None,
) -> None:
...
Что делает process_ai_request:
- Получает
identity_resolver = get_identity_resolver()иservice = get_openai_service(). - Определяет
user_idчерезidentity_resolver.resolve_user_id(tg_id)(MCP‑инструментverify_user). При исключении логирует ошибку и временно используетuser_id = tg_id. - Проверяет наличие пользователя в БД (
service.memory.user_exists(user_id)). Если пользователя нет, сбрасывает кэшMCPIdentityResolverи повторяетverify_user; если запись так и не появляется — отправляет пользователю сообщение об ошибке и завершает обработку. - Если
request_textявляется reset‑триггером (utils.trigger_handler.is_reset_trigger) — очищает контекстservice.clear_context(user_id, reset_role=False), отправляет подтверждение и завершает обработку. - Если передан
business_connection_id, пытается пометить сообщение прочитанным черезReadBusinessMessage(ошибка не фатальна и только логируется). - Нормализует текст (проверка на
str,strip()), игнорирует пустые события. - Если в БД нет телефона, пытается извлечь его из текста через
utils.phone_normalizer.normalize_phone_digits(канонический формат380XXXXXXXXXбез плюса) и сохранить черезMySQLMemory.update_phone. ПриDuplicatePhoneErrorможет перепривязатьtg_idк существующей записи (MySQLMemory.reassign_tg_id) и обновить кэшMCPIdentityResolver. - Вызывает
OpenAIService.process_message(user_id=user_id, message=request_text)и отправляет ответ пользователю черезbot.send_message(..., business_connection_id=...). Также отправляетchat_action="typing"перед запросом к модели.
Note
Канонический формат телефона в системе — 380XXXXXXXXX (E.164 без плюса). Telegram‑бот сохраняет телефон в users.phone именно в этом виде (через normalize_phone_digits). E.164 с плюсом (+380...) используется только на границе интеграций, которым он нужен (например, Telethon).
OpenAIService и Responses API
Файл: telegram-bot/services/openai/service.py
Класс OpenAIService инкапсулирует работу с конфигурацией (OpenAIConfig), базой данных (MySQLMemory), клиентом OpenAI Responses API (OpenAIResponsesClient), MCP‑инструментами (ToolRegistry), инструкциями (InstructionProvider) и историей диалога (ConversationManager). Внешний код взаимодействует с ним в основном через методы process_message и switch_user_role.
Служебное состояние (user_state)
Файлы: telegram-bot/services/openai/state.py, telegram-bot/services/mysql_memory.py, telegram-bot/services/openai/service.py
Чтобы бот не «забывал» служебный контекст между перезапусками, он хранит небольшой JSON‑state в таблице user_state (по одной строке на пользователя). В нём лежит:
session.last_role— последняя роль, с которой работала модель;session.last_response_id—previous_response_idдля продолжения цепочки Responses API;last_tool_calls— последние MCP‑вызовы, которые модель сделала в Responses (до 5 штук);doc_flow— служебный контекст по документам (включается черезPOST /cargo-assignedи обновляется послеPOST /cargo-docx).
Если таблицы user_state нет или JSON сломан, бот продолжит работу, но не сможет сохранять это состояние. DDL для таблицы лежит в telegram-bot/scripts/20260113_user_state.sql.
Основной сценарий process_message
Метод process_message(user_id: int, message: str, user_info: dict | None = None) -> str — центральная точка входа для работы модели с текстом пользователя в Telegram. Он последовательно определяет актуальную роль пользователя, обновляет историю диалога, подготавливает вход для модели, выбирает между режимом с MCP‑инструментами и текстовым режимом и, при необходимости, повторяет запрос после смены роли. Внешний код ожидает от метода одну строку — финальный текст ответа для отправки пользователю.
Параметры метода process_message
| Name | Type | Description | Default |
|---|---|---|---|
user_id |
int |
Внутренний идентификатор пользователя (из таблицы users.user_id), по которому выбирается история и кэшируются данные о роли и response_id. |
— |
message |
str |
Очищенный текст сообщения пользователя, который нужно обработать моделью. | — |
user_info |
dict \| None |
Дополнительная информация о пользователе (например, роль), полученная на предыдущих шагах; может влиять на выбор active_role и обновление в базе. |
None |
Возвращаемое значение
| Name | Type | Description |
|---|---|---|
result |
str |
Финальный текст ответа модели, который должен быть отправлен пользователю в Telegram‑чат. |
Определение активной роли
В начале работы метод пытается загрузить user_state из БД и прогреть им in‑memory кэши (_last_role_by_user, _prev_response_id_by_user). После этого он извлекает последнюю известную роль пользователя из _last_role_by_user, анализирует user_info (если он передан) и берёт поле role только в том случае, если оно является строкой и входит в допустимый набор ролей {"client", "transporter"}. Если роль из user_info отличается от ранее сохранённой, process_message считает её новой активной ролью и пытается сохранить её в базе данных через MySQLMemory.update_user_info(user_id, active_role). Ошибка сохранения логируется, но не прерывает обработку.
Если активная роль всё ещё не определена (например, в кэше и user_info роли нет), метод обращается к MySQLMemory.get_user_stats(user_id) и, при наличии валидного значения в поле role, использует его. Когда роль меняется по сравнению с предыдущей (previous_role != active_role), метод:
фиксирует это в журнале,
очищает кэш истории в ConversationManager.reset_cache(user_id),
удаляет сохранённый response_id в _prev_response_id_by_user для данного пользователя.
В конце этого этапа _last_role_by_user[user_id] обновляется до актуальной роли (или остаётся None, если явная роль так и не определена).
История диалога и контекст
После выбора роли входящее сообщение пользователя добавляется в историю через ConversationManager.append(user_id, "user", message). Затем process_message загружает недавние сообщения диалога с учётом лимита history_limit методом ConversationManager.history(user_id) и формирует текст контекста с помощью build_context_message(message, history). В итоговый текст, который будет отправлен в модель, добавляется префикс вида [USER_ID: <id>], чтобы модель могла однозначно связывать контекст и пользователя.
Параллельно метод формирует инструкции для модели: через InstructionProvider.compose(active_role) объединяются системные инструкции, ролевой блок (для текущей роли либо базовый вариант, если роль не задана) и небольшой runtime‑фрагмент с текущей датой и временем. Эти инструкции передаются в Responses API в поле instructions.
Если удалось загрузить user_state, бот добавляет к instructions блок «служебные данные» через render_state_context(state). Он нужен, чтобы модель видела, например, что включён режим документов (doc_flow) или какие инструменты она вызывала в прошлый раз. Этот блок помечен как внутренний и не должен попадать в ответ пользователю.
Вызов Responses API и MCP‑инструментов
Основной вызов к OpenAI выполняется во внутреннем цикле. На каждой итерации:
определяется список MCP‑инструментов через ToolRegistry.for_role(active_role) и вычисляется флаг их доступности has_tools,
вычисляется previous_response_id с помощью _select_previous_response_id(user_id, active_role, has_tools), который возвращает сохранённый идентификатор цепочки только если ранее был успешный запрос и инструменты доступны,
формируется запрос к Responses API через OpenAIResponsesClient.create(...), куда передаются модель primary_model, текст с контекстом, инструкции, список инструментов, previous_response_id и флаг store=True, чтобы API продолжило или создало новую цепочку.
Если при вызове с MCP‑инструментами происходит исключение, метод различает два сценария. При первой неудаче, когда инструменты формально доступны, он считает возможной причиной устаревший previous_response_id: логирует предупреждение, очищает запись в _prev_response_id_by_user и повторяет попытку без привязки к предыдущему ответу. Если ошибка повторяется или инструменты изначально недоступны, process_message переключается на резервную текстовую модель fallback_model. В этом режиме запрос отправляется без списка инструментов и без сохранения цепочки (store=False), флаг used_fallback выставляется в True, а сохранённый response_id для пользователя очищается.
Если после всех попыток объект ответа так и не получен (response is None), метод возбуждает исключение, которое перехватывается во внешнем try и приводит к возврату пользователю универсального сообщения о проблеме с серверами.
Повторный запрос при смене роли
После успешного ответа process_message извлекает текст из структуры ответа через extract_final_output_text(response). Если текст получить не удалось, используется заранее подготовленное нейтральное сообщение об ошибке. Далее метод ещё раз обращается к MySQLMemory.get_user_stats(user_id), чтобы проверить, не изменилась ли роль пользователя во время работы (например, после вызова MCP‑инструмента set_role).
Если новая роль (new_role) присутствует и отличается от текущей active_role, а повтор ещё не выполнялся (флаг role_retry_available истинен), метод:
логирует факт смены роли,
обновляет _last_role_by_user[user_id] до новой роли,
очищает previous_response_id для пользователя в _prev_response_id_by_user,
возвращается к началу внешнего цикла и повторяет весь блок построения контекста и вызова модели уже с новыми инструкциями.
Повтор выполняется не более одного раза: если роль не изменилась или повтор уже был, внешний цикл завершается.
Кэширование цепочек и финализация
Перед возвратом результата метод обновляет кэш идентификаторов цепочек. Если использовалась основная модель с MCP‑инструментами и Responses API вернуло ненулевой id, этот идентификатор сохраняется в _prev_response_id_by_user[user_id] и будет использован как previous_response_id при следующем запросе. Во всех остальных случаях (fallback‑модель либо отсутствие id) кэш для пользователя очищается, чтобы последующий вызов начался с новой цепочки.
Параллельно бот обновляет и сохраняет user_state: пишет туда актуальную роль, last_response_id (или очищает его при fallback), а также сохраняет последние MCP‑вызовы, извлечённые из response.output (type="mcp_call"). Ошибки сохранения user_state не роняют обработку — они только логируются.
Финальный текст добавляется в историю диалога через ConversationManager.append(user_id, "assistant", final_text), а _last_role_by_user[user_id] ещё раз фиксируется как фактически использованная роль. Внешний try/except вокруг всего тела метода страхует от любых непредвиденных исключений: при ошибке в лог попадает подробное сообщение, а пользователю отправляется аккуратное уведомление о временной недоступности сервиса без раскрытия внутренних деталей.
MySQLMemory и ConversationManager
Файл: telegram-bot/services/mysql_memory.py
Класс MySQLMemory инкапсулирует работу с базой MariaDB (через mysql+aiomysql и SQLAlchemy) и предоставляет методы чтения и записи истории диалога. Основные операции — это сохранение сообщений (save_message), загрузка последних реплик (load_user_history) и очистка истории (clear_user_history) с опциональным сбросом роли в таблице users. В load_user_history записи с content_type in {"system","tool"} превращаются в роль system, чтобы модель видела их как системные сообщения. Дополнительно класс умеет получать агрегированную информацию о пользователе (get_user_stats), искать пользователя по номеру телефона (get_user_by_phone), обновлять роль (update_user_info), управлять профилем 1С (clear_onec_profile), безопасно переименовывать Telegram‑ID (reassign_tg_id) и номер телефона (update_phone), проверять существование пользователя (user_exists) и переносить связанные сущности на другой user_id (reassign_cargo_requests, reassign_dialogue_history, clone_user_snapshot). Также MySQLMemory умеет читать/писать user_state (get_user_state, set_user_state, clear_user_state).
Класс ConversationManager (services/openai/conversation.py) работает поверх MySQLMemory. Он кэширует историю в памяти процесса, добавляет новые сообщения одновременно в кэш и в базу данных и умеет очищать историю и сбрасывать кэш для конкретного пользователя. Благодаря этому запросы к модели работают с короткой, но согласованной историей, а полная лента диалога остаётся в БД.
MCP‑инструменты, доступные боту
Файл: telegram-bot/services/openai/tools.py
ToolRegistry строит конфигурацию MCP‑инструмента для Responses API:
- базовый объект
mcp‑инструмента содержит: type="mcp";server_label="primeai_logistics";server_url— HTTP‑URL MCP‑сервера (MCP_SERVER_URL);require_approval="never";headers:MCP-Protocol-Version: "2025-06-18";Authorization: "Bearer <MCP_AUTH_TOKEN>"(если заданMCP_AUTH_TOKEN).
- списки допустимых инструментов (
allowed_tools): - по умолчанию (до определения роли) —
["set_role"]; - для роли
client—["cargo_add", "get_user_cargoes", "send_docx_form", "show_company_info"]; - для роли
transporter—["lookup_company_cargo", "prepare_margin_offer", "get_user_transportations", "show_company_info", "store_transporter_match"].
Конкретные инструменты реализованы в mcp-server/tools/. При изменении набора инструментов необходимо синхронно обновлять конфигурацию ToolRegistry, чтобы модель видела актуальный набор возможностей.
Warning
show_company_info перечислен в ToolRegistry, но в текущей версии mcp-server такой MCP‑инструмент отсутствует. Если модель попытается вызвать его, Responses API получит ошибку «tool not found» до тех пор, пока инструмент не будет добавлен на стороне MCP‑сервера или удалён из allowed_tools.
Инструкции и вспомогательные компоненты OpenAI
InstructionProvider
Файл: telegram-bot/services/openai/instructions.py
Класс InstructionProvider отвечает за загрузку и комбинирование инструкций для модели. При инициализации он сначала пытается прочитать системный текст из SYSTEM_INSTRUCTIONS_PATH, если эта переменная указана, а затем из config/instructions/base.md. Ролевые инструкции ищутся в каталоге config/instructions: для каждой роли (client, transporter и др.) может существовать отдельный файл <role>.md. Файлы с базовыми именами (base, default, common, shared, system) пропускаются, чтобы они не воспринимались как отдельные роли.
Метод compose(role) объединяет три части: системные инструкции, ролевой блок (если для указанной роли есть файл) и runtime‑инструкцию с текущей датой и временем. В результате модель получает единый текст, который задаёт и общие правила поведения, и контекст конкретной роли пользователя.
Формирование контекста и извлечение ответа
Файл: telegram-bot/services/openai/messages.py
Функция build_context_message(message, history) принимает текущий запрос и историю диалога в виде списка словарей {role, content}. На их основе она формирует текст, который кратко описывает недавний контекст и текущий вопрос пользователя. Именно этот текст передаётся в модель в качестве основного содержимого input.
Функция extract_final_output_text(response) анализирует поля output и output_text в ответе Responses API, извлекает текст последнего сообщения модели и возвращает его в виде строки. Если в ответе нет содержимого, функция возвращает None. Этот helper используется в OpenAIService.process_message, чтобы привести ответы модели к простому тексту.
Клиент Responses API
Файл: telegram-bot/services/openai/client.py
Класс OpenAIResponsesClient представляет собой тонкую обёртку над AsyncOpenAI. Его метод create(...) принимает модель, текстовый ввод, инструкции и дополнительные параметры (в том числе список MCP‑инструментов, previous_response_id и флаг store), формирует словарь параметров и вызывает self._client.responses.create(**payload). В вызывающий код возвращается объект ответа Responses API без дополнительной пост‑обработки.
Дополнительный сервис LardiAPI
Файл: telegram-bot/services/lardi.py
Класс LardiAPI инкапсулирует прямые HTTP‑вызовы к Lardi API через aiohttp. Он использует токен LARDI_API и базовый URL https://api.lardi-trans.com по умолчанию. Метод get_user(ref_id, language="ru") запрашивает информацию о пользователе по его refId и при успехе возвращает словарь с полями success, status_code, message, data или raw_response. Метод get_my_cargo_proposals() постранично запрашивает список собственных заявок по грузу по пути /v2/proposals/my/cargoes/published, агрегирует все страницы в один список и возвращает структуру с полями success, status_code, message, data. Основной поток обработки сообщений этот модуль не использует, но он полезен для отладки интеграции с Lardi и написания вспомогательных утилит.