Перейти к содержанию

Доменная логика

Этот раздел описывает модули в каталоге prime_lardi/, которые реализуют бизнес‑логику MCP‑сервера. Для ключевых модулей приведены сигнатуры функций, формат входных данных, структура результата и побочные эффекты.

Константы и справочники

constants.py

В этом модуле собраны константы, которые задают базовое поведение интеграции с Lardi. DEFAULT_REQUEST_SETTINGS содержит типовую конфигурацию заявки: города отправления и назначения, типы кузова, габариты, параметры оплаты и специальные флаги. Кортеж REQUIRED_SETTINGS определяет минимальный набор полей, без которых Lardi не примет заявку. Словарь CURRENCY_SYNONYMS помогает приводить пользовательские обозначения валют к каноническим названиям, понятным API. Регулярные выражения ISO_DATE_RE и YEAR_PATTERN используются при проверке и разборе дат. Все эти значения применяются при сборке и валидации payload.

references.py

Модуль references.py отвечает за работу со справочниками Lardi. Здесь определены функции, которые по человекочитаемым названиям (тип кузова, тип загрузки, валюта, вид упаковки, город) возвращают идентификаторы из соответствующих справочников API. Все они используют общий клиент fetch_reference из api_client.py и внутри обходятся без деталей HTTP‑уровня, предоставляя более удобный интерфейс для остальной логики.

Низкоуровневый клиент Lardi

api_client.py

Этот модуль содержит функции нижнего уровня для обращения к Lardi через httpx. require_token() читает токен из переменной окружения LARDI_API и проверяет, что он задан. create_session(token) создаёт асинхронный HTTP‑клиент с заголовками Accept: application/json и Authorization. Функция fetch_reference(session, path, params) выполняет GET‑запрос к заданному пути API и возвращает разобранный JSON. create_cargo(session, payload) отправляет POST‑запрос для создания заявки, добавляя параметр языка CREATE_LANGUAGE. В случае ошибки статуса выбрасывается LardiAPIError с кодом и деталями ответа. Дополнительные вспомогательные функции отвечают за логирование запросов и ответов и за безопасное извлечение тела ответа даже при исключениях.

Построение payload для заявок

payload_builder.py

Файл payload_builder.py отвечает за подготовку тела заявки, которое будет отправлено в Lardi. Проще всего думать о нём так: он берёт «настройки заявки» в нашей форме (человекочитаемые поля, допускающие разные ключи) и превращает их в строгий JSON‑payload, который ждёт Lardi API.

Чтобы было понятно «откуда → куда», держите в голове цепочку вызовов при создании заявки:

  1. Модель вызывает MCP‑инструмент cargo_add (mcp-server/tools/lardi_add_cargo.py) и передаёт туда settings (то, что собрали из диалога) и user_id.
  2. cargo_add вызывает prime_lardi.cargo_service.create_cargo_request.
  3. Внутри create_cargo_request:
  4. вытаскиваются реквизиты для 1С (settings["one_c"]) и сохраняются в users.onec_profile;
  5. user_id тоже обрабатывается отдельно;
  6. оставшиеся поля превращаются в «чистые настройки заявки» и проходят через merge_settings(...);
  7. затем build_payload(session, settings) собирает payload Lardi.

Термины, которые дальше будут встречаться:

  • settings — словарь в нашей форме (обычно snake_case: source_towns, cargo_body_types, payment_value и т. п.). В коде допускаются и camelCase‑ключи (например, sourceTowns), потому что get_setting(...) умеет искать по нескольким вариантам ключа.
  • payload — словарь в формате Lardi (camelCase‑ключи: waypointListSource, paymentCurrencyId, cargoBodyTypeIds и т. п.), который уходит в POST на создание заявки.
def merge_settings(user_settings: Optional[Mapping[str, Any]]) -> Dict[str, Any]:
    ...

Что на входе: user_settings — настройки, которые пришли в create_cargo_request (обычно это то, что модель прислала через cargo_add, уже без one_c и без user_id).

Что на выходе: словарь настроек, готовый к сборке payload.

