Files
rspade_system/app/RSpade/Commands/Rsx/Task_Process_Command.php
root 29c657f7a7 Exclude tests directory from framework publish
Add 100+ automated unit tests from .expect file specifications
Add session system test
Add rsx:constants:regenerate command test
Add rsx:logrotate command test
Add rsx:clean command test
Add rsx:manifest:stats command test
Add model enum system test
Add model mass assignment prevention test
Add rsx:check command test
Add migrate:status command test

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-25 03:59:58 +00:00

376 lines
13 KiB
PHP

<?php
/**
* CODING CONVENTION:
* This file follows the coding convention where variable_names and function_names
* use snake_case (underscore_wherever_possible).
*/
namespace App\RSpade\Commands\Rsx;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\DB;
use App\RSpade\Core\Task\Task;
use App\RSpade\Core\Task\Task_Instance;
use App\RSpade\Core\Task\Task_Status;
use App\RSpade\Core\Task\Task_Lock;
use App\RSpade\Core\Task\Cron_Parser;
use Symfony\Component\Process\Process;
/**
* Task Process Command
*
* Main worker manager for the task system. Handles:
* - Processing queued tasks
* - Executing scheduled tasks
* - Spawning worker processes
* - Detecting and recovering stuck tasks
*
* This command is designed to run via cron every minute:
* * * * * * cd /var/www/html && php artisan rsx:task:process
*
* It will:
* 1. Check for stuck tasks and mark them as failed
* 2. Process scheduled tasks that are due
* 3. Spawn workers for queued tasks (up to concurrency limits)
* 4. Exit quickly if no work to do
*/
class Task_Process_Command extends Command
{
protected $signature = 'rsx:task:process
{--queue= : Process only this specific queue}
{--once : Process one task and exit (for testing)}
{--force-scheduled : Run all scheduled tasks immediately (ignore next_run_at)}';
protected $description = 'Process queued and scheduled tasks (run via cron every minute)';
public function handle()
{
$specific_queue = $this->option('queue');
$once_mode = $this->option('once');
$this->info('[TASK PROCESSOR] Starting task processor');
// Step 1: Detect and recover stuck tasks
$this->detect_stuck_tasks();
// Step 2: Process scheduled tasks
if (!$specific_queue) {
$this->process_scheduled_tasks();
}
// Step 3: Spawn workers for queued tasks
$this->spawn_workers_for_queues($specific_queue);
// If in once mode, process one task directly (for testing)
if ($once_mode) {
$this->process_one_task($specific_queue ?? 'default');
}
$this->info('[TASK PROCESSOR] Task processor complete');
return 0;
}
/**
* Detect and mark stuck tasks as failed
*
* A task is considered stuck if:
* - Status is 'running'
* - Started more than timeout seconds ago
* - Worker PID is dead or missing
*/
private function detect_stuck_tasks(): void
{
$cleanup_after = config('rsx.tasks.cleanup_stuck_after', 1800);
$stuck_tasks = DB::table('_task_queue')
->where('status', Task_Status::RUNNING)
->where('started_at', '<', now()->subSeconds($cleanup_after))
->get();
foreach ($stuck_tasks as $task) {
// Check if worker process is still alive
$worker_alive = $task->worker_pid && posix_kill($task->worker_pid, 0);
if (!$worker_alive) {
$this->warn("[STUCK TASK] Marking task {$task->id} as failed (worker PID {$task->worker_pid} not responding)");
DB::table('_task_queue')
->where('id', $task->id)
->update([
'status' => Task_Status::FAILED,
'error' => 'Task stuck - worker process not responding',
'completed_at' => now(),
'updated_at' => now(),
]);
}
}
}
/**
* Process scheduled tasks that are due
*
* Scans manifest for #[Schedule] attributes and ensures each has a database record.
* Creates pending task instances for any scheduled tasks that are due.
*/
private function process_scheduled_tasks(): void
{
$force_all = $this->option('force-scheduled');
// Get all scheduled tasks from manifest
$scheduled_tasks = Task::get_scheduled_tasks();
if (empty($scheduled_tasks)) {
return;
}
$this->info("[SCHEDULED] Found " . count($scheduled_tasks) . " scheduled task(s) in manifest");
foreach ($scheduled_tasks as $task_def) {
$this->process_scheduled_task($task_def, $force_all);
}
}
/**
* Process a single scheduled task definition
*
* @param array $task_def Task definition from manifest
* @param bool $force_run If true, run immediately regardless of schedule
*/
private function process_scheduled_task(array $task_def, bool $force_run): void
{
$class = $task_def['class'];
$method = $task_def['method'];
$cron_expression = $task_def['cron_expression'];
$queue = $task_def['queue'];
// Check if we have a record for this scheduled task
$existing = DB::table('_task_queue')
->where('class', $class)
->where('method', $method)
->where('queue', $queue)
->whereNotNull('next_run_at')
->whereIn('status', [Task_Status::PENDING, Task_Status::RUNNING])
->first();
if (!$existing) {
// Create initial scheduled task record
$parser = new Cron_Parser($cron_expression);
$next_run_at = $parser->get_next_run_time();
$this->info("[SCHEDULED] Registering new scheduled task: {$class}::{$method}");
$this->info("[SCHEDULED] Cron: {$cron_expression}");
$this->info("[SCHEDULED] Next run: " . date('Y-m-d H:i:s', $next_run_at));
DB::table('_task_queue')->insert([
'class' => $class,
'method' => $method,
'queue' => $queue,
'status' => Task_Status::PENDING,
'params' => json_encode([]),
'next_run_at' => date('Y-m-d H:i:s', $next_run_at),
'created_at' => now(),
'updated_at' => now(),
]);
return;
}
// Check if task is due to run
$next_run_timestamp = strtotime($existing->next_run_at);
$is_due = $next_run_timestamp <= time();
if ($force_run || $is_due) {
// Task is due - dispatch it
$this->info("[SCHEDULED] Dispatching scheduled task: {$class}::{$method}");
// Create a new pending task instance
$task_id = DB::table('_task_queue')->insertGetId([
'class' => $class,
'method' => $method,
'queue' => $queue,
'status' => Task_Status::PENDING,
'params' => json_encode([]),
'scheduled_for' => now(),
'created_at' => now(),
'updated_at' => now(),
]);
// Calculate next run time and update the scheduled task record
$parser = new Cron_Parser($cron_expression);
$next_run_at = $parser->get_next_run_time();
DB::table('_task_queue')
->where('id', $existing->id)
->update([
'next_run_at' => date('Y-m-d H:i:s', $next_run_at),
'updated_at' => now(),
]);
$this->info("[SCHEDULED] Created task instance ID: {$task_id}");
$this->info("[SCHEDULED] Next run updated to: " . date('Y-m-d H:i:s', $next_run_at));
}
}
/**
* Spawn workers for queued tasks based on concurrency limits
*
* @param string|null $specific_queue If set, only process this queue
*/
private function spawn_workers_for_queues(?string $specific_queue): void
{
$queue_config = config('rsx.tasks.queues', []);
$global_max = config('rsx.tasks.global_max_workers', 1);
// Count currently running workers across all queues
$total_running = DB::table('_task_queue')
->where('status', Task_Status::RUNNING)
->count();
if ($total_running >= $global_max) {
$this->info("[WORKER SPAWN] Global limit reached ({$total_running}/{$global_max})");
return;
}
// Get queues to process
$queues_to_process = $specific_queue
? [$specific_queue => $queue_config[$specific_queue] ?? ['max_workers' => 1]]
: $queue_config;
foreach ($queues_to_process as $queue_name => $queue_settings) {
$max_workers = $queue_settings['max_workers'] ?? 1;
// Count running workers for this queue
$queue_running = DB::table('_task_queue')
->where('queue', $queue_name)
->where('status', Task_Status::RUNNING)
->count();
// Count pending tasks for this queue (exclude scheduled task records)
$queue_pending = DB::table('_task_queue')
->where('queue', $queue_name)
->where('status', Task_Status::PENDING)
->whereNull('next_run_at') // Exclude scheduled task records
->where(function ($query) {
$query->whereNull('scheduled_for')
->orWhere('scheduled_for', '<=', now());
})
->count();
if ($queue_pending === 0) {
continue;
}
// Calculate how many workers we can spawn
$can_spawn_global = $global_max - $total_running;
$can_spawn_queue = $max_workers - $queue_running;
$can_spawn = min($can_spawn_global, $can_spawn_queue, $queue_pending);
if ($can_spawn > 0) {
$this->info("[WORKER SPAWN] Spawning {$can_spawn} worker(s) for queue '{$queue_name}'");
for ($i = 0; $i < $can_spawn; $i++) {
$this->spawn_worker($queue_name);
$total_running++;
}
}
}
}
/**
* Spawn a worker process for a queue
*
* @param string $queue_name Queue to process
*/
private function spawn_worker(string $queue_name): void
{
$process = new Process([
'php',
base_path('artisan'),
'rsx:task:worker',
'--queue=' . $queue_name,
]);
$process->setTimeout(null);
$process->start();
$this->info("[WORKER SPAWN] Started worker process {$process->getPid()} for queue '{$queue_name}'");
}
/**
* Process one task directly (for testing)
*
* @param string $queue_name Queue to process from
*/
private function process_one_task(string $queue_name): void
{
$this->info("[ONCE MODE] Processing one task from queue '{$queue_name}'");
// Use lock to ensure atomic task selection
$lock = new Task_Lock("task_queue_{$queue_name}");
if (!$lock->acquire()) {
$this->warn("[ONCE MODE] Could not acquire lock for queue '{$queue_name}'");
return;
}
try {
// Select next pending task (exclude scheduled task records)
$task_row = DB::table('_task_queue')
->where('queue', $queue_name)
->where('status', Task_Status::PENDING)
->whereNull('next_run_at') // Exclude scheduled task records
->where(function ($query) {
$query->whereNull('scheduled_for')
->orWhere('scheduled_for', '<=', now());
})
->orderBy('created_at', 'asc')
->lockForUpdate()
->first();
if (!$task_row) {
$this->info("[ONCE MODE] No pending tasks in queue '{$queue_name}'");
$lock->release();
return;
}
// Mark as running
DB::table('_task_queue')
->where('id', $task_row->id)
->update([
'status' => Task_Status::RUNNING,
'started_at' => now(),
'worker_pid' => getmypid(),
'updated_at' => now(),
]);
$lock->release();
// Execute the task
$this->info("[ONCE MODE] Executing task {$task_row->id}: {$task_row->class}::{$task_row->method}");
$task_instance = Task_Instance::find($task_row->id);
try {
$class = $task_row->class;
$method = $task_row->method;
$params = json_decode($task_row->params, true) ?? [];
$result = $class::$method($task_instance, $params);
$task_instance->mark_completed($result);
$this->info("[ONCE MODE] Task {$task_row->id} completed successfully");
} catch (\Exception $e) {
$task_instance->mark_failed($e->getMessage());
$this->error("[ONCE MODE] Task {$task_row->id} failed: " . $e->getMessage());
}
} finally {
if ($lock->is_locked()) {
$lock->release();
}
}
}
}