Files
rspade_system/app/RSpade/Commands/Rsx/Task_Process_Command.php
root 9ebcc359ae Fix code quality violations and enhance ROUTE-EXISTS-01 rule
Implement JQHTML function cache ID system and fix bundle compilation
Implement underscore prefix for system tables
Fix JS syntax linter to support decorators and grant exception to Task system
SPA: Update planning docs and wishlists with remaining features
SPA: Document Navigation API abandonment and future enhancements
Implement SPA browser integration with History API (Phase 1)
Convert contacts view page to SPA action
Convert clients pages to SPA actions and document conversion procedure
SPA: Merge GET parameters and update documentation
Implement SPA route URL generation in JavaScript and PHP
Implement SPA bootstrap controller architecture
Add SPA routing manual page (rsx:man spa)
Add SPA routing documentation to CLAUDE.md
Phase 4 Complete: Client-side SPA routing implementation
Update get_routes() consumers for unified route structure
Complete SPA Phase 3: PHP-side route type detection and is_spa flag
Restore unified routes structure and Manifest_Query class
Refactor route indexing and add SPA infrastructure
Phase 3 Complete: SPA route registration in manifest
Implement SPA Phase 2: Extract router code and test decorators
Rename Jqhtml_Component to Component and complete SPA foundation setup

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-19 17:48:15 +00:00

376 lines
13 KiB
PHP
Executable File