Что делает merge_settings:

  • если user_settings is None — берёт DEFAULT_REQUEST_SETTINGS (это полезно для CLI/ручных тестов);
  • если user_settings передан — берёт только переданные ключи (и удаляет те, где значение None);
  • проверяет наличие обязательных полей из REQUIRED_SETTINGS с учётом альтернатив:
  • content_name может быть пропущен, если задан content_id;
  • currency_name может быть пропущен, если задан payment_currency_id;
  • source_town / target_town могут быть пропущены, если заданы waypoint‑списки или списки городов (source_towns / target_towns);
  • если date_to не задан и offset_duration_days тоже не задан, подставляет offset_duration_days из DEFAULT_REQUEST_SETTINGS (обычно это означает «сделать dateTo равным dateFrom»).
async def build_payload(session: httpx.AsyncClient, settings: Mapping[str, Any]) -> Dict[str, Any]:
    ...

Что на входе:

  • settings — результат merge_settings(...) (настройки заявки в нашей форме);
  • sessionhttpx.AsyncClient, через который будут запрашиваться справочники Lardi (города, валюты, типы кузова и т. п.).

Что на выходе: словарь payload в формате Lardi, который уходит в POST создания заявки.

Главная идея build_payload: на вход можно передавать «человеческие» значения (например, названия городов и валют), а на выходе получится payload с нужными ID и структурами, которые ждёт Lardi.

Основные шаги сборки (что берём → что получаем):

  • формирует waypointListSource и waypointListTarget:
  • если вы уже знаете townId — можно передать готовые списки waypoint_list_source/waypoint_list_target (это список словарей вида {"townId": ..., ...}) и избежать лишних запросов к справочникам;
  • иначе можно передать source_towns/target_towns (список названий городов) или одиночные source_town/target_town;
  • названия городов превращаются в точки маршрута через fetch_town(session, name)build_waypoint(...).
  • определяет валюту:
  • если задан payment_currency_id, он валидируется по справочнику /references/currencies;
  • иначе берётся currency_name и через fetch_currency_id подбирается ID.
  • определяет типы кузовов:
  • либо по cargo_body_type_ids, либо по cargo_body_types через fetch_body_type_ids;
  • определяет типы загрузки:
  • либо по load_type_ids, либо по load_types через fetch_load_type_ids;
  • формирует упаковку:
  • либо из уже «разрешённого» списка cargo_packaging_resolved,
  • либо по пользовательскому списку cargo_packaging через fetch_packaging;
  • проверяет и нормализует базовые числовые поля:
  • payment_value — должна быть числом; в сам payload записывается paymentValue=0.0, потому что фактическая цена хранится отдельно (и используется в наших расчётах/хранении);
  • size_masssizeMass — масса груза;
  • lorryAmount — количество машин (по умолчанию 1);
  • добавляет в payload даты через build_dates(settings);
  • дополняет payload опциональными булевыми, числовыми и строковыми полями (cmr, groupage, t1, tir, payment_unit_id, size_volume, note, payment_price и др.);
  • гарантирует, что указан либо content_id, либо content_name.

Important

Сейчас в payload_builder.py жёстко ограничена валюта: поддерживается только гривна (UAH).
Если передать другую валюту (по имени или ID), сборка payload упадёт с ошибкой.

Практический пример (который можно повторить)

1) Что приходит в MCP‑инструмент cargo_add

cargo_add вызывается моделью с двумя аргументами: user_id и settings. Внутри settings должен быть блок one_c (минимум okpo и name), иначе create_cargo_request упадёт с ошибкой.

Пример валидного вызова:

{
  "user_id": 42,
  "settings": {
    "one_c": { "okpo": "12345678", "name": "ТОВ \"Приклад\"" },
    "date_from": "2026-01-15",
    "source_towns": ["Київ"],
    "target_towns": ["Львів"],
    "content_name": "Палети",
    "cargo_body_types": ["тент"],
    "load_types": ["задня"],
    "currency_name": "UAH",
    "payment_value": 25000,
    "size_mass": 20
  }
}

Note

user_id берётся не «из головы»: его возвращает MCP‑инструмент verify_user (tg_id → user_id / phone → user_id).

