Files
rspade_system/app/RSpade/Core/Locks/RsxLocks.php
root 29c657f7a7 Exclude tests directory from framework publish
Add 100+ automated unit tests from .expect file specifications
Add session system test
Add rsx:constants:regenerate command test
Add rsx:logrotate command test
Add rsx:clean command test
Add rsx:manifest:stats command test
Add model enum system test
Add model mass assignment prevention test
Add rsx:check command test
Add migrate:status command test

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-25 03:59:58 +00:00

710 lines
25 KiB
PHP

<?php
namespace App\RSpade\Core\Locks;
use \Redis;
use Exception;
use RuntimeException;
// Ensure helpers are loaded since we run early in bootstrap
$helpers_path = __DIR__ . DIRECTORY_SEPARATOR . '..' . DIRECTORY_SEPARATOR . '..' . DIRECTORY_SEPARATOR . 'helpers.php';
if (file_exists($helpers_path)) {
require_once $helpers_path;
}
/**
* Advisory locking system using Redis
*
* Implements readers-writer lock pattern:
* - Multiple readers can hold locks simultaneously
* - Writers get exclusive access (no readers or other writers)
* - Writers wait for existing readers to drain
* - New readers wait if a writer is pending
*
* IMPORTANT: This class operates independently of the manifest system
* and can be used during manifest building itself.
*/
class RsxLocks
{
// Lock domains
const SERVER_LOCK = 'server';
const DATABASE_LOCK = 'database'; // For future extension if needed
// Lock types
const READ_LOCK = 'READ';
const WRITE_LOCK = 'WRITE';
// System-wide lock names (these are the ONLY locks used in the system)
const LOCK_APPLICATION = 'APPLICATION'; // Global application lock for all requests
const LOCK_MANIFEST_BUILD = 'MANIFEST_BUILD'; // Manifest rebuild operations
const LOCK_BUNDLE_BUILD = 'BUNDLE_BUILD'; // Bundle compilation operations
const LOCK_MIGRATION = 'MIGRATION'; // Database migration operations
const LOCK_FILE_WRITE = 'FILE_WRITE'; // File upload/storage operations
// Site-specific lock prefix (appended with site_id)
const LOCK_SITE_PREFIX = 'SITE_'; // e.g., SITE_1, SITE_2, etc.
// Redis configuration
private static ?\Redis $redis = null;
private static int $lock_db = 1; // Database 1 for locks (no eviction)
private static int $default_timeout = 30; // Default lock timeout in seconds
private static float $poll_interval = 0.1; // 100ms polling interval
// Track locks held by this process for cleanup
private static array $held_locks = [];
// Track lock acquisition counts for reentrant locking
// Format: ['lock_key' => ['token' => 'xxx', 'count' => 2]]
private static array $lock_counts = [];
/**
* Initialize Redis connection for locking
* Separate from cache to avoid manifest dependency
* Returns null in IDE context if Redis extension not available
*/
private static function _ensure_redis(): ?\Redis
{
// Skip Redis in IDE context if extension not available
if (\is_ide() && !class_exists('\Redis')) {
return null;
}
if (self::$redis === null) {
if (!class_exists('\Redis')) {
return null;
}
self::$redis = new \Redis();
// Connect to Redis (will be configured via environment)
$host = env('REDIS_HOST', '127.0.0.1');
$port = env('REDIS_PORT', 6379);
$socket = env('REDIS_SOCKET', null);
if ($socket && file_exists($socket)) {
$connected = self::$redis->connect($socket);
}
else {
$connected = self::$redis->connect($host, $port, 2.0);
}
if (!$connected) {
shouldnt_happen("Failed to connect to Redis for locking");
}
// Select the lock database (no eviction)
self::$redis->select(self::$lock_db);
// Register shutdown handler to release locks on exit
register_shutdown_function([self::class, '_cleanup_locks']);
}
return self::$redis;
}
/**
* Get an advisory lock
*
* @param string $domain Lock domain (SERVER_LOCK or DATABASE_LOCK)
* @param string $name Lock identifier (e.g., MANIFEST_BUILD_LOCK_ID)
* @param string $type Lock type (READ_LOCK or WRITE_LOCK)
* @param int $timeout Maximum seconds to wait for lock acquisition
* @return string Lock token for release
*/
public static function get_lock(
string $domain,
string $name,
string $type = self::READ_LOCK,
int $timeout = 30
): string {
if (!in_array($domain, [self::SERVER_LOCK, self::DATABASE_LOCK])) {
shouldnt_happen("Invalid lock domain: {$domain}");
}
if (!in_array($type, [self::READ_LOCK, self::WRITE_LOCK])) {
shouldnt_happen("Invalid lock type: {$type}");
}
// Reentrant locking: Check if we already hold this lock
$count_key = "{$domain}:{$name}:{$type}";
if (isset(self::$lock_counts[$count_key])) {
// Already holding this lock, increment count and return same token
self::$lock_counts[$count_key]['count']++;
return self::$lock_counts[$count_key]['token'];
}
$redis = self::_ensure_redis();
// Skip lock in IDE context without Redis
if ($redis === null) {
$mock_token = 'ide-mock-token-' . uniqid();
self::$lock_counts[$count_key] = ['token' => $mock_token, 'count' => 1];
return $mock_token;
}
$lock_key = "lock:{$domain}:{$name}";
$lock_token = uniqid(gethostname() . ':' . getmypid() . ':', true);
$start_time = microtime(true);
// Use Lua script for atomic operations
if ($type === self::READ_LOCK) {
$acquired = self::_acquire_read_lock($redis, $lock_key, $lock_token, $timeout, $start_time);
} else {
$acquired = self::_acquire_write_lock($redis, $lock_key, $lock_token, $timeout, $start_time);
}
if (!$acquired) {
throw new RuntimeException(
"Failed to acquire {$type} lock for {$domain}:{$name} after {$timeout} seconds"
);
}
// Track lock for cleanup
self::$held_locks[$lock_token] = [
'domain' => $domain,
'name' => $name,
'type' => $type,
'key' => $lock_key,
'acquired_at' => time()
];
// Initialize lock count tracking
self::$lock_counts[$count_key] = ['token' => $lock_token, 'count' => 1];
return $lock_token;
}
/**
* Release an advisory lock
*
* @param string $lock_token Token returned from get_lock
*/
public static function release_lock(string $lock_token): void
{
// Skip mock tokens from IDE context
if (str_starts_with($lock_token, 'ide-mock-token-')) {
// Still need to handle reentrant count for IDE mock tokens
// Find the count_key by matching token in lock_counts
foreach (self::$lock_counts as $key => $data) {
if ($data['token'] === $lock_token) {
if ($data['count'] > 1) {
self::$lock_counts[$key]['count']--;
} else {
unset(self::$lock_counts[$key]);
}
return;
}
}
return;
}
if (!isset(self::$held_locks[$lock_token])) {
// Lock already released or not owned by this process
return;
}
$lock_info = self::$held_locks[$lock_token];
// Build count_key from lock_info for reentrant locking
$count_key = "{$lock_info['domain']}:{$lock_info['name']}:{$lock_info['type']}";
// Reentrant locking: Check if count > 1
if (isset(self::$lock_counts[$count_key]) && self::$lock_counts[$count_key]['count'] > 1) {
// Decrement count and return without releasing Redis lock
self::$lock_counts[$count_key]['count']--;
return;
}
$redis = self::_ensure_redis();
// Skip if Redis not available in IDE
if ($redis === null) {
unset(self::$held_locks[$lock_token]);
unset(self::$lock_counts[$count_key]);
return;
}
$lock_key = $lock_info['key'];
if ($lock_info['type'] === self::READ_LOCK) {
self::_release_read_lock($redis, $lock_key, $lock_token);
} else {
self::_release_write_lock($redis, $lock_key, $lock_token);
}
// Remove from both tracking arrays when count reaches 0
unset(self::$held_locks[$lock_token]);
unset(self::$lock_counts[$count_key]);
}
/**
* Upgrade a read lock to a write lock atomically
*
* This is used for optimistic concurrency control where we start with a read lock
* and only upgrade to write if needed. The read lock is NOT released during upgrade.
*
* @param string $lock_token Existing read lock token
* @param int $timeout Maximum seconds to wait for upgrade
* @return string New write lock token
* @throws RuntimeException if upgrade fails or token is not a read lock
*/
public static function upgrade_lock(string $lock_token, int $timeout = 30): string
{
// Skip mock tokens from IDE context
if (str_starts_with($lock_token, 'ide-mock-token-')) {
$new_token = 'ide-mock-token-' . uniqid();
// Handle reentrant count for IDE mock tokens
foreach (self::$lock_counts as $key => $data) {
if ($data['token'] === $lock_token) {
$count = $data['count'];
unset(self::$lock_counts[$key]);
// Create new count entry with WRITE_LOCK type
// Parse the key to get domain and name
$parts = explode(':', $key);
if (count($parts) === 3) {
$new_count_key = "{$parts[0]}:{$parts[1]}:" . self::WRITE_LOCK;
self::$lock_counts[$new_count_key] = ['token' => $new_token, 'count' => $count];
}
break;
}
}
return $new_token;
}
if (!isset(self::$held_locks[$lock_token])) {
throw new RuntimeException("Cannot upgrade lock - token not found or already released");
}
$lock_info = self::$held_locks[$lock_token];
if ($lock_info['type'] !== self::READ_LOCK) {
throw new RuntimeException("Can only upgrade READ locks to WRITE locks");
}
// Handle reentrant lock count transfer from READ to WRITE
$old_count_key = "{$lock_info['domain']}:{$lock_info['name']}:" . self::READ_LOCK;
$new_count_key = "{$lock_info['domain']}:{$lock_info['name']}:" . self::WRITE_LOCK;
$current_count = self::$lock_counts[$old_count_key]['count'] ?? 1;
$redis = self::_ensure_redis();
// Skip if Redis not available in IDE
if ($redis === null) {
$new_token = 'ide-mock-token-' . uniqid();
unset(self::$lock_counts[$old_count_key]);
self::$lock_counts[$new_count_key] = ['token' => $new_token, 'count' => $current_count];
unset(self::$held_locks[$lock_token]);
self::$held_locks[$new_token] = [
'domain' => $lock_info['domain'],
'name' => $lock_info['name'],
'type' => self::WRITE_LOCK,
'key' => $lock_info['key'],
'acquired_at' => time(),
'upgraded_from' => $lock_token
];
return $new_token;
}
$lock_key = $lock_info['key'];
$new_token = uniqid(gethostname() . ':' . getmypid() . ':write:', true);
$start_time = microtime(true);
// Attempt atomic upgrade
$upgraded = self::_upgrade_read_to_write_lock(
$redis,
$lock_key,
$lock_token,
$new_token,
$timeout,
$start_time
);
if (!$upgraded) {
throw new RuntimeException(
"Failed to upgrade read lock to write lock for {$lock_info['domain']}:{$lock_info['name']} after {$timeout} seconds"
);
}
// Remove old read lock from tracking
unset(self::$held_locks[$lock_token]);
unset(self::$lock_counts[$old_count_key]);
// Add new write lock to tracking with preserved count
self::$held_locks[$new_token] = [
'domain' => $lock_info['domain'],
'name' => $lock_info['name'],
'type' => self::WRITE_LOCK,
'key' => $lock_key,
'acquired_at' => time(),
'upgraded_from' => $lock_token
];
self::$lock_counts[$new_count_key] = ['token' => $new_token, 'count' => $current_count];
return $new_token;
}
/**
* Acquire a read lock
*/
private static function _acquire_read_lock(
Redis $redis,
string $lock_key,
string $lock_token,
int $timeout,
float $start_time
): bool {
$writer_queue_key = "{$lock_key}:writer_queue";
$readers_key = "{$lock_key}:readers";
$writer_active_key = "{$lock_key}:writer_active";
while ((microtime(true) - $start_time) < $timeout) {
// Lua script for atomic read lock acquisition
$lua = <<<'LUA'
local lock_key = KEYS[1]
local writer_queue_key = KEYS[2]
local readers_key = KEYS[3]
local writer_active_key = KEYS[4]
local lock_token = ARGV[1]
local ttl = ARGV[2]
-- Check if a writer is active
local writer_active = redis.call('EXISTS', writer_active_key)
if writer_active == 1 then
return 0 -- Writer is active, cannot acquire read lock
end
-- Check if writers are waiting
local writers_waiting = redis.call('LLEN', writer_queue_key)
if writers_waiting > 0 then
return 0 -- Writers are waiting, new readers must wait
end
-- Add this reader
redis.call('HSET', readers_key, lock_token, '1')
redis.call('EXPIRE', readers_key, ttl)
return 1 -- Successfully acquired read lock
LUA;
$result = $redis->eval(
$lua,
[$lock_key, $writer_queue_key, $readers_key, $writer_active_key, $lock_token, self::$default_timeout],
4
);
if ($result === 1) {
return true;
}
// Wait before retrying
usleep(self::$poll_interval * 1000000);
}
return false;
}
/**
* Acquire a write lock
*/
private static function _acquire_write_lock(
Redis $redis,
string $lock_key,
string $lock_token,
int $timeout,
float $start_time
): bool {
$writer_queue_key = "{$lock_key}:writer_queue";
$readers_key = "{$lock_key}:readers";
$writer_active_key = "{$lock_key}:writer_active";
// First, add ourselves to the writer queue
$redis->rPush($writer_queue_key, $lock_token);
$redis->expire($writer_queue_key, self::$default_timeout);
try {
while ((microtime(true) - $start_time) < $timeout) {
// Lua script for atomic write lock acquisition
$lua = <<<'LUA'
local lock_key = KEYS[1]
local writer_queue_key = KEYS[2]
local readers_key = KEYS[3]
local writer_active_key = KEYS[4]
local lock_token = ARGV[1]
local ttl = ARGV[2]
-- Check if we're next in the writer queue
local next_writer = redis.call('LINDEX', writer_queue_key, 0)
if next_writer ~= lock_token then
return 0 -- Not our turn yet
end
-- Check if any readers are active
local reader_count = redis.call('HLEN', readers_key)
if reader_count > 0 then
return 0 -- Readers still active, must wait
end
-- Check if another writer is active
local writer_active = redis.call('EXISTS', writer_active_key)
if writer_active == 1 then
return 0 -- Another writer is active
end
-- Acquire the write lock
redis.call('SET', writer_active_key, lock_token, 'EX', ttl)
redis.call('LPOP', writer_queue_key) -- Remove ourselves from queue
return 1 -- Successfully acquired write lock
LUA;
$result = $redis->eval(
$lua,
[$lock_key, $writer_queue_key, $readers_key, $writer_active_key, $lock_token, self::$default_timeout],
4
);
if ($result === 1) {
return true;
}
// Wait before retrying
usleep(self::$poll_interval * 1000000);
}
} catch (Exception $e) {
// Remove from queue on error
$redis->lRem($writer_queue_key, $lock_token, 0);
throw $e;
}
// Timeout - remove from queue
$redis->lRem($writer_queue_key, $lock_token, 0);
return false;
}
/**
* Upgrade a read lock to a write lock atomically
*
* IMPORTANT: To prevent deadlock, readers that are upgrading to writers
* are NOT counted as blocking readers. This allows multiple readers to
* upgrade without deadlocking each other.
*/
private static function _upgrade_read_to_write_lock(
Redis $redis,
string $lock_key,
string $old_token,
string $new_token,
int $timeout,
float $start_time
): bool {
$writer_queue_key = "{$lock_key}:writer_queue";
$readers_key = "{$lock_key}:readers";
$writer_active_key = "{$lock_key}:writer_active";
$upgrading_readers_key = "{$lock_key}:upgrading_readers";
// Mark ourselves as an upgrading reader (not blocking other upgrades)
$redis->sAdd($upgrading_readers_key, $old_token);
$redis->expire($upgrading_readers_key, self::$default_timeout);
// Add ourselves to the writer queue
$redis->rPush($writer_queue_key, $new_token); // Normal queue order
$redis->expire($writer_queue_key, self::$default_timeout);
try {
while ((microtime(true) - $start_time) < $timeout) {
// Lua script for atomic lock upgrade (deadlock-safe)
$lua = <<<'LUA'
local lock_key = KEYS[1]
local writer_queue_key = KEYS[2]
local readers_key = KEYS[3]
local writer_active_key = KEYS[4]
local upgrading_readers_key = KEYS[5]
local old_token = ARGV[1]
local new_token = ARGV[2]
local ttl = ARGV[3]
-- Verify we still hold the read lock
local has_read_lock = redis.call('HEXISTS', readers_key, old_token)
if has_read_lock == 0 then
return -1 -- Lost our read lock somehow
end
-- Check if we're next in the writer queue
local next_writer = redis.call('LINDEX', writer_queue_key, 0)
if next_writer ~= new_token then
return 0 -- Not our turn yet
end
-- Check if any OTHER readers are active (excluding upgrading readers)
-- This prevents deadlock where multiple readers wait for each other
local all_readers = redis.call('HKEYS', readers_key)
local upgrading = redis.call('SMEMBERS', upgrading_readers_key)
local upgrading_set = {}
for i, token in ipairs(upgrading) do
upgrading_set[token] = true
end
local blocking_readers = 0
for i, reader_token in ipairs(all_readers) do
-- Don't count ourselves or other upgrading readers as blockers
if reader_token ~= old_token and not upgrading_set[reader_token] then
blocking_readers = blocking_readers + 1
end
end
if blocking_readers > 0 then
return 0 -- Non-upgrading readers still active, must wait
end
-- Check if another writer is active
local writer_active = redis.call('EXISTS', writer_active_key)
if writer_active == 1 then
return 0 -- Another writer is active
end
-- Upgrade to write lock atomically
-- 1. Remove our read lock
redis.call('HDEL', readers_key, old_token)
-- 2. Remove from upgrading readers set
redis.call('SREM', upgrading_readers_key, old_token)
-- 3. Set write lock
redis.call('SET', writer_active_key, new_token, 'EX', ttl)
-- 4. Remove from writer queue
redis.call('LPOP', writer_queue_key)
return 1 -- Successfully upgraded
LUA;
$result = $redis->eval(
$lua,
[$lock_key, $writer_queue_key, $readers_key, $writer_active_key, $upgrading_readers_key, $old_token, $new_token, self::$default_timeout],
5
);
if ($result === 1) {
return true;
} elseif ($result === -1) {
// Lost our read lock - this shouldn't happen
$redis->lRem($writer_queue_key, $new_token, 0);
$redis->sRem($upgrading_readers_key, $old_token);
throw new RuntimeException("Lost read lock during upgrade attempt");
}
// Wait before retrying
usleep(self::$poll_interval * 1000000);
}
} catch (Exception $e) {
// Remove from queues on error
$redis->lRem($writer_queue_key, $new_token, 0);
$redis->sRem($upgrading_readers_key, $old_token);
throw $e;
}
// Timeout - remove from queues
$redis->lRem($writer_queue_key, $new_token, 0);
$redis->sRem($upgrading_readers_key, $old_token);
return false;
}
/**
* Release a read lock
*/
private static function _release_read_lock(Redis $redis, string $lock_key, string $lock_token): void
{
$readers_key = "{$lock_key}:readers";
$redis->hDel($readers_key, $lock_token);
}
/**
* Release a write lock
*/
private static function _release_write_lock(Redis $redis, string $lock_key, string $lock_token): void
{
$writer_active_key = "{$lock_key}:writer_active";
// Only delete if we own it
$lua = <<<'LUA'
local writer_active_key = KEYS[1]
local lock_token = ARGV[1]
local current_writer = redis.call('GET', writer_active_key)
if current_writer == lock_token then
redis.call('DEL', writer_active_key)
return 1
end
return 0
LUA;
$redis->eval($lua, [$writer_active_key, $lock_token], 1);
}
/**
* Cleanup locks on process exit
* Called by shutdown handler
*/
public static function _cleanup_locks(): void
{
foreach (array_keys(self::$held_locks) as $lock_token) {
try {
self::release_lock($lock_token);
} catch (Exception $e) {
// Ignore errors during cleanup
}
}
}
/**
* Force clear all locks for a given name (emergency use only)
*
* @param string $domain Lock domain
* @param string $name Lock name
*/
public static function force_clear_lock(string $domain, string $name): void
{
$redis = self::_ensure_redis();
// Skip if Redis not available in IDE
if ($redis === null) {
return;
}
$lock_key = "lock:{$domain}:{$name}";
$redis->del([
$lock_key,
"{$lock_key}:writer_queue",
"{$lock_key}:readers",
"{$lock_key}:writer_active"
]);
}
/**
* Get lock statistics for monitoring
*
* @param string $domain Lock domain
* @param string $name Lock name
* @return array Lock statistics
*/
public static function get_lock_stats(string $domain, string $name): array
{
$redis = self::_ensure_redis();
// Skip if Redis not available in IDE
if ($redis === null) {
return [
'readers_active' => 0,
'writers_waiting' => 0,
'writer_active' => false,
'writer_token' => null
];
}
$lock_key = "lock:{$domain}:{$name}";
$writer_queue = $redis->lLen("{$lock_key}:writer_queue");
$reader_count = $redis->hLen("{$lock_key}:readers");
$writer_active = $redis->get("{$lock_key}:writer_active");
return [
'readers_active' => $reader_count,
'writers_waiting' => $writer_queue,
'writer_active' => $writer_active !== false,
'writer_token' => $writer_active ?: null
];
}
}