<?php
/**
* CODING CONVENTION:
* This file follows the coding convention where variable_names and function_names
* use snake_case (underscore_wherever_possible).
*/
namespace App\RSpade\Commands\Rsx;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\DB;
use App\RSpade\Core\Task\Task;
use App\RSpade\Core\Task\Task_Instance;
use App\RSpade\Core\Task\Task_Status;
use App\RSpade\Core\Task\Task_Lock;
use App\RSpade\Core\Task\Cron_Parser;
use Symfony\Component\Process\Process;
/**
* Task Process Command
*
* Main worker manager for the task system. Handles:
* - Processing queued tasks
* - Executing scheduled tasks
* - Spawning worker processes
* - Detecting and recovering stuck tasks
*
* This command is designed to run via cron every minute:
* * * * * * cd /var/www/html && php artisan rsx:task:process
*
* It will:
* 1. Check for stuck tasks and mark them as failed
* 2. Process scheduled tasks that are due
* 3. Spawn workers for queued tasks (up to concurrency limits)
* 4. Exit quickly if no work to do
*/
class Task_Process_Command extends Command
{
protected $signature = 'rsx:task:process
{--queue= : Process only this specific queue}
{--once : Process one task and exit (for testing)}
{--force-scheduled : Run all scheduled tasks immediately (ignore next_run_at)}';
protected $description = 'Process queued and scheduled tasks (run via cron every minute)';
public function handle()
{
$specific_queue = $this->option('queue');
$once_mode = $this->option('once');
$this->info('[TASK PROCESSOR] Starting task processor');
// Step 1: Detect and recover stuck tasks
$this->detect_stuck_tasks();
// Step 2: Process scheduled tasks
if (!$specific_queue) {
$this->process_scheduled_tasks();
}
// Step 3: Spawn workers for queued tasks
$this->spawn_workers_for_queues($specific_queue);
// If in once mode, process one task directly (for testing)
if ($once_mode) {
$this->process_one_task($specific_queue ?? 'default');
}
$this->info('[TASK PROCESSOR] Task processor complete');
return 0;
}
/**
* Detect and mark stuck tasks as failed
*
* A task is considered stuck if:
* - Status is 'running'
* - Started more than timeout seconds ago
* - Worker PID is dead or missing
*/
private function detect_stuck_tasks(): void
{
$cleanup_after = config('rsx.tasks.cleanup_stuck_after', 1800);
$stuck_tasks = DB::table('_task_queue')
->where('status', Task_Status::RUNNING)
->where('started_at', '<', now()->subSeconds($cleanup_after))
->get();
foreach ($stuck_tasks as $task) {
// Check if worker process is still alive
$worker_alive = $task->worker_pid && posix_kill($task->worker_pid, 0);
if (!$worker_alive) {
$this->warn("[STUCK TASK] Marking task {$task->id} as failed (worker PID {$task->worker_pid} not responding)");
DB::table('_task_queue')
->where('id', $task->id)
->update([
'status' => Task_Status::FAILED,
'error' => 'Task stuck - worker process not responding',
'completed_at' => now(),
'updated_at' => now(),
]);
}
}
}
/**
* Process scheduled tasks that are due
*
* Scans manifest for #[Schedule] attributes and ensures each has a database record.
* Creates pending task instances for any scheduled tasks that are due.
*/
private function process_scheduled_tasks(): void
{
$force_all = $this->option('force-scheduled');
// Get all scheduled tasks from manifest
$scheduled_tasks = Task::get_scheduled_tasks();
if (empty($scheduled_tasks)) {
return;
}
$this->info("[SCHEDULED] Found " . count($scheduled_tasks) . " scheduled task(s) in manifest");
foreach ($scheduled_tasks as $task_def) {
$this->process_scheduled_task($task_def, $force_all);
}
}
/**
* Process a single scheduled task definition
*
* @param array $task_def Task definition from manifest
* @param bool $force_run If true, run immediately regardless of schedule
*/
private function process_scheduled_task(array $task_def, bool $force_run): void
{
$class = $task_def['class'];
$method = $task_def['method'];
$cron_expression = $task_def['cron_expression'];
$queue = $task_def['queue'];
// Check if we have a record for this scheduled task
$existing = DB::table('_task_queue')
->where('class', $class)
->where('method', $method)
->where('queue', $queue)
->whereNotNull('next_run_at')
->whereIn('status', [Task_Status::PENDING, Task_Status::RUNNING])
->first();
if (!$existing) {
// Create initial scheduled task record
$parser = new Cron_Parser($cron_expression);
$next_run_at = $parser->get_next_run_time();
$this->info("[SCHEDULED] Registering new scheduled task: {$class}::{$method}");
$this->info("[SCHEDULED] Cron: {$cron_expression}");
$this->info("[SCHEDULED] Next run: " . date('Y-m-d H:i:s', $next_run_at));
DB::table('_task_queue')->insert([
'class' => $class,
'method' => $method,
'queue' => $queue,
'status' => Task_Status::PENDING,
'params' => json_encode([]),
'next_run_at' => date('Y-m-d H:i:s', $next_run_at),
'created_at' => now(),
'updated_at' => now(),
]);
return;
}
// Check if task is due to run
$next_run_timestamp = strtotime($existing->next_run_at);
$is_due = $next_run_timestamp <= time();
if ($force_run || $is_due) {
// Task is due - dispatch it
$this->info("[SCHEDULED] Dispatching scheduled task: {$class}::{$method}");
// Create a new pending task instance
$task_id = DB::table('_task_queue')->insertGetId([
'class' => $class,
'method' => $method,
'queue' => $queue,
'status' => Task_Status::PENDING,
'params' => json_encode([]),
'scheduled_for' => now(),
'created_at' => now(),
'updated_at' => now(),
]);
// Calculate next run time and update the scheduled task record
$parser = new Cron_Parser($cron_expression);
$next_run_at = $parser->get_next_run_time();
DB::table('_task_queue')
->where('id', $existing->id)
->update([
'next_run_at' => date('Y-m-d H:i:s', $next_run_at),
'updated_at' => now(),
]);
$this->info("[SCHEDULED] Created task instance ID: {$task_id}");
$this->info("[SCHEDULED] Next run updated to: " . date('Y-m-d H:i:s', $next_run_at));
}
}
/**
* Spawn workers for queued tasks based on concurrency limits
*
* @param string|null $specific_queue If set, only process this queue
*/
private function spawn_workers_for_queues(?string $specific_queue): void
{
$queue_config = config('rsx.tasks.queues', []);
$global_max = config('rsx.tasks.global_max_workers', 1);
// Count currently running workers across all queues
$total_running = DB::table('_task_queue')
->where('status', Task_Status::RUNNING)
->count();
if ($total_running >= $global_max) {
$this->info("[WORKER SPAWN] Global limit reached ({$total_running}/{$global_max})");
return;
}
// Get queues to process
$queues_to_process = $specific_queue
? [$specific_queue => $queue_config[$specific_queue] ?? ['max_workers' => 1]]
: $queue_config;
foreach ($queues_to_process as $queue_name => $queue_settings) {
$max_workers = $queue_settings['max_workers'] ?? 1;
// Count running workers for this queue
$queue_running = DB::table('_task_queue')
->where('queue', $queue_name)
->where('status', Task_Status::RUNNING)
->count();
// Count pending tasks for this queue (exclude scheduled task records)
$queue_pending = DB::table('_task_queue')
->where('queue', $queue_name)
->where('status', Task_Status::PENDING)
->whereNull('next_run_at') // Exclude scheduled task records
->where(function ($query) {
$query->whereNull('scheduled_for')
->orWhere('scheduled_for', '<=', now());
})
->count();
if ($queue_pending === 0) {
continue;
}
// Calculate how many workers we can spawn
$can_spawn_global = $global_max - $total_running;
$can_spawn_queue = $max_workers - $queue_running;
$can_spawn = min($can_spawn_global, $can_spawn_queue, $queue_pending);
if ($can_spawn > 0) {
$this->info("[WORKER SPAWN] Spawning {$can_spawn} worker(s) for queue '{$queue_name}'");
for ($i = 0; $i < $can_spawn; $i++) {
$this->spawn_worker($queue_name);
$total_running++;
}
}
}
}
/**
* Spawn a worker process for a queue
*
* @param string $queue_name Queue to process
*/
private function spawn_worker(string $queue_name): void
{
$process = new Process([
'php',
base_path('artisan'),
'rsx:task:worker',
'--queue=' . $queue_name,
]);
$process->setTimeout(null);
$process->start();
$this->info("[WORKER SPAWN] Started worker process {$process->getPid()} for queue '{$queue_name}'");
}
/**
* Process one task directly (for testing)
*
* @param string $queue_name Queue to process from
*/
private function process_one_task(string $queue_name): void
{
$this->info("[ONCE MODE] Processing one task from queue '{$queue_name}'");
// Use lock to ensure atomic task selection
$lock = new Task_Lock("task_queue_{$queue_name}");
if (!$lock->acquire()) {
$this->warn("[ONCE MODE] Could not acquire lock for queue '{$queue_name}'");
return;
}
try {
// Select next pending task (exclude scheduled task records)
$task_row = DB::table('_task_queue')
->where('queue', $queue_name)
->where('status', Task_Status::PENDING)
->whereNull('next_run_at') // Exclude scheduled task records
->where(function ($query) {
$query->whereNull('scheduled_for')
->orWhere('scheduled_for', '<=', now());
})
->orderBy('created_at', 'asc')
->lockForUpdate()
->first();
if (!$task_row) {
$this->info("[ONCE MODE] No pending tasks in queue '{$queue_name}'");
$lock->release();
return;
}
// Mark as running
DB::table('_task_queue')
->where('id', $task_row->id)
->update([
'status' => Task_Status::RUNNING,
'started_at' => now(),
'worker_pid' => getmypid(),
'updated_at' => now(),
]);
$lock->release();
// Execute the task
$this->info("[ONCE MODE] Executing task {$task_row->id}: {$task_row->class}::{$task_row->method}");
$task_instance = Task_Instance::find($task_row->id);
try {
$class = $task_row->class;
$method = $task_row->method;
$params = json_decode($task_row->params, true) ?? [];
$result = $class::$method($task_instance, $params);
$task_instance->mark_completed($result);
$this->info("[ONCE MODE] Task {$task_row->id} completed successfully");
} catch (\Exception $e) {
$task_instance->mark_failed($e->getMessage());
$this->error("[ONCE MODE] Task {$task_row->id} failed: " . $e->getMessage());
}
} finally {
if ($lock->is_locked()) {
$lock->release();
}
}
}
}