2) Как увидеть реальный payload, который собирает build_payload

Ниже — самый простой способ воспроизвести сборку payload без создания заявки во внешнем API: мы только читаем справочники (GET) и печатаем результат build_payload.

cd mcp-server
python3 - <<'PY'
import asyncio
import json

from prime_lardi.api_client import create_session, require_token
from prime_lardi.payload_builder import build_payload, merge_settings


async def main() -> None:
    settings = merge_settings(
        {
            "date_from": "2026-01-15",
            "source_towns": ["Київ"],
            "target_towns": ["Львів"],
            "content_name": "Палети",
            "cargo_body_types": ["тент"],
            "load_types": ["задня"],
            "currency_name": "UAH",
            "payment_value": 25000,
            "size_mass": 20,
        }
    )

    async with create_session(require_token()) as session:
        payload = await build_payload(session, settings)

    print(json.dumps(payload, ensure_ascii=False, indent=2))


asyncio.run(main())
PY

Note

В реальном пути cargo_add → create_cargo_request сервис дополнительно проставляет contact_id из DEFAULT_REQUEST_SETTINGS, поэтому в payload обычно присутствует поле contactId.

Warning

Для этого примера нужны LARDI_API и LARDI_BASE_URL.
Если город/тип кузова/тип загрузки не находится, значит в settings передано значение, которого нет в справочнике Lardi — возьмите точное имя из сообщения об ошибке (там перечисляются доступные варианты).

Функция build_dates(settings) возвращает {"dateFrom": "...", "dateTo": "..."}. При текстовых датах использует dateparser.parse и поддерживает фразы вроде «завтра», «через 2 дня», автоматически сдвигая год вперёд, если дата уже в прошлом и год не указан.

Вспомогательная функция get_setting(settings, *keys) ищет значение по нескольким ключам (snake_case и camelCase) и возвращает первое непустое. Набор _ensure_*, _coerce_* и _parse_date_text обеспечивает строгую проверку типов: при некорректных значениях выбрасывается ValueError с подробным текстом.

Расчёт маржи и оплат

pricing.py

Модуль pricing.py инкапсулирует правила расчёта маржи и выплат перевозчику. Перечисление DealType задаёт тип сделки (например, soft или bank_transfer), а класс PricingRule хранит параметры маржинального коридора: маржу по умолчанию и нижнюю границу (минимальную маржу). Функция compute_offer(gross_amount, deal_type, margin_level) принимает исходную сумму и тип сделки, нормализует и проверяет сумму, выбирает подходящую маржу в зависимости от уровня (default или floor), рассчитывает выплаты и возвращает объект PricingResult. В этом объекте содержатся все интересующие величины: выбранная маржа, выплаты по дефолтной и минимальной марже и признак того, была ли маржа «поджата» к границам.

Создание заявок

cargo_service.py

В cargo_service.py собрана высокоуровневая логика, связанная с созданием заявок.

Вспомогательная функция:

async def _create_and_return(session: httpx.AsyncClient, settings: Mapping[str, Any]) -> Dict[str, Any]:
    ...
  • собирает payload через build_payload(session, settings);
  • вызывает preview_payload(payload) (лог‑превью);
  • отправляет запрос create_cargo(session, payload) в Lardi;
  • при LardiAPIError возвращает {"success": False, "message": ..., "payload": payload, "status_code": ..., "error": ...};
  • при успехе возвращает {"success": True, "payload": payload, "response": data}.

Основная функция:

async def create_cargo_request(
    settings: Optional[Mapping[str, Any]] = None,
    onec_customer: Optional[Mapping[str, Any]] = None,
) -> Dict[str, Any]:
    ...

Назначение — создать заявку на груз, сохранив профиль клиента для 1С, при необходимости рассчитав маржу, и записать заявку в cargo_requests. Подробный алгоритм описан в разделе выше: извлечение one_c/onec_customer, сохранение профиля через store_profile_async, вычисление маржи через compute_offer, вызов Lardi и запись в cargo_requests через record_cargo_request_async. Наружу возвращается «безопасный» результат, где чувствительные ценовые поля убраны, но есть превью payload и ответа, а также сводка по марже.

