Введение
Система очередей задач (Tasks Queue) в Wepps Platform — это механизм для отложенной асинхронной обработки тяжелых или длительных операций. Задачи регистрируются в базе данных (таблица s_Tasks) и обрабатываются отдельным процессом, что позволяет не блокировать основной поток выполнения приложения и обеспечивает надежную обработку даже при сбоях.
Зачем использовать очереди задач?
Проблемы, которые решает система Tasks
- Блокировка пользовательского интерфейса - длительные операции (отправка email, генерация PDF) задерживают ответ пользователю
- Ненадежность синхронной обработки - сбой при отправке уведомления приводит к потере данных о заказе
- Отсутствие повторных попыток - при временных сбоях (недоступность почтового сервера) операция просто не выполняется
- Невозможность трекинга - нет централизованного места для отслеживания статуса выполнения операций и отладки
Когда использовать
- Отправка email-уведомлений (новые заказы, восстановление пароля, подтверждение регистрации)
- Интеграция с внешними API (платежные системы, службы доставки, мессенджеры)
- Генерация тяжелых отчетов и документов
- Обработка вебхуков от внешних сервисов
- Любые операции, которые можно выполнить асинхронно
Основные концепции
Архитектура системы
// 1. Регистрация задачи (в момент действия пользователя)
$tasks = new Tasks();
$tasks->add('order-new', ['id' => 123, 'email' => true]);
// 2. Обработка задач (периодический процесс через cron)
$botSystem = new BotSystem();
$botSystem->tasks(); // Выбирает и обрабатывает задачи
// 3. Обновление статуса (внутри обработчика)
$tasks->update($taskId, ['message' => 'email ok'], 200);
Компоненты системы:
WeppsCore\Tasks- класс для регистрации и обновления задачs_Tasks- таблица базы данных для хранения очередиBotSystem::tasks()- основной обработчик задач (запускается по расписанию)processTask()- методы в расширениях для обработки конкретных типов задач
Структура таблицы s_Tasks
Поля таблицы
| Поле | Тип | Описание |
|---|---|---|
| Id | INT | Уникальный идентификатор задачи |
| Name | VARCHAR | Тип задачи (order-new, password, yookassa и т.д.) |
| LDate | DATETIME | Дата и время создания задачи |
| IP | VARCHAR | IP-адрес источника запроса |
| Url | VARCHAR | URL запроса (для POST/HTTP задач) |
| TRequest | VARCHAR | Тип запроса (cli, http, post) |
| BRequest | TEXT | JSON с данными запроса |
| BResponse | TEXT | JSON с результатом обработки |
| HRequest | TEXT | Заголовки запроса |
| HResponse | TEXT | Заголовки ответа |
| SResponse | INT | HTTP-статус результата (200, 400, 404 и т.д.) |
| InProgress | TINYINT | Флаг: задача в обработке (0/1) |
| IsProcessed | TINYINT | Флаг: задача обработана (0/1) |
| IsHidden | TINYINT | Флаг: задача скрыта (0/1) |
Состояния задачи
// Новая задача (ожидает обработки)
InProgress = 0, IsProcessed = 0
// Задача в обработке
InProgress = 1, IsProcessed = 0
// Задача успешно обработана
InProgress = 1, IsProcessed = 1
Регистрация задач
Метод Tasks::add()
/**
* @param string $name Тип задачи (идентификатор)
* @param array $jdata Данные задачи (будут сериализованы в JSON)
* @param string $date Дата создания (по умолчанию текущая)
* @param string $ip IP-адрес (по умолчанию из $_SERVER)
* @param string $type Тип запроса: 'cli', 'http', 'post'
*/
public function add(string $name, array $jdata, string $date = '', string $ip = '', string $type = 'cli')
Примеры регистрации задач
1. Новый заказ (CartUtils)
// После успешного создания заказа
$jdata = [
'id' => (int) $orderId,
'email' => true, // Отправить email покупателю
'telegram' => true, // Отправить уведомление в Telegram
];
$tasks = new Tasks();
$tasks->add('order-new', $jdata, $order['ODate'], $order['UserIP']);
2. Восстановление пароля (Profile)
// При запросе восстановления пароля
$jdata = [
'token' => $token,
'nameFirst' => $user['nameFirst'],
'email' => $user['email']
];
$tasks = new Tasks();
$tasks->add('password', $jdata, date('Y-m-d H:i:s'), $_SERVER['REMOTE_ADDR']);
3. Вебхук от платежной системы (Yookassa)
// Обработка вебхука от ЮKassa (POST-запрос)
$jdata = [
'event' => $webhookData['event'],
'orderId' => $webhookData['object']['metadata']['order_id'],
'paymentId' => $webhookData['object']['id']
];
$tasks = new Tasks();
$tasks->add('yookassa', $jdata, '', '', 'post');
4. Подтверждение оплаты
// После успешной оплаты заказа
$responsePayment = [
'id' => $orderId,
'email' => true,
'telegram' => true
];
$tasks = new Tasks();
$tasks->add('order-payment', $responsePayment, '', '', '');
Обработка задач
Обработка задач в Wepps Platform реализована через централизованный метод BotSystem::tasks(), который содержит switch-case конструкцию для маршрутизации задач к соответствующим обработчикам.
Важно: При разработке новых обработчиков задач необходимо добавлять новые case в метод BotSystem::tasks() по аналогии с существующими. Каждый case должен вызывать метод processTask() соответствующего класса и передавать ему данные задачи и экземпляр класса Tasks. Это обеспечивает единую точку входа для всех типов задач и упрощает мониторинг системы очередей.
Основной обработчик BotSystem::tasks()
namespace WeppsExtensions\Addons\Bot;
class BotSystem extends Bot {
public function tasks() {
// Выбираем необработанные задачи (лимит 50 за раз)
$sql = "SELECT * FROM s_Tasks
WHERE IsHidden=0
AND InProgress IN (1,0)
AND IsProcessed=0
ORDER BY InProgress DESC, Id
LIMIT 50";
$res = Connect::$instance->fetch($sql);
// Если задач нет или все в обработке - выходим
if (empty($res) || $res[0]['InProgress'] == 1) {
return;
}
// Инициализируем обработчики
$tasks = new Tasks();
$cartUtils = new CartUtils();
$yookassa = new Yookassa([], $cartUtils);
$profileUtils = new ProfileUtils([]);
// Обрабатываем каждую задачу
foreach ($res as $value) {
switch($value['Name']) {
case 'order-new':
$cartUtils->processTask($value, $tasks);
break;
case 'order-payment':
$cartUtils->processPaymentTask($value, $tasks);
break;
case 'yookassa':
$yookassa->processTask($value, $tasks);
break;
case 'password':
$profileUtils->processPasswordTask($value, $tasks);
break;
case 'password-confirm':
$profileUtils->processPasswordConfirmTask($value, $tasks);
break;
case 'reg-confirm':
$profileUtils->processRegConfirmTask($value, $tasks);
break;
case 'reg-complete':
$profileUtils->processRegCompleteTask($value, $tasks);
break;
default:
// Неизвестный тип задачи
$tasks->update($value['Id'], ['message' => 'task fail'], 404);
break;
}
}
}
}
Запуск обработчика через cron
# Добавить в crontab (запуск каждую минуту)
* * * * * php /var/www/project/packages/WeppsExtensions/Addons/Bot/Bot.php tasks
Реализация обработчиков задач
1. Обработка нового заказа
namespace WeppsExtensions\Cart;
class CartUtils {
public function processTask(array $request, Tasks $tasks)
{
// Декодируем данные из BRequest
$jdata = json_decode($request['BRequest'], true);
// Получаем данные заказа
$order = $this->getOrder($jdata['id']);
if (empty($order)) {
return $tasks->update($request['Id'], ['message' => 'no order'], 400);
}
$outputMessage = "";
// Отправка email
$mail = new Mail('html');
$subject = 'Новый заказ';
$text = $order['OText']; // HTML текст заказа
// Email администратору
$mail->mail(Connect::$projectInfo['email'], $subject, $text);
// Email покупателю (если указано)
if (!empty($jdata['email'])) {
$mail->mail($order['Email'], $subject, $text);
$outputMessage .= " email ok";
}
// Отправка в Telegram (если указано)
if (!empty($jdata['telegram'])) {
$text = "<b>НОВЫЙ ЗАКАЗ</b> №{$order['Id']} / {$order['OSum']} ₽\n";
$text .= "🙋{$order['Name']}\n";
$text .= "📞{$order['Phone']}\n";
$text .= "✉️{$order['Email']}\n\n";
$text .= "#сайт_{$order['Id']}";
$tg = new Telegram();
$res = $tg->send(Connect::$projectServices['telegram']['dev'], $text);
$jdata = json_decode($res['response'], true);
$outputMessage .= ($jdata['ok'] === true) ? " telegram ok" : " telegram false";
}
// Обновляем статус задачи
$response = ['message' => trim($outputMessage)];
return $tasks->update($request['Id'], $response, 200);
}
}
2. Восстановление пароля
namespace WeppsExtensions\Profile;
class ProfileUtils {
public function processPasswordTask(array $request, Tasks $tasks)
{
// Декодируем данные
$jdata = json_decode($request['BRequest'], true);
// Формируем ссылку для сброса пароля
$url = 'https://' . Connect::$projectDev['host'] . "/profile/?token={$jdata['token']}";
// Формируем текст письма
$text = "<b>Добрый день, {$jdata['nameFirst']}!</b><br/><br/>";
$text .= "Поступил запрос на смену пароля!";
$text .= "<br/><br/>Для установки нового пароля перейдите по ссылке:";
$text .= "<br/><br/><center><a href=\"{$url}\" class=\"button\">Установить новый пароль</a></center>";
// Отправляем email
$mail = new Mail('html');
$outputMessage = "email fail";
if ($mail->mail($jdata['email'], "Восстановление доступа", $text)) {
$outputMessage = "email ok";
}
// Обновляем статус
return $tasks->update($request['Id'], ['message' => $outputMessage], 200);
}
public function processPasswordConfirmTask(array $request, Tasks $tasks)
{
$jdata = json_decode($request['BRequest'], true);
$url = 'https://' . Connect::$projectDev['host'] . "/profile/";
$text = "<b>Добрый день, {$jdata['nameFirst']}!</b><br/><br/>";
$text .= "Ваш пароль в Личном кабинете изменен!";
$text .= "<br/><br/><center><a href=\"{$url}\" class=\"button\">Перейти в Личный кабинет</a></center>";
$mail = new Mail('html');
$outputMessage = $mail->mail($jdata['email'], "Пароль изменен", $text) ? "email ok" : "email fail";
return $tasks->update($request['Id'], ['message' => $outputMessage], 200);
}
}
3. Обработка вебхука от платежной системы
namespace WeppsExtensions\Cart\Payments\Yookassa;
class Yookassa {
public function processTask(array $request, Tasks $tasks)
{
// Декодируем данные вебхука
$jdata = json_decode($request['BRequest'], true);
// Получаем информацию о платеже
$paymentInfo = $this->getPayment($jdata['paymentId']);
if (empty($paymentInfo)) {
return $tasks->update($request['Id'], ['message' => 'payment not found'], 404);
}
// Обрабатываем успешный платеж
if ($paymentInfo['status'] === 'succeeded') {
// Обновляем статус заказа
$this->updateOrderPaymentStatus($jdata['orderId'], $paymentInfo);
// Регистрируем новую задачу на отправку уведомлений
$newTasks = new Tasks();
$responsePayment = [
'id' => $jdata['orderId'],
'email' => true,
'telegram' => true
];
$newTasks->add('order-payment', $responsePayment, '', '', '');
$response = ['message' => 'payment processed'];
return $tasks->update($request['Id'], $response, 200);
}
// Платеж в другом статусе
$response = ['message' => 'payment status: ' . $paymentInfo['status']];
return $tasks->update($request['Id'], $response, 200);
}
}
4. Регистрация пользователя
namespace WeppsExtensions\Profile;
class ProfileUtils {
public function processRegConfirmTask(array $request, Tasks $tasks)
{
$jdata = json_decode($request['BRequest'], true);
// Добавляем ID задачи в токен для связи
$jwt = new Jwt();
$token = $jwt->token_decode($jdata['token']);
$token['payload']['tsk'] = $request['Id'];
$token = $jwt->token_encode($token['payload']);
$url = 'https://' . Connect::$projectDev['host'] . "/profile/?token={$token}";
$text = "<b>Добрый день, {$jdata['nameFirst']}!</b><br/><br/>";
$text .= "Пожалуйста, завершите регистрацию!";
$text .= "<br/>После перехода по ссылке и установки пароля - Ваш аккаунт будет активирован.";
$text .= "<br/><br/><center><a href=\"{$url}\" class=\"button\">Завершить регистрацию</a></center>";
$mail = new Mail('html');
$outputMessage = $mail->mail($jdata['email'], "Завершите регистрацию", $text) ? "email ok" : "email fail";
return $tasks->update($request['Id'], ['message' => $outputMessage], 200);
}
public function processRegCompleteTask(array $request, Tasks $tasks)
{
$jdata = json_decode($request['BRequest'], true);
$url = 'https://' . Connect::$projectDev['host'] . "/profile/";
$text = "<b>Добрый день, {$jdata['nameFirst']}!</b><br/><br/>";
$text .= "Вы завершили регистрацию!";
$text .= "<br/>Ваш аккаунт активирован, спасибо.";
$text .= "<br/><br/><center><a href=\"{$url}\" class=\"button\">Перейти в личный кабинет</a></center>";
$mail = new Mail('html');
$outputMessage = $mail->mail($jdata['email'], "Регистрация прошла успешно", $text) ? "email ok" : "email fail";
return $tasks->update($request['Id'], ['message' => $outputMessage], 200);
}
}
Обновление статуса задачи
Метод Tasks::update()
/**
* @param int $id Идентификатор задачи
* @param array $response Результат выполнения (будет в BResponse)
* @param int $status HTTP-статус (200, 400, 404 и т.д.)
*/
public function update(int $id, array $response, int $status = 200)
{
$row = [
'InProgress' => 1, // Помечаем как обрабатываемую
'IsProcessed' => 1, // Помечаем как обработанную
'BResponse' => json_encode($response, JSON_UNESCAPED_UNICODE),
'SResponse' => $status,
];
$prepare = Connect::$instance->prepare($row);
$sql = "UPDATE s_Tasks SET {$prepare['update']} WHERE Id = :Id";
Connect::$instance->query($sql, array_merge($prepare['row'], ['Id' => $id]));
return [
'status' => 200,
'message' => 'Задача обновлена',
'data' => [
'id' => $id,
'response' => $response,
'http_status' => $status
]
];
}
Примеры обновления
// Успешное выполнение
$tasks->update($taskId, ['message' => 'email ok, telegram ok'], 200);
// Частичный успех
$tasks->update($taskId, ['message' => 'email ok, telegram fail'], 200);
// Ошибка валидации данных
$tasks->update($taskId, ['message' => 'no order'], 400);
// Задача не найдена
$tasks->update($taskId, ['message' => 'task fail'], 404);
Лучшие практики
1. Используйте guard clause для обработки ошибок
// ❌ Плохо: глубокая вложенность
public function processTask(array $request, Tasks $tasks)
{
$jdata = json_decode($request['BRequest'], true);
$order = $this->getOrder($jdata['id']);
if (!empty($order)) {
// Много кода обработки...
$mail = new Mail('html');
// ...
return $tasks->update($request['Id'], $response, 200);
} else {
return $tasks->update($request['Id'], ['message' => 'no order'], 400);
}
}
// ✅ Хорошо: ранний return
public function processTask(array $request, Tasks $tasks)
{
$jdata = json_decode($request['BRequest'], true);
$order = $this->getOrder($jdata['id']);
// Проверка ошибок в начале
if (empty($order)) {
return $tasks->update($request['Id'], ['message' => 'no order'], 400);
}
// Основная логика без вложенности
$mail = new Mail('html');
$subject = 'Новый заказ';
// ...
return $tasks->update($request['Id'], $response, 200);
}
2. Всегда обновляйте статус задачи
// ❌ Плохо: задача навсегда останется в очереди
public function processTask(array $request, Tasks $tasks)
{
$jdata = json_decode($request['BRequest'], true);
$mail = new Mail('html');
$mail->mail($jdata['email'], 'Subject', 'Text');
// Забыли вызвать $tasks->update()
}
// ✅ Хорошо: всегда обновляем статус
public function processTask(array $request, Tasks $tasks)
{
$jdata = json_decode($request['BRequest'], true);
$mail = new Mail('html');
$outputMessage = $mail->mail($jdata['email'], 'Subject', 'Text') ? "email ok" : "email fail";
// Обязательно обновляем статус
return $tasks->update($request['Id'], ['message' => $outputMessage], 200);
}
3. Используйте информативные сообщения в response
// ❌ Плохо: неинформативное сообщение
$tasks->update($taskId, ['message' => 'ok'], 200);
// ✅ Хорошо: детальная информация
$outputMessage = "";
if ($emailSent) {
$outputMessage .= "email ok";
}
if ($telegramSent) {
$outputMessage .= " telegram ok";
}
$tasks->update($taskId, ['message' => trim($outputMessage)], 200);
// ✅ Еще лучше: структурированные данные
$tasks->update($taskId, [
'email' => $emailSent ? 'sent' : 'failed',
'telegram' => $telegramSent ? 'sent' : 'failed',
'recipients' => [$adminEmail, $customerEmail]
], 200);
4. Правильно выбирайте тип задачи
// ✅ Используйте осмысленные имена типов задач
$tasks->add('order-new', $data); // Новый заказ
$tasks->add('order-payment', $data); // Подтверждение оплаты
$tasks->add('password', $data); // Восстановление пароля
$tasks->add('password-confirm', $data); // Подтверждение смены пароля
$tasks->add('reg-confirm', $data); // Подтверждение регистрации
$tasks->add('reg-complete', $data); // Завершение регистрации
$tasks->add('yookassa', $data); // Вебхук от ЮKassa
// ❌ Плохо: неясные названия
$tasks->add('task1', $data);
$tasks->add('email', $data);
$tasks->add('process', $data);
5. Отделяйте создание задачи от обработки
// ✅ Правильная структура: разделение ответственности
class CartUtils {
// Метод создания заказа - быстрый, регистрирует задачу
public function createOrder($data) {
// Сохранение заказа в БД (быстро)
$orderId = $this->saveOrder($data);
// Регистрация задачи на отправку уведомлений
$tasks = new Tasks();
$tasks->add('order-new', ['id' => $orderId, 'email' => true]);
// Сразу возвращаем ответ пользователю
return ['id' => $orderId];
}
// Метод обработки задачи - может быть медленным
public function processTask(array $request, Tasks $tasks) {
$jdata = json_decode($request['BRequest'], true);
$order = $this->getOrder($jdata['id']);
// Медленные операции (отправка email, вызов API)
$mail = new Mail('html');
$mail->mail($order['Email'], 'Новый заказ', $order['OText']);
$tg = new Telegram();
$tg->send(Connect::$projectServices['telegram']['dev'], "Новый заказ №{$order['Id']}");
return $tasks->update($request['Id'], ['message' => 'ok'], 200);
}
}
Мониторинг и отладка
Просмотр задач через админку
https://your-site.com/_wepps/lists/s_Tasks/
В интерфейсе доступны:
- Список всех задач с фильтрацией
- Просмотр данных запроса (BRequest) и ответа (BResponse)
- Статусы выполнения (InProgress, IsProcessed)
- Время создания и IP-адрес
Запуск обработчика вручную
# Запуск через CLI
php /var/www/project/packages/WeppsExtensions/Addons/Bot/Bot.php tasks
Отладка отдельных обработчиков задач
Для тестирования и отладки конкретного обработчика задачи можно вызвать метод processTask() напрямую, без запуска всей очереди:
use WeppsCore\Tasks;
use WeppsExtensions\Cart\CartUtils;
// Подготавливаем тестовый массив задачи
$request = [
'Id' => 1,
'Name' => 'order-new',
'BRequest' => json_encode([
'id' => 123,
'email' => true,
'telegram' => true
]),
'LDate' => date('Y-m-d H:i:s'),
'IP' => '127.0.0.1'
];
// Инициализируем зависимости
$tasks = new Tasks();
$cartUtils = new CartUtils();
// Вызываем обработчик напрямую
$result = $cartUtils->processTask($request, $tasks);
// Выводим результат
print_r($result);
Примеры отладки разных обработчиков:
// Отладка восстановления пароля
use WeppsExtensions\Profile\ProfileUtils;
$request = [
'Id' => 2,
'Name' => 'password',
'BRequest' => json_encode([
'token' => 'test_token_abc123',
'nameFirst' => 'Иван',
'email' => 'test@example.com'
])
];
$profileUtils = new ProfileUtils([]);
$result = $profileUtils->processPasswordTask($request, new Tasks());
print_r($result);
// Отладка вебхука Yookassa
use WeppsExtensions\Cart\Payments\Yookassa\Yookassa;
$request = [
'Id' => 3,
'Name' => 'yookassa',
'BRequest' => json_encode([
'event' => 'payment.succeeded',
'orderId' => 456,
'paymentId' => 'payment_id_xyz789'
])
];
$yookassa = new Yookassa([], new CartUtils());
$result = $yookassa->processTask($request, new Tasks());
print_r($result);
Очистка таблицы задач
Для очистки таблицы s_Tasks используйте методы класса Tasks:
use WeppsCore\Tasks;
$tasks = new Tasks();
// Полная очистка таблицы (только если нет необработанных задач)
$result = $tasks->cleanup();
// Вернет: ['status' => 200, 'message' => 'Таблица очищена от обработанных задач', 'data' => null]
// Или: ['status' => 400, 'message' => 'Есть необработанные задачи', 'data' => ['pending_tasks' => 5]]
// Удаление старых задач (по умолчанию старше 7 дней)
$result = $tasks->removeOld();
// Удаление успешных задач старше 1 дня
$result = $tasks->removeOld(1, 200);
// Удаление задач с ошибками старше 7 дней
$result = $tasks->removeOld(7, 400);
SQL-запросы для отладки
-- Необработанные задачи
SELECT * FROM s_Tasks
WHERE IsHidden=0 AND IsProcessed=0
ORDER BY Id DESC;
-- Задачи с ошибками
SELECT * FROM s_Tasks
WHERE IsProcessed=1 AND SResponse != 200
ORDER BY Id DESC;
-- Задачи конкретного типа
SELECT * FROM s_Tasks
WHERE Name='order-new'
ORDER BY Id DESC LIMIT 10;
-- Статистика по типам задач
SELECT Name,
COUNT(*) as total,
SUM(CASE WHEN IsProcessed=1 THEN 1 ELSE 0 END) as processed,
SUM(CASE WHEN SResponse=200 THEN 1 ELSE 0 END) as success
FROM s_Tasks
WHERE IsHidden=0
GROUP BY Name;
Частые проблемы
1. Задачи не обрабатываются
// Проверьте, что cron настроен правильно
// Добавьте в crontab:
* * * * * php /var/www/project/packages/WeppsExtensions/Addons/Bot/Bot.php tasks >> /var/log/wepps_tasks.log 2>&1
// Проверьте логи
tail -f /var/log/wepps_tasks.log
Причины:
- Cron не настроен или не работает
- Неправильный путь к файлу в crontab
- Отсутствуют права на выполнение
- PHP CLI не установлен или недоступен
2. Задача зависла в статусе InProgress=1, IsProcessed=0
-- Проблема: обработчик упал во время выполнения
-- Решение: сбросить статус вручную
UPDATE s_Tasks
SET InProgress=0
WHERE Id=123 AND IsProcessed=0;
Причины:
- Fatal error в коде обработчика
- Превышено время выполнения PHP
- Обработчик не вызвал $tasks->update()
3. Задачи обрабатываются дважды
// ❌ Проблема: несколько экземпляров cron одновременно
// ✅ Решение: используйте блокировки
public function tasks() {
// Проверка блокировки
$lockFile = sys_get_temp_dir() . '/wepps_tasks.lock';
$fp = fopen($lockFile, 'w+');
if (!flock($fp, LOCK_EX | LOCK_NB)) {
// Уже выполняется
return;
}
// Обработка задач...
flock($fp, LOCK_UN);
fclose($fp);
}
4. Email не отправляется из задач
// Проверьте настройки в config.php
$projectServices = [
'mail' => [
'host' => 'smtp.example.com',
'port' => 587,
'username' => 'user@example.com',
'password' => 'password',
'from' => 'noreply@example.com',
'fromName' => 'Project Name'
]
];
// Добавьте обработку ошибок
$mail = new Mail('html');
try {
if ($mail->mail($email, $subject, $text)) {
$outputMessage = "email ok";
} else {
$outputMessage = "email fail: unknown error";
}
} catch (\Exception $e) {
$outputMessage = "email fail: " . $e->getMessage();
}
Производительность
Оптимизация обработки большого количества задач
// ❌ Неоптимально: обработка всех задач за раз
public function tasks() {
$sql = "SELECT * FROM s_Tasks WHERE IsProcessed=0"; // Может быть тысячи
$res = Connect::$instance->fetch($sql);
// ...
}
// ✅ Оптимально: пакетная обработка с лимитом
public function tasks() {
// Обрабатываем по 50 задач за раз
$sql = "SELECT * FROM s_Tasks
WHERE IsHidden=0 AND InProgress IN (1,0) AND IsProcessed=0
ORDER BY InProgress DESC, Id
LIMIT 50";
$res = Connect::$instance->fetch($sql);
// Защита от повторной обработки
if (empty($res) || $res[0]['InProgress'] == 1) {
return;
}
// Обработка...
}
Архивирование старых задач
Класс Tasks предоставляет методы для управления старыми записями:
Метод cleanup()
Очищает всю таблицу s_Tasks, если нет необработанных задач:
$tasks = new Tasks();
$result = $tasks->cleanup();
// Результат:
// Если есть необработанные задачи:
[
'status' => 400,
'message' => 'Есть необработанные задачи',
'data' => [
'pending_tasks' => 5
]
]
// Если таблица очищена:
[
'status' => 200,
'message' => 'Таблица очищена от обработанных задач',
'data' => null
]
Особенности:
- Использует
TRUNCATEдля полной очистки таблицы - Автоматически сбрасывает AUTO_INCREMENT в 1
- Проверяет наличие необработанных задач перед очисткой
- Безопасно - не удаляет задачи в процессе обработки
Метод removeOld()
Удаляет старые записи по критериям возраста и статуса:
$tasks = new Tasks();
// Удалить все обработанные задачи старше 7 дней (по умолчанию)
$result = $tasks->removeOld();
// Удалить все задачи старше 1 дня
$result = $tasks->removeOld(1);
// Удалить только успешные задачи (статус 200) старше 1 дня
$result = $tasks->removeOld(1, 200);
// Удалить задачи с ошибками (статус 400) старше 7 дней
$result = $tasks->removeOld(7, 400);
// Результат:
[
'status' => 200,
'message' => 'Удалено записей: 42',
'data' => [
'deleted' => 42,
'filter' => [
'days' => 7,
'status' => 200
]
]
]
Параметры:
$days- возраст логов в днях (по умолчанию 7)$status- фильтр по HTTP-статусу (200, 400, 404 и т.д.), еслиnull- удаляются все статусы
Примеры использования:
// Скрипт для очистки старых обработанных задач
public function archiveTasks()
{
$tasks = new Tasks();
// Удаляем успешные задачи старше 1 дня
$result1 = $tasks->removeOld(1, 200);
// Удаляем задачи с ошибками старше 7 дней
$result2 = $tasks->removeOld(7, 400);
// Удаляем все остальные задачи старше 30 дней
$result3 = $tasks->removeOld(30);
return [
'success_1day' => $result1,
'errors_7days' => $result2,
'all_30days' => $result3
];
}
Автоматизация через BotSystem:
- Добавьте метод
cleanupTasks()в классBotSystem:
namespace WeppsExtensions\Addons\Bot;
class BotSystem extends Bot {
// ... существующие методы ...
public function cleanupTasks()
{
$tasks = new Tasks();
// Вариант 1: Удаление старых задач по критериям
// Удаляем успешные задачи старше 1 дня
$result1 = $tasks->removeOld(1, 200);
echo "Успешные задачи (1 день): {$result1['message']}\n";
// Удаляем задачи с ошибками старше 7 дней
$result2 = $tasks->removeOld(7, 400);
echo "Задачи с ошибками (7 дней): {$result2['message']}\n";
// Удаляем все остальные задачи старше 30 дней
$result3 = $tasks->removeOld(30);
echo "Все задачи (30 дней): {$result3['message']}\n";
// Вариант 2: Полная очистка таблицы (если нет необработанных задач)
// $result = $tasks->cleanup();
// echo "Очистка таблицы: {$result['message']}\n";
}
}
- Добавьте
caseв файлеBot.phpдля обработки команды:
switch ($action) {
case 'tasks':
$botSystem = new BotSystem();
$botSystem->tasks();
break;
case 'cleanupTasks':
$botSystem = new BotSystem();
$botSystem->cleanupTasks();
break;
// ... другие case ...
}
Запуск вручную через Bot.php:
php /var/www/project/packages/WeppsExtensions/Addons/Bot/Bot.php cleanupTasks
Автоматический запуск через cron (каждый день в 03:00):
0 3 * * * php /var/www/project/packages/WeppsExtensions/Addons/Bot/Bot.php cleanupTasks >> /var/log/wepps_cleanup.log 2>&1
Заключение
Система очередей задач (Tasks Queue) в Wepps Platform предоставляет:
- ✅ Асинхронную обработку - не блокирует ответ пользователю
- ✅ Надежность - задачи не теряются при сбоях
- ✅ Трекинг - все действия логируются в БД
- ✅ Гибкость - легко добавлять новые типы задач
- ✅ Масштабируемость - обработка тысяч задач через пакетную обработку
- ✅ Отладка - просмотр статуса через админку
Используйте Tasks Queue для:
- Email-рассылок и уведомлений
- Интеграции с внешними API
- Обработки вебхуков
- Генерации отчетов
- Любых операций, которые можно выполнить асинхронно
Файлы:
packages/WeppsCore/Tasks.php- класс для работы с задачамиpackages/WeppsExtensions/Addons/Bot/BotSystem.php- основной обработчикpackages/WeppsExtensions/Cart/CartUtils.php- примеры обработки заказовpackages/WeppsExtensions/Profile/ProfileUtils.php- примеры обработки пользователей