Доменная логика
Этот раздел описывает модули в каталоге prime_lardi/, которые реализуют бизнес‑логику MCP‑сервера. Для ключевых модулей приведены сигнатуры функций, формат входных данных, структура результата и побочные эффекты.
Константы и справочники
constants.py
В этом модуле собраны константы, которые задают базовое поведение интеграции с Lardi. DEFAULT_REQUEST_SETTINGS содержит типовую конфигурацию заявки: города отправления и назначения, типы кузова, габариты, параметры оплаты и специальные флаги. Кортеж REQUIRED_SETTINGS определяет минимальный набор полей, без которых Lardi не примет заявку. Словарь CURRENCY_SYNONYMS помогает приводить пользовательские обозначения валют к каноническим названиям, понятным API. Регулярные выражения ISO_DATE_RE и YEAR_PATTERN используются при проверке и разборе дат. Все эти значения применяются при сборке и валидации payload.
references.py
Модуль references.py отвечает за работу со справочниками Lardi. Здесь определены функции, которые по человекочитаемым названиям (тип кузова, тип загрузки, валюта, вид упаковки, город) возвращают идентификаторы из соответствующих справочников API. Все они используют общий клиент fetch_reference из api_client.py и внутри обходятся без деталей HTTP‑уровня, предоставляя более удобный интерфейс для остальной логики.
Низкоуровневый клиент Lardi
api_client.py
Этот модуль содержит функции нижнего уровня для обращения к Lardi через httpx. require_token() читает токен из переменной окружения LARDI_API и проверяет, что он задан. create_session(token) создаёт асинхронный HTTP‑клиент с заголовками Accept: application/json и Authorization. Функция fetch_reference(session, path, params) выполняет GET‑запрос к заданному пути API и возвращает разобранный JSON. create_cargo(session, payload) отправляет POST‑запрос для создания заявки, добавляя параметр языка CREATE_LANGUAGE. В случае ошибки статуса выбрасывается LardiAPIError с кодом и деталями ответа. Дополнительные вспомогательные функции отвечают за логирование запросов и ответов и за безопасное извлечение тела ответа даже при исключениях.
Построение payload для заявок
payload_builder.py
Файл payload_builder.py отвечает за подготовку тела заявки, которое будет отправлено в Lardi. Проще всего думать о нём так: он берёт «настройки заявки» в нашей форме (человекочитаемые поля, допускающие разные ключи) и превращает их в строгий JSON‑payload, который ждёт Lardi API.
Чтобы было понятно «откуда → куда», держите в голове цепочку вызовов при создании заявки:
- Модель вызывает MCP‑инструмент
cargo_add(mcp-server/tools/lardi_add_cargo.py) и передаёт тудаsettings(то, что собрали из диалога) иuser_id. cargo_addвызываетprime_lardi.cargo_service.create_cargo_request.- Внутри
create_cargo_request: - вытаскиваются реквизиты для 1С (
settings["one_c"]) и сохраняются вusers.onec_profile; user_idтоже обрабатывается отдельно;- оставшиеся поля превращаются в «чистые настройки заявки» и проходят через
merge_settings(...); - затем
build_payload(session, settings)собирает payload Lardi.
Термины, которые дальше будут встречаться:
settings— словарь в нашей форме (обычно snake_case:source_towns,cargo_body_types,payment_valueи т. п.). В коде допускаются и camelCase‑ключи (например,sourceTowns), потому чтоget_setting(...)умеет искать по нескольким вариантам ключа.payload— словарь в формате Lardi (camelCase‑ключи:waypointListSource,paymentCurrencyId,cargoBodyTypeIdsи т. п.), который уходит вPOSTна создание заявки.
def merge_settings(user_settings: Optional[Mapping[str, Any]]) -> Dict[str, Any]:
...
Что на входе: user_settings — настройки, которые пришли в create_cargo_request (обычно это то, что модель прислала через cargo_add, уже без one_c и без user_id).
Что на выходе: словарь настроек, готовый к сборке payload.
Что делает merge_settings:
- если
user_settings is None— берётDEFAULT_REQUEST_SETTINGS(это полезно для CLI/ручных тестов); - если
user_settingsпередан — берёт только переданные ключи (и удаляет те, где значениеNone); - проверяет наличие обязательных полей из
REQUIRED_SETTINGSс учётом альтернатив: content_nameможет быть пропущен, если заданcontent_id;currency_nameможет быть пропущен, если заданpayment_currency_id;source_town/target_townмогут быть пропущены, если заданы waypoint‑списки или списки городов (source_towns/target_towns);- если
date_toне задан иoffset_duration_daysтоже не задан, подставляетoffset_duration_daysизDEFAULT_REQUEST_SETTINGS(обычно это означает «сделатьdateToравнымdateFrom»).
async def build_payload(session: httpx.AsyncClient, settings: Mapping[str, Any]) -> Dict[str, Any]:
...
Что на входе:
settings— результатmerge_settings(...)(настройки заявки в нашей форме);session—httpx.AsyncClient, через который будут запрашиваться справочники Lardi (города, валюты, типы кузова и т. п.).
Что на выходе: словарь payload в формате Lardi, который уходит в POST создания заявки.
Главная идея build_payload: на вход можно передавать «человеческие» значения (например, названия городов и валют), а на выходе получится payload с нужными ID и структурами, которые ждёт Lardi.
Основные шаги сборки (что берём → что получаем):
- формирует
waypointListSourceиwaypointListTarget: - если вы уже знаете
townId— можно передать готовые спискиwaypoint_list_source/waypoint_list_target(это список словарей вида{"townId": ..., ...}) и избежать лишних запросов к справочникам; - иначе можно передать
source_towns/target_towns(список названий городов) или одиночныеsource_town/target_town; - названия городов превращаются в точки маршрута через
fetch_town(session, name)→build_waypoint(...). - определяет валюту:
- если задан
payment_currency_id, он валидируется по справочнику/references/currencies; - иначе берётся
currency_nameи черезfetch_currency_idподбирается ID. - определяет типы кузовов:
- либо по
cargo_body_type_ids, либо поcargo_body_typesчерезfetch_body_type_ids; - определяет типы загрузки:
- либо по
load_type_ids, либо поload_typesчерезfetch_load_type_ids; - формирует упаковку:
- либо из уже «разрешённого» списка
cargo_packaging_resolved, - либо по пользовательскому списку
cargo_packagingчерезfetch_packaging; - проверяет и нормализует базовые числовые поля:
payment_value— должна быть числом; в сам payload записываетсяpaymentValue=0.0, потому что фактическая цена хранится отдельно (и используется в наших расчётах/хранении);size_mass→sizeMass— масса груза;lorryAmount— количество машин (по умолчанию 1);- добавляет в payload даты через
build_dates(settings); - дополняет payload опциональными булевыми, числовыми и строковыми полями (
cmr,groupage,t1,tir,payment_unit_id,size_volume,note,payment_priceи др.); - гарантирует, что указан либо
content_id, либоcontent_name.
Important
Сейчас в payload_builder.py жёстко ограничена валюта: поддерживается только гривна (UAH).
Если передать другую валюту (по имени или ID), сборка payload упадёт с ошибкой.
Практический пример (который можно повторить)
1) Что приходит в MCP‑инструмент cargo_add
cargo_add вызывается моделью с двумя аргументами: user_id и settings. Внутри settings должен быть блок one_c (минимум okpo и name), иначе create_cargo_request упадёт с ошибкой.
Пример валидного вызова:
{
"user_id": 42,
"settings": {
"one_c": { "okpo": "12345678", "name": "ТОВ \"Приклад\"" },
"date_from": "2026-01-15",
"source_towns": ["Київ"],
"target_towns": ["Львів"],
"content_name": "Палети",
"cargo_body_types": ["тент"],
"load_types": ["задня"],
"currency_name": "UAH",
"payment_value": 25000,
"size_mass": 20
}
}
Note
user_id берётся не «из головы»: его возвращает MCP‑инструмент verify_user (tg_id → user_id / phone → user_id).
2) Как увидеть реальный payload, который собирает build_payload
Ниже — самый простой способ воспроизвести сборку payload без создания заявки во внешнем API: мы только читаем справочники (GET) и печатаем результат build_payload.
cd mcp-server
python3 - <<'PY'
import asyncio
import json
from prime_lardi.api_client import create_session, require_token
from prime_lardi.payload_builder import build_payload, merge_settings
async def main() -> None:
settings = merge_settings(
{
"date_from": "2026-01-15",
"source_towns": ["Київ"],
"target_towns": ["Львів"],
"content_name": "Палети",
"cargo_body_types": ["тент"],
"load_types": ["задня"],
"currency_name": "UAH",
"payment_value": 25000,
"size_mass": 20,
}
)
async with create_session(require_token()) as session:
payload = await build_payload(session, settings)
print(json.dumps(payload, ensure_ascii=False, indent=2))
asyncio.run(main())
PY
Note
В реальном пути cargo_add → create_cargo_request сервис дополнительно проставляет contact_id из DEFAULT_REQUEST_SETTINGS, поэтому в payload обычно присутствует поле contactId.
Warning
Для этого примера нужны LARDI_API и LARDI_BASE_URL.
Если город/тип кузова/тип загрузки не находится, значит в settings передано значение, которого нет в справочнике Lardi — возьмите точное имя из сообщения об ошибке (там перечисляются доступные варианты).
Функция build_dates(settings) возвращает {"dateFrom": "...", "dateTo": "..."}. При текстовых датах использует dateparser.parse и поддерживает фразы вроде «завтра», «через 2 дня», автоматически сдвигая год вперёд, если дата уже в прошлом и год не указан.
Вспомогательная функция get_setting(settings, *keys) ищет значение по нескольким ключам (snake_case и camelCase) и возвращает первое непустое. Набор _ensure_*, _coerce_* и _parse_date_text обеспечивает строгую проверку типов: при некорректных значениях выбрасывается ValueError с подробным текстом.
Расчёт маржи и оплат
pricing.py
Модуль pricing.py инкапсулирует правила расчёта маржи и выплат перевозчику. Перечисление DealType задаёт тип сделки (например, soft или bank_transfer), а класс PricingRule хранит параметры маржинального коридора: маржу по умолчанию и нижнюю границу (минимальную маржу). Функция compute_offer(gross_amount, deal_type, margin_level) принимает исходную сумму и тип сделки, нормализует и проверяет сумму, выбирает подходящую маржу в зависимости от уровня (default или floor), рассчитывает выплаты и возвращает объект PricingResult. В этом объекте содержатся все интересующие величины: выбранная маржа, выплаты по дефолтной и минимальной марже и признак того, была ли маржа «поджата» к границам.
Создание заявок
cargo_service.py
В cargo_service.py собрана высокоуровневая логика, связанная с созданием заявок.
Вспомогательная функция:
async def _create_and_return(session: httpx.AsyncClient, settings: Mapping[str, Any]) -> Dict[str, Any]:
...
- собирает payload через
build_payload(session, settings); - вызывает
preview_payload(payload)(лог‑превью); - отправляет запрос
create_cargo(session, payload)в Lardi; - при
LardiAPIErrorвозвращает{"success": False, "message": ..., "payload": payload, "status_code": ..., "error": ...}; - при успехе возвращает
{"success": True, "payload": payload, "response": data}.
Основная функция:
async def create_cargo_request(
settings: Optional[Mapping[str, Any]] = None,
onec_customer: Optional[Mapping[str, Any]] = None,
) -> Dict[str, Any]:
...
Назначение — создать заявку на груз, сохранив профиль клиента для 1С, при необходимости рассчитав маржу, и записать заявку в cargo_requests. Подробный алгоритм описан в разделе выше: извлечение one_c/onec_customer, сохранение профиля через store_profile_async, вычисление маржи через compute_offer, вызов Lardi и запись в cargo_requests через record_cargo_request_async. Наружу возвращается «безопасный» результат, где чувствительные ценовые поля убраны, но есть превью payload и ответа, а также сводка по марже.
cargo_storage.py и transporter_match_storage.py
Модули cargo_storage и transporter_match_storage отвечают за запись данных в базу.
cargo_storage.record_cargo_request_async(...):
- нормализует входные значения (идентификаторы, цену, валюту, статус перевозчика);
- сериализует настройки заявки и их «человекочитаемую» версию в JSON;
- вставляет запись в
cargo_requestsвнутри асинхронного контекстаsession_scope(); - при ошибках возвращает словарь
{"success": False, "message": ..., "details": ...}, при успехе —{"success": True, "message": ...}.
transporter_match_storage.store_transporter_match(...):
- проверяет корректность
lardi_request_idиuser_idперевозчика, статус (pending/confirmed/rejected) и текстconfirmed_capabilities; - выполняет upsert по паре (
lardi_request_id,user_id): при наличии записи обновляет статус и параметры, при отсутствии — создаёт новую строку вtransporter_matches; - возвращает
{"success": True, "message": ..., "match_id": ..., "created": bool, "updated": bool}или, при ошибках,success: Falseс сообщением и деталями.
Профили 1С и отправка сделок
onec_profiles.py
Модуль onec_profiles.py отвечает за сохранение и загрузку профиля 1С в поле users.onec_profile.
Основные элементы:
- исключение
OneCProfileError— выбрасывается при некорректном профиле или ошибках сохранения; _normalize_user_id(value)— нормализуетuser_idдля операций с профилем, выбрасываяOneCProfileErrorпри ошибках;_normalize_profile(data, required_fields=None)— проверяет структуру профиля:- гарантирует, что
onec_profile— словарь; - учитывает обязательные поля по умолчанию (
name) плюс опциональную обязательностьokpo; - нормализует
nameиokpoкак непустые строки; - проверяет, что профиль сериализуем в JSON.
Функции верхнего уровня (используются только в async‑контексте):
async def store_profile_async(user_id: Any, profile: Mapping[str, Any], *, require_okpo: Optional[bool] = None) -> Dict[str, Any]
async def fetch_profile_async(user_id: Any) -> Optional[Dict[str, Any]]
store_profile_async:- нормализует
user_idи профиль; - валидирует обязательные поля (
name, и опциональноokpoприrequire_okpo=True); - сериализует профиль в JSON и сохраняет его в
users.onec_profile; - возвращает нормализованный словарь профиля;
- при ошибках валидации/сохранения выбрасывает
OneCProfileError. fetch_profile_async:- загружает JSON из
users.onec_profileи десериализует его; - проверяет, что профиль — словарь и содержит непустое
name(и при наличии — нормализуетokpo); - при повреждённом JSON выбрасывает
OneCProfileError; - возвращает
None, если профиль отсутствует.
onec_dispatcher.py
Модуль onec_dispatcher.py реализует отправку данных о сделке в 1С.
Основные элементы:
- исключение
OneCDispatchError— выбрасывается при ошибках подготовки или отправки сделки; _load_endpoint_url()— читаетONEC_DISPATCH_URLиз окружения и выбрасываетOneCDispatchError, если URL не задан;_normalize_value(value, field_name)— проверяет, что значение поля непустое.
Функция:
def build_onec_payload(values: Mapping[str, Any]) -> Dict[str, str]:
...
- ожидает, что в
valuesесть все логические поля:deal_type,external_id,customer_okpo,customer_name,contractor_okpo,contractor_name,customer_rate,carrier_rate; - формирует словарь с русскоязычными ключами, которые ждёт 1С (
ТипСделки,ИдВнешний,ОКПОЗаказчика,НаименованиеЗаказчика,ОКПОИсполнителя,НаименованиеИсполнителя,СтавкаОплатыЗаказчика,СтавкаОплатыПеревозчика); - нормализует значения, выбрасывая
OneCDispatchErrorпри пустых полях.
Асинхронная функция:
async def dispatch_onec_deal(
values: Mapping[str, Any],
*,
endpoint: Optional[str] = None,
timeout_seconds: Optional[int] = None,
) -> Dict[str, Any]:
...
- собирает payload через
build_onec_payload; - определяет URL (
ONEC_DISPATCH_URLили явныйendpoint); - формирует таймаут (
DEFAULT_TIMEOUT_SECONDSили переданное значение); - создаёт
httpx.AsyncClientи отправляетPOSTс JSON‑payload и заголовкомAuthorizationизONEC_DISPATCH_AUTH(если он задан); - читает ответ, пытаясь распарсить JSON, но при неуспехе возвращает сырую строку;
- возвращает словарь:
{
"success": true|false,
"status": <HTTP статус>,
"payload": { ... }, // отправленный payload
"response": { ... } | "<сырой текст>"
}
- при сетевых ошибках (
httpx.RequestError) или таймауте (httpx.TimeoutException) выбрасываетOneCDispatchError.
История диалога
В текущей версии MCP‑сервера нет отдельного доменного модуля, который «ведёт диалог» и пишет новые строки в dialogue_history от имени моделей. Историю диалога записывают сервисы‑клиенты:
telegram-botпишет сообщения каналаtelegramчерезtelegram-bot/services/mysql_memory.py;sip-hookпишет сообщения каналаsipчерезsip_hook_app/history_sync.pyиsip-hook/db.py.
Со стороны MCP‑сервера таблица dialogue_history используется:
- инструментом
get_dialogue_history(чтение истории); - модулем
user_clone.py(перенос истории на «клона» пользователя приswitch_user_role).
Клонирование пользователя
user_clone.py
Модуль реализует сценарий «клонирования» пользователя с переносом связанных данных.
Вспомогательные функции:
_normalize_user_id(value: Any) -> int— проверяет, чтоuser_idзадан, приводится к целому числу и больше нуля; иначе выбрасываетValueError._generate_clone_id(session: AsyncSession, source_id: int) -> Optional[int]— в пределах_MAX_ATTEMPTSподбирает свободныйuser_idдля клона, добавляя к исходномуsource_idслучайное число в заданном диапазоне. При удаче возвращает свободныйuser_id, при неудаче —None.
Основная функция:
async def clone_user_context(user_id: Any) -> Dict[str, Any]:
...
Параметры clone_user_context
| Name | Type | Description | Default |
|---|---|---|---|
user_id |
Any |
Входной идентификатор пользователя; нормализуется в положительное целое через _normalize_user_id. |
— |
Ход работы:
- Нормализует входной
user_idчерез_normalize_user_id. При ошибке возвращает{"success": False, "message": ...}. - Открывает сессию
SessionLocalи ищет исходного пользователя в таблицеusers. - Если пользователь не найден — возвращает
success: Falseи сообщение. - Подбирает новый
clone_user_idчерез_generate_clone_id. При неудаче —success: False, сообщение о невозможности подобрать идентификатор. - Создаёт строку‑клон в таблице
users: user_id = clone_user_id;tg_id = None,phone = None(чтобы избежать конфликтов уникальности);roleиonec_profileкопируются с исходного пользователя.- Переносит связанные записи:
dialogue_history: обновляетuser_idс исходного наclone_user_id;cargo_requests: то же самое;transporter_matches: то же самое. Количество перенесённых записей по каждой таблице сохраняется вhistory_moved,cargo_moved,transporter_moved.- Очищает
onec_profileу исходного пользователя. - Выполняет
commit. При исключенияхSQLAlchemyErrorоткатывает транзакцию, логирует ошибку и возвращаетsuccess: Falseс деталями. При неожиданных исключениях — также откат и защитное сообщение.
Результат:
{
"success": true,
"message": "Користувача клоновано з перенесенням історії та заявок.",
"user_id": <старый user_id>,
"clone_user_id": <новый user_id>,
"history_moved": <количество записей dialogue_history>,
"cargo_moved": <количество записей cargo_requests>,
"transporter_matches_moved": <количество записей transporter_matches>
}
Note
Значение message приведено как в коде (сейчас оно на украинском). По‑русски: «Пользователь клонирован с переносом истории и заявок».
MCP‑инструмент switch_user_role использует эту функцию, но скрывает поля user_id и clone_user_id в своём ответе.
Сервис идентичности
identity.py
Модуль отвечает за нормализацию телефонных номеров и привязку разных идентификаторов (Telegram‑ID и телефон) к единому пользователю.
Функция:
def normalize_phone(raw: str) -> str:
...
Нормализует номер телефона до формата 380XXXXXXXXX (E.164 без плюса):
- удаляет все символы, кроме цифр;
- если после очистки ровно 10 цифр и номер начинается с
0— превращает его в380+ последние 9 цифр; - если номер начинается с
380и длина 12 цифр — возвращает как есть; - во всех остальных случаях выбрасывает
ValueErrorс пояснением.
Основная функция:
async def verify_user(
*,
tg_id: Optional[int] = None,
phone: Optional[str] = None,
) -> Dict[str, Any]:
...
Назначение — проверить или создать пользователя в таблице users по tg_id и/или телефону, обеспечив, что оба идентификатора указывают на одну строку.
Параметры verify_user
| Name | Type | Description | Default |
|---|---|---|---|
tg_id |
Optional[int] |
Telegram‑ID пользователя; при наличии используется для поиска/создания записи в таблице users. |
None |
phone |
Optional[str] |
Номер телефона; нормализуется функцией normalize_phone до формата 380XXXXXXXXX. |
None |
Если не передан ни tg_id, ни phone, функция возвращает:
{"success": false, "message": "Потрібно передати хоча б один ідентифікатор: tg_id або phone."}
Note
Поле message — строка из кода (сейчас на украинском). По‑русски: «Нужно передать хотя бы один идентификатор: tg_id или phone».
Если передан phone, он нормализуется; при ошибке нормализации возвращается { "success": False, "message": ... }.
Внутри открывается сессия SessionLocal и выполняются запросы:
- поиск пользователя по
tg_id(user_by_tg); - поиск пользователя по
phone(user_by_phone).
Дальнейшая логика делится на три основные ветки:
tg_idиphoneзаданы одновременно:- если ни по
tg_id, ни поphoneпользователя нет — создаётся новыйUser(tg_id=int(tg_id), phone=normalized_phone), выполняетсяflushиcommitи возвращаетсяcreated: True; - если оба пользователя найдены и у них разные
user_id— текущая реализация перепривязываетtg_idот «телеграмного» пользователя к пользователю, найденному по телефону, снимаяtg_idсо старой записи и записывая в новую, затем возвращаетlinked: True; - если оба идентификатора принадлежат одному пользователю — возвращается
created: False,linked: False; - если пользователь найден только по
tg_id— к нему привязывается телефон; -
если только по телефону — к нему привязывается
tg_id. -
Передан только
tg_id: - при отсутствии пользователя по
tg_idсоздаётся новыйUser(tg_id=int(tg_id)); -
при наличии — возвращается существующий пользователь.
-
Передан только
phone: - при отсутствии пользователя по нормализованному телефону — создаётся
User(phone=normalized_phone); - при наличии — возвращается существующий.
В случае ошибок уровня БД (SQLAlchemyError) функция логирует исключение и возвращает:
{
"success": false,
"message": "Не вдалося виконати verify_user.",
"details": "<текст ошибки>"
}
Note
Поле message — строка из кода (сейчас на украинском). По‑русски: «Не удалось выполнить verify_user».
Побочные эффекты:
- создание новых строк в
users; - изменение полей
tg_id,phoneиonec_profile(через последующие сценарии); - косвенное влияние на дальнейшие сценарии клонирования (
user_clone) и работы с MCP‑инструментами (verify_user,switch_user_role).
Логирование вызовов инструментов
mcp_logging.py
Файл prime_lardi/mcp_logging.py содержит класс ToolLoggingMiddleware. Этот middleware подключается к экземпляру FastMCP в server.py и перехватывает все вызовы MCP‑инструментов. Для каждого вызова он логирует имя инструмента и переданные аргументы, а после выполнения записывает в лог либо структурированный результат (structured_content), либо компактное текстовое представление содержимого. Такая прослойка упрощает отладку и позволяет видеть, как именно модели используют инструменты MCP‑сервера.