cargo_storage.py и transporter_match_storage.py

Модули cargo_storage и transporter_match_storage отвечают за запись данных в базу.

cargo_storage.record_cargo_request_async(...):

  • нормализует входные значения (идентификаторы, цену, валюту, статус перевозчика);
  • сериализует настройки заявки и их «человекочитаемую» версию в JSON;
  • вставляет запись в cargo_requests внутри асинхронного контекста session_scope();
  • при ошибках возвращает словарь {"success": False, "message": ..., "details": ...}, при успехе — {"success": True, "message": ...}.

transporter_match_storage.store_transporter_match(...):

  • проверяет корректность lardi_request_id и user_id перевозчика, статус (pending/confirmed/rejected) и текст confirmed_capabilities;
  • выполняет upsert по паре (lardi_request_id, user_id): при наличии записи обновляет статус и параметры, при отсутствии — создаёт новую строку в transporter_matches;
  • возвращает {"success": True, "message": ..., "match_id": ..., "created": bool, "updated": bool} или, при ошибках, success: False с сообщением и деталями.

Профили 1С и отправка сделок

onec_profiles.py

Модуль onec_profiles.py отвечает за сохранение и загрузку профиля 1С в поле users.onec_profile.

Основные элементы:

  • исключение OneCProfileError — выбрасывается при некорректном профиле или ошибках сохранения;
  • _normalize_user_id(value) — нормализует user_id для операций с профилем, выбрасывая OneCProfileError при ошибках;
  • _normalize_profile(data, required_fields=None) — проверяет структуру профиля:
  • гарантирует, что onec_profile — словарь;
  • учитывает обязательные поля по умолчанию (name) плюс опциональную обязательность okpo;
  • нормализует name и okpo как непустые строки;
  • проверяет, что профиль сериализуем в JSON.

Функции верхнего уровня (используются только в async‑контексте):

async def store_profile_async(user_id: Any, profile: Mapping[str, Any], *, require_okpo: Optional[bool] = None) -> Dict[str, Any]
async def fetch_profile_async(user_id: Any) -> Optional[Dict[str, Any]]
  • store_profile_async:
  • нормализует user_id и профиль;
  • валидирует обязательные поля (name, и опционально okpo при require_okpo=True);
  • сериализует профиль в JSON и сохраняет его в users.onec_profile;
  • возвращает нормализованный словарь профиля;
  • при ошибках валидации/сохранения выбрасывает OneCProfileError.
  • fetch_profile_async:
  • загружает JSON из users.onec_profile и десериализует его;
  • проверяет, что профиль — словарь и содержит непустое name (и при наличии — нормализует okpo);
  • при повреждённом JSON выбрасывает OneCProfileError;
  • возвращает None, если профиль отсутствует.

onec_dispatcher.py

Модуль onec_dispatcher.py реализует отправку данных о сделке в 1С.

Основные элементы:

  • исключение OneCDispatchError — выбрасывается при ошибках подготовки или отправки сделки;
  • _load_endpoint_url() — читает ONEC_DISPATCH_URL из окружения и выбрасывает OneCDispatchError, если URL не задан;
  • _normalize_value(value, field_name) — проверяет, что значение поля непустое.

Функция:

def build_onec_payload(values: Mapping[str, Any]) -> Dict[str, str]:
    ...
  • ожидает, что в values есть все логические поля: deal_type, external_id, customer_okpo, customer_name, contractor_okpo, contractor_name, customer_rate, carrier_rate;
  • формирует словарь с русскоязычными ключами, которые ждёт 1С (ТипСделки, ИдВнешний, ОКПОЗаказчика, НаименованиеЗаказчика, ОКПОИсполнителя, НаименованиеИсполнителя, СтавкаОплатыЗаказчика, СтавкаОплатыПеревозчика);
  • нормализует значения, выбрасывая OneCDispatchError при пустых полях.

Асинхронная функция:

