option('queue'); $once_mode = $this->option('once'); $this->info('[TASK PROCESSOR] Starting task processor'); // Step 1: Detect and recover stuck tasks $this->detect_stuck_tasks(); // Step 2: Process scheduled tasks if (!$specific_queue) { $this->process_scheduled_tasks(); } // Step 3: Spawn workers for queued tasks $this->spawn_workers_for_queues($specific_queue); // If in once mode, process one task directly (for testing) if ($once_mode) { $this->process_one_task($specific_queue ?? 'default'); } $this->info('[TASK PROCESSOR] Task processor complete'); return 0; } /** * Detect and mark stuck tasks as failed * * A task is considered stuck if: * - Status is 'running' * - Started more than timeout seconds ago * - Worker PID is dead or missing */ private function detect_stuck_tasks(): void { $cleanup_after = config('rsx.tasks.cleanup_stuck_after', 1800); $stuck_tasks = DB::table('_task_queue') ->where('status', Task_Status::RUNNING) ->where('started_at', '<', now()->subSeconds($cleanup_after)) ->get(); foreach ($stuck_tasks as $task) { // Check if worker process is still alive $worker_alive = $task->worker_pid && posix_kill($task->worker_pid, 0); if (!$worker_alive) { $this->warn("[STUCK TASK] Marking task {$task->id} as failed (worker PID {$task->worker_pid} not responding)"); DB::table('_task_queue') ->where('id', $task->id) ->update([ 'status' => Task_Status::FAILED, 'error' => 'Task stuck - worker process not responding', 'completed_at' => now(), 'updated_at' => now(), ]); } } } /** * Process scheduled tasks that are due * * Scans manifest for #[Schedule] attributes and ensures each has a database record. * Creates pending task instances for any scheduled tasks that are due. */ private function process_scheduled_tasks(): void { $force_all = $this->option('force-scheduled'); // Get all scheduled tasks from manifest $scheduled_tasks = Task::get_scheduled_tasks(); if (empty($scheduled_tasks)) { return; } $this->info("[SCHEDULED] Found " . count($scheduled_tasks) . " scheduled task(s) in manifest"); foreach ($scheduled_tasks as $task_def) { $this->process_scheduled_task($task_def, $force_all); } } /** * Process a single scheduled task definition * * @param array $task_def Task definition from manifest * @param bool $force_run If true, run immediately regardless of schedule */ private function process_scheduled_task(array $task_def, bool $force_run): void { $class = $task_def['class']; $method = $task_def['method']; $cron_expression = $task_def['cron_expression']; $queue = $task_def['queue']; // Check if we have a record for this scheduled task $existing = DB::table('_task_queue') ->where('class', $class) ->where('method', $method) ->where('queue', $queue) ->whereNotNull('next_run_at') ->whereIn('status', [Task_Status::PENDING, Task_Status::RUNNING]) ->first(); if (!$existing) { // Create initial scheduled task record $parser = new Cron_Parser($cron_expression); $next_run_at = $parser->get_next_run_time(); $this->info("[SCHEDULED] Registering new scheduled task: {$class}::{$method}"); $this->info("[SCHEDULED] Cron: {$cron_expression}"); $this->info("[SCHEDULED] Next run: " . date('Y-m-d H:i:s', $next_run_at)); DB::table('_task_queue')->insert([ 'class' => $class, 'method' => $method, 'queue' => $queue, 'status' => Task_Status::PENDING, 'params' => json_encode([]), 'next_run_at' => date('Y-m-d H:i:s', $next_run_at), 'created_at' => now(), 'updated_at' => now(), ]); return; } // Check if task is due to run $next_run_timestamp = strtotime($existing->next_run_at); $is_due = $next_run_timestamp <= time(); if ($force_run || $is_due) { // Task is due - dispatch it $this->info("[SCHEDULED] Dispatching scheduled task: {$class}::{$method}"); // Create a new pending task instance $task_id = DB::table('_task_queue')->insertGetId([ 'class' => $class, 'method' => $method, 'queue' => $queue, 'status' => Task_Status::PENDING, 'params' => json_encode([]), 'scheduled_for' => now(), 'created_at' => now(), 'updated_at' => now(), ]); // Calculate next run time and update the scheduled task record $parser = new Cron_Parser($cron_expression); $next_run_at = $parser->get_next_run_time(); DB::table('_task_queue') ->where('id', $existing->id) ->update([ 'next_run_at' => date('Y-m-d H:i:s', $next_run_at), 'updated_at' => now(), ]); $this->info("[SCHEDULED] Created task instance ID: {$task_id}"); $this->info("[SCHEDULED] Next run updated to: " . date('Y-m-d H:i:s', $next_run_at)); } } /** * Spawn workers for queued tasks based on concurrency limits * * @param string|null $specific_queue If set, only process this queue */ private function spawn_workers_for_queues(?string $specific_queue): void { $queue_config = config('rsx.tasks.queues', []); $global_max = config('rsx.tasks.global_max_workers', 1); // Count currently running workers across all queues $total_running = DB::table('_task_queue') ->where('status', Task_Status::RUNNING) ->count(); if ($total_running >= $global_max) { $this->info("[WORKER SPAWN] Global limit reached ({$total_running}/{$global_max})"); return; } // Get queues to process $queues_to_process = $specific_queue ? [$specific_queue => $queue_config[$specific_queue] ?? ['max_workers' => 1]] : $queue_config; foreach ($queues_to_process as $queue_name => $queue_settings) { $max_workers = $queue_settings['max_workers'] ?? 1; // Count running workers for this queue $queue_running = DB::table('_task_queue') ->where('queue', $queue_name) ->where('status', Task_Status::RUNNING) ->count(); // Count pending tasks for this queue (exclude scheduled task records) $queue_pending = DB::table('_task_queue') ->where('queue', $queue_name) ->where('status', Task_Status::PENDING) ->whereNull('next_run_at') // Exclude scheduled task records ->where(function ($query) { $query->whereNull('scheduled_for') ->orWhere('scheduled_for', '<=', now()); }) ->count(); if ($queue_pending === 0) { continue; } // Calculate how many workers we can spawn $can_spawn_global = $global_max - $total_running; $can_spawn_queue = $max_workers - $queue_running; $can_spawn = min($can_spawn_global, $can_spawn_queue, $queue_pending); if ($can_spawn > 0) { $this->info("[WORKER SPAWN] Spawning {$can_spawn} worker(s) for queue '{$queue_name}'"); for ($i = 0; $i < $can_spawn; $i++) { $this->spawn_worker($queue_name); $total_running++; } } } } /** * Spawn a worker process for a queue * * @param string $queue_name Queue to process */ private function spawn_worker(string $queue_name): void { $process = new Process([ 'php', base_path('artisan'), 'rsx:task:worker', '--queue=' . $queue_name, ]); $process->setTimeout(null); $process->start(); $this->info("[WORKER SPAWN] Started worker process {$process->getPid()} for queue '{$queue_name}'"); } /** * Process one task directly (for testing) * * @param string $queue_name Queue to process from */ private function process_one_task(string $queue_name): void { $this->info("[ONCE MODE] Processing one task from queue '{$queue_name}'"); // Use lock to ensure atomic task selection $lock = new Task_Lock("task_queue_{$queue_name}"); if (!$lock->acquire()) { $this->warn("[ONCE MODE] Could not acquire lock for queue '{$queue_name}'"); return; } try { // Select next pending task (exclude scheduled task records) $task_row = DB::table('_task_queue') ->where('queue', $queue_name) ->where('status', Task_Status::PENDING) ->whereNull('next_run_at') // Exclude scheduled task records ->where(function ($query) { $query->whereNull('scheduled_for') ->orWhere('scheduled_for', '<=', now()); }) ->orderBy('created_at', 'asc') ->lockForUpdate() ->first(); if (!$task_row) { $this->info("[ONCE MODE] No pending tasks in queue '{$queue_name}'"); $lock->release(); return; } // Mark as running DB::table('_task_queue') ->where('id', $task_row->id) ->update([ 'status' => Task_Status::RUNNING, 'started_at' => now(), 'worker_pid' => getmypid(), 'updated_at' => now(), ]); $lock->release(); // Execute the task $this->info("[ONCE MODE] Executing task {$task_row->id}: {$task_row->class}::{$task_row->method}"); $task_instance = Task_Instance::find($task_row->id); try { $class = $task_row->class; $method = $task_row->method; $params = json_decode($task_row->params, true) ?? []; $result = $class::$method($task_instance, $params); $task_instance->mark_completed($result); $this->info("[ONCE MODE] Task {$task_row->id} completed successfully"); } catch (\Exception $e) { $task_instance->mark_failed($e->getMessage()); $this->error("[ONCE MODE] Task {$task_row->id} failed: " . $e->getMessage()); } } finally { if ($lock->is_locked()) { $lock->release(); } } } }