$info) { // Skip non-PHP files or files without classes if (!isset($info['class']) || !isset($info['fqcn'])) { continue; } // Check if class name matches exactly (without namespace) $class_basename = basename(str_replace('\\', '/', $info['fqcn'])); if ($class_basename === $rsx_service) { $service_class = $info['fqcn']; $file_info = $info; break; } } if (!$service_class) { throw new Exception("Service class not found: {$rsx_service}"); } // Check if class exists if (!class_exists($service_class)) { throw new Exception("Service class does not exist: {$service_class}"); } // Check if it's a subclass of Rsx_Service_Abstract if (!Manifest::php_is_subclass_of($service_class, Rsx_Service_Abstract::class)) { throw new Exception("Service {$service_class} must extend Rsx_Service_Abstract"); } // Check if method exists and has Task attribute if (!isset($file_info['public_static_methods'][$rsx_task])) { throw new Exception("Task {$rsx_task} not found in service {$service_class}"); } $method_info = $file_info['public_static_methods'][$rsx_task]; $has_task = false; // Check for Task attribute in method metadata if (isset($method_info['attributes'])) { foreach ($method_info['attributes'] as $attr_name => $attr_instances) { if ($attr_name === 'Task' || str_ends_with($attr_name, '\\Task')) { $has_task = true; break; } } } if (!$has_task) { throw new Exception("Method {$rsx_task} in service {$service_class} must have #[Task] attribute"); } // Create task instance for immediate execution $task_instance = new Task_Instance( $service_class, $rsx_task, $params, 'default', true // immediate execution ); // Mark as started $task_instance->mark_started(); try { // Call pre_task() if exists if (method_exists($service_class, 'pre_task')) { $pre_result = $service_class::pre_task($task_instance, $params); if ($pre_result !== null) { // pre_task returned something, use that as response $task_instance->mark_completed($pre_result); return $pre_result; } } // Call the actual task method $response 
= $service_class::$rsx_task($task_instance, $params);

            // Mark as completed
            $task_instance->mark_completed($response);

            // Filter response through JSON encode/decode to remove PHP objects
            // (similar to Ajax behavior)
            $filtered_response = json_decode(json_encode($response), true);

            return $filtered_response;
        } catch (Exception $e) {
            // Mark as failed
            $task_instance->mark_failed($e->getMessage());

            throw $e;
        }
    }

    /**
     * Format task response for CLI output.
     *
     * Wraps the task's return value in a fixed envelope so that CLI consumers
     * always receive the same top-level keys.
     *
     * @param mixed $response Task return value
     * @return array Envelope of the form ['success' => true, 'result' => $response]
     */
    public static function format_task_response($response): array
    {
        return [
            'success' => true,
            'result' => $response,
        ];
    }

    /**
     * Dispatch a task to the queue for async execution
     *
     * Validates the service/task pair against the manifest, then creates a
     * `_task_queue` database record with status PENDING and returns its ID.
     * Task will be picked up and executed by the task processor (rsx:task:process).
     *
     * Validation steps, in order (each failure throws):
     *  1. a manifest entry whose un-namespaced class name equals $rsx_service exists
     *  2. the class can be loaded
     *  3. the class extends Rsx_Service_Abstract
     *  4. $rsx_task is listed among the class's public static methods
     *  5. that method carries a #[Task] attribute
     *
     * @param string $rsx_service Service name (e.g., 'Seeder_Service')
     * @param string $rsx_task Task/method name (e.g., 'seed_clients')
     * @param array $params Parameters to pass to the task (must be JSON-encodable)
     * @param array $options Optional task options:
     *   - 'queue' => Queue name (default: 'default')
     *   - 'scheduled_for' => Timestamp when task should run (default: now)
     *   - 'timeout' => Maximum execution time in seconds (default: from config)
     * @return int Task ID
     * @throws Exception When validation fails, or (as JsonException, a subclass
     *                   of Exception) when $params cannot be JSON-encoded
     */
    public static function dispatch(string $rsx_service, string $rsx_task, array $params = [], array $options = []): int
    {
        $service_class = null;
        $file_info = null;

        // Services are addressed by short class name, so compare against the
        // last segment of each manifest entry's fully-qualified class name.
        foreach (Manifest::get_all() as $info) {
            // Skip non-PHP files or files without classes
            if (!isset($info['class'], $info['fqcn'])) {
                continue;
            }

            if (basename(str_replace('\\', '/', $info['fqcn'])) === $rsx_service) {
                $service_class = $info['fqcn'];
                $file_info = $info;
                break;
            }
        }

        if (!$service_class) {
            throw new Exception("Service class not found: {$rsx_service}");
        }

        if (!class_exists($service_class)) {
            throw new Exception("Service class does not exist: {$service_class}");
        }

        if (!Manifest::php_is_subclass_of($service_class, Rsx_Service_Abstract::class)) {
            throw new Exception("Service {$service_class} must extend Rsx_Service_Abstract");
        }

        if (!isset($file_info['public_static_methods'][$rsx_task])) {
            throw new Exception("Task {$rsx_task} not found in service {$service_class}");
        }

        // The attribute may be recorded under its short or fully-qualified name.
        $method_info = $file_info['public_static_methods'][$rsx_task];
        $has_task = false;

        foreach (($method_info['attributes'] ?? []) as $attr_name => $attr_instances) {
            if ($attr_name === 'Task' || str_ends_with($attr_name, '\\Task')) {
                $has_task = true;
                break;
            }
        }

        if (!$has_task) {
            throw new Exception("Method {$rsx_task} in service {$service_class} must have #[Task] attribute");
        }

        // Resolve shared values once so the instance and the row cannot disagree
        // and created_at / updated_at are guaranteed to be the same timestamp.
        $queue = $options['queue'] ?? 'default';
        $now = now();

        // NOTE(review): this instance is not used after construction. It is kept
        // in case the Task_Instance constructor validates its arguments or has
        // other side effects - confirm before removing.
        $instance = new Task_Instance(
            $service_class,
            $rsx_task,
            $params,
            $queue,
            false // not immediate
        );

        $data = [
            'class' => $service_class,
            'method' => $rsx_task,
            'queue' => $queue,
            'status' => Task_Status::PENDING,
            // Fail loudly instead of persisting `false` when params cannot be encoded.
            'params' => json_encode($params, JSON_THROW_ON_ERROR),
            'scheduled_for' => $options['scheduled_for'] ?? $now,
            'timeout' => $options['timeout'] ?? config('rsx.tasks.default_timeout'),
            'created_at' => $now,
            'updated_at' => $now,
        ];

        return DB::table('_task_queue')->insertGetId($data);
    }

    /**
     * Get the status of a task
     *
     * Returns task information including status, logs, result, and error.
*
     * The `params` and `result` columns are JSON-decoded into PHP values
     * (associative arrays for JSON objects); a NULL column yields null.
     * The `logs` column is split on "\n" into a list of lines; a NULL or
     * empty column yields an empty list.
     *
     * @param int $task_id Task ID
     * @return array|null Task status data or null if not found
     */
    public static function status(int $task_id): ?array
    {
        $row = DB::table('_task_queue')->where('id', $task_id)->first();

        if (!$row) {
            return null;
        }

        // Guard the JSON columns: json_decode() does not accept null (deprecated
        // since PHP 8.1), and e.g. `result` is null until the task has produced one.
        $params = $row->params !== null ? json_decode($row->params, true) : null;
        $result = $row->result !== null ? json_decode($row->result, true) : null;

        // Explicit null/empty check rather than truthiness, so a log consisting
        // solely of "0" is not discarded.
        $logs = ($row->logs !== null && $row->logs !== '') ? explode("\n", $row->logs) : [];

        return [
            'id' => $row->id,
            'class' => $row->class,
            'method' => $row->method,
            'queue' => $row->queue,
            'status' => $row->status,
            'params' => $params,
            'result' => $result,
            'logs' => $logs,
            'error' => $row->error,
            'scheduled_for' => $row->scheduled_for,
            'started_at' => $row->started_at,
            'completed_at' => $row->completed_at,
            'created_at' => $row->created_at,
            'updated_at' => $row->updated_at,
        ];
    }

    /**
     * Get all scheduled tasks from manifest
     *
     * Scans every manifest entry that has class metadata and a list of public
     * static methods, looking for methods with a #[Schedule] attribute (recorded
     * under either its short or fully-qualified name). Each attribute instance
     * with a non-empty first argument produces one entry in the result.
     *
     * @return array List of scheduled task definitions, each with the keys:
     *   - 'class' => Fully-qualified class name from the manifest
     *   - 'method' => Method name
     *   - 'cron_expression' => First attribute argument
     *   - 'queue' => Second attribute argument (default: 'scheduled')
     */
    public static function get_scheduled_tasks(): array
    {
        $scheduled_tasks = [];

        foreach (Manifest::get_all() as $info) {
            // Only entries describing a class with public static methods can hold schedules.
            if (!isset($info['class'], $info['fqcn'], $info['public_static_methods'])) {
                continue;
            }

            foreach ($info['public_static_methods'] as $method_name => $method_info) {
                if (!isset($method_info['attributes'])) {
                    continue;
                }

                foreach ($method_info['attributes'] as $attr_name => $attr_instances) {
                    if ($attr_name !== 'Schedule' && !str_ends_with($attr_name, '\\Schedule')) {
                        continue;
                    }

                    // A method may carry several Schedule attributes; emit one entry per instance.
                    foreach ($attr_instances as $attr_instance) {
                        $cron_expression = $attr_instance[0] ?? null;

                        // Instances without a cron expression are ignored.
                        if (!$cron_expression) {
                            continue;
                        }

                        $scheduled_tasks[] = [
                            'class' => $info['fqcn'],
                            'method' => $method_name,
                            'cron_expression' => $cron_expression,
                            'queue' => $attr_instance[1] ?? 'scheduled',
                        ];
                    }
                }
            }
        }

        return $scheduled_tasks;
    }
}