async def dispatch_onec_deal(
    values: Mapping[str, Any],
    *,
    endpoint: Optional[str] = None,
    timeout_seconds: Optional[int] = None,
) -> Dict[str, Any]:
    ...
  • собирает payload через build_onec_payload;
  • определяет URL (ONEC_DISPATCH_URL или явный endpoint);
  • формирует таймаут (DEFAULT_TIMEOUT_SECONDS или переданное значение);
  • создаёт httpx.AsyncClient и отправляет POST с JSON‑payload и заголовком Authorization из ONEC_DISPATCH_AUTH (если он задан);
  • читает ответ, пытаясь распарсить JSON, но при неуспехе возвращает сырую строку;
  • возвращает словарь:
{
  "success": true|false,
  "status": <HTTP статус>,
  "payload": { ... },   // отправленный payload
  "response": { ... } | "<сырой текст>"
}
  • при сетевых ошибках (httpx.RequestError) или таймауте (httpx.TimeoutException) выбрасывает OneCDispatchError.

История диалога

В текущей версии MCP‑сервера нет отдельного доменного модуля, который «ведёт диалог» и пишет новые строки в dialogue_history от имени моделей. Историю диалога записывают сервисы‑клиенты:

  • telegram-bot пишет сообщения канала telegram через telegram-bot/services/mysql_memory.py;
  • sip-hook пишет сообщения канала sip через sip_hook_app/history_sync.py и sip-hook/db.py.

Со стороны MCP‑сервера таблица dialogue_history используется:

  • инструментом get_dialogue_history (чтение истории);
  • модулем user_clone.py (перенос истории на «клона» пользователя при switch_user_role).

Клонирование пользователя

user_clone.py

Модуль реализует сценарий «клонирования» пользователя с переносом связанных данных.

Вспомогательные функции:

  • _normalize_user_id(value: Any) -> int — проверяет, что user_id задан, приводится к целому числу и больше нуля; иначе выбрасывает ValueError.
  • _generate_clone_id(session: AsyncSession, source_id: int) -> Optional[int] — в пределах _MAX_ATTEMPTS подбирает свободный user_id для клона, добавляя к исходному source_id случайное число в заданном диапазоне. При удаче возвращает свободный user_id, при неудаче — None.

Основная функция:

async def clone_user_context(user_id: Any) -> Dict[str, Any]:
    ...

Параметры clone_user_context

Name Type Description Default
user_id Any Входной идентификатор пользователя; нормализуется в положительное целое через _normalize_user_id.

Ход работы:

  1. Нормализует входной user_id через _normalize_user_id. При ошибке возвращает {"success": False, "message": ...}.
  2. Открывает сессию SessionLocal и ищет исходного пользователя в таблице users.
  3. Если пользователь не найден — возвращает success: False и сообщение.
  4. Подбирает новый clone_user_id через _generate_clone_id. При неудаче — success: False, сообщение о невозможности подобрать идентификатор.
  5. Создаёт строку‑клон в таблице users:
  6. user_id = clone_user_id;
  7. tg_id = None, phone = None (чтобы избежать конфликтов уникальности);
  8. role и onec_profile копируются с исходного пользователя.
  9. Переносит связанные записи:
  10. dialogue_history: обновляет user_id с исходного на clone_user_id;
  11. cargo_requests: то же самое;
  12. transporter_matches: то же самое. Количество перенесённых записей по каждой таблице сохраняется в history_moved, cargo_moved, transporter_moved.
  13. Очищает onec_profile у исходного пользователя.
  14. Выполняет commit. При исключениях SQLAlchemyError откатывает транзакцию, логирует ошибку и возвращает success: False с деталями. При неожиданных исключениях — также откат и защитное сообщение.

Результат:

{
  "success": true,
  "message": "Користувача клоновано з перенесенням історії та заявок.",
  "user_id": <старый user_id>,
  "clone_user_id": <новый user_id>,
  "history_moved": <количество записей dialogue_history>,
  "cargo_moved": <количество записей cargo_requests>,
  "transporter_matches_moved": <количество записей transporter_matches>
}

Note

Значение message приведено как в коде (сейчас оно на украинском). По‑русски: «Пользователь клонирован с переносом истории и заявок».

MCP‑инструмент switch_user_role использует эту функцию, но скрывает поля user_id и clone_user_id в своём ответе.

Сервис идентичности

identity.py

Модуль отвечает за нормализацию телефонных номеров и привязку разных идентификаторов (Telegram‑ID и телефон) к единому пользователю.

Функция:

def normalize_phone(raw: str) -> str:
    ...

Нормализует номер телефона до формата 380XXXXXXXXX (E.164 без плюса):

  • удаляет все символы, кроме цифр;
  • если после очистки ровно 10 цифр и номер начинается с 0 — превращает его в 380 + последние 9 цифр;
  • если номер начинается с 380 и длина 12 цифр — возвращает как есть;
  • во всех остальных случаях выбрасывает ValueError с пояснением.

Основная функция:

async def verify_user(
    *,
    tg_id: Optional[int] = None,
    phone: Optional[str] = None,
) -> Dict[str, Any]:
    ...

Назначение — проверить или создать пользователя в таблице users по tg_id и/или телефону, обеспечив, что оба идентификатора указывают на одну строку.

Параметры verify_user

Name Type Description Default
tg_id Optional[int] Telegram‑ID пользователя; при наличии используется для поиска/создания записи в таблице users. None
phone Optional[str] Номер телефона; нормализуется функцией normalize_phone до формата 380XXXXXXXXX. None

Если не передан ни tg_id, ни phone, функция возвращает:

{"success": false, "message": "Потрібно передати хоча б один ідентифікатор: tg_id або phone."}

Note

Поле message — строка из кода (сейчас на украинском). По‑русски: «Нужно передать хотя бы один идентификатор: tg_id или phone».

Если передан phone, он нормализуется; при ошибке нормализации возвращается { "success": False, "message": ... }.

Внутри открывается сессия SessionLocal и выполняются запросы:

  • поиск пользователя по tg_id (user_by_tg);
  • поиск пользователя по phone (user_by_phone).

Дальнейшая логика делится на три основные ветки:

  1. tg_id и phone заданы одновременно:
  2. если ни по tg_id, ни по phone пользователя нет — создаётся новый User(tg_id=int(tg_id), phone=normalized_phone), выполняется flush и commit и возвращается created: True;
  3. если оба пользователя найдены и у них разные user_id — текущая реализация перепривязывает tg_id от «телеграмного» пользователя к пользователю, найденному по телефону, снимая tg_id со старой записи и записывая в новую, затем возвращает linked: True;
  4. если оба идентификатора принадлежат одному пользователю — возвращается created: False, linked: False;
  5. если пользователь найден только по tg_id — к нему привязывается телефон;
  6. если только по телефону — к нему привязывается tg_id.

  7. Передан только tg_id:

  8. при отсутствии пользователя по tg_id создаётся новый User(tg_id=int(tg_id));
  9. при наличии — возвращается существующий пользователь.

  10. Передан только phone:

  11. при отсутствии пользователя по нормализованному телефону — создаётся User(phone=normalized_phone);
  12. при наличии — возвращается существующий.

В случае ошибок уровня БД (SQLAlchemyError) функция логирует исключение и возвращает:

{
  "success": false,
  "message": "Не вдалося виконати verify_user.",
  "details": "<текст ошибки>"
}

Note

Поле message — строка из кода (сейчас на украинском). По‑русски: «Не удалось выполнить verify_user».

Побочные эффекты:

  • создание новых строк в users;
  • изменение полей tg_id, phone и onec_profile (через последующие сценарии);
  • косвенное влияние на дальнейшие сценарии клонирования (user_clone) и работы с MCP‑инструментами (verify_user, switch_user_role).

Логирование вызовов инструментов

mcp_logging.py

Файл prime_lardi/mcp_logging.py содержит класс ToolLoggingMiddleware. Этот middleware подключается к экземпляру FastMCP в server.py и перехватывает все вызовы MCP‑инструментов. Для каждого вызова он логирует имя инструмента и переданные аргументы, а после выполнения записывает в лог либо структурированный результат (structured_content), либо компактное текстовое представление содержимого. Такая прослойка упрощает отладку и позволяет видеть, как именно модели используют инструменты MCP‑сервера.