Add 100+ automated unit tests from .expect file specifications Add session system test Add rsx:constants:regenerate command test Add rsx:logrotate command test Add rsx:clean command test Add rsx:manifest:stats command test Add model enum system test Add model mass assignment prevention test Add rsx:check command test Add migrate:status command test 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
710 lines
25 KiB
PHP
710 lines
25 KiB
PHP
<?php
|
|
|
|
namespace App\RSpade\Core\Locks;
|
|
|
|
use \Redis;
|
|
use Exception;
|
|
use RuntimeException;
|
|
|
|
// Ensure helpers are loaded since we run early in bootstrap
|
|
$helpers_path = __DIR__ . DIRECTORY_SEPARATOR . '..' . DIRECTORY_SEPARATOR . '..' . DIRECTORY_SEPARATOR . 'helpers.php';
|
|
if (file_exists($helpers_path)) {
|
|
require_once $helpers_path;
|
|
}
|
|
|
|
/**
|
|
* Advisory locking system using Redis
|
|
*
|
|
* Implements readers-writer lock pattern:
|
|
* - Multiple readers can hold locks simultaneously
|
|
* - Writers get exclusive access (no readers or other writers)
|
|
* - Writers wait for existing readers to drain
|
|
* - New readers wait if a writer is pending
|
|
*
|
|
* IMPORTANT: This class operates independently of the manifest system
|
|
* and can be used during manifest building itself.
|
|
*/
|
|
class RsxLocks
|
|
{
|
|
// Lock domains
|
|
const SERVER_LOCK = 'server';
|
|
const DATABASE_LOCK = 'database'; // For future extension if needed
|
|
|
|
// Lock types
|
|
const READ_LOCK = 'READ';
|
|
const WRITE_LOCK = 'WRITE';
|
|
|
|
// System-wide lock names (these are the ONLY locks used in the system)
|
|
const LOCK_APPLICATION = 'APPLICATION'; // Global application lock for all requests
|
|
const LOCK_MANIFEST_BUILD = 'MANIFEST_BUILD'; // Manifest rebuild operations
|
|
const LOCK_BUNDLE_BUILD = 'BUNDLE_BUILD'; // Bundle compilation operations
|
|
const LOCK_MIGRATION = 'MIGRATION'; // Database migration operations
|
|
const LOCK_FILE_WRITE = 'FILE_WRITE'; // File upload/storage operations
|
|
|
|
// Site-specific lock prefix (appended with site_id)
|
|
const LOCK_SITE_PREFIX = 'SITE_'; // e.g., SITE_1, SITE_2, etc.
|
|
|
|
// Redis configuration
|
|
private static ?\Redis $redis = null;
|
|
private static int $lock_db = 1; // Database 1 for locks (no eviction)
|
|
private static int $default_timeout = 30; // Default lock timeout in seconds
|
|
private static float $poll_interval = 0.1; // 100ms polling interval
|
|
|
|
// Track locks held by this process for cleanup
|
|
private static array $held_locks = [];
|
|
|
|
// Track lock acquisition counts for reentrant locking
|
|
// Format: ['lock_key' => ['token' => 'xxx', 'count' => 2]]
|
|
private static array $lock_counts = [];
|
|
|
|
/**
|
|
* Initialize Redis connection for locking
|
|
* Separate from cache to avoid manifest dependency
|
|
* Returns null in IDE context if Redis extension not available
|
|
*/
|
|
private static function _ensure_redis(): ?\Redis
|
|
{
|
|
// Skip Redis in IDE context if extension not available
|
|
if (\is_ide() && !class_exists('\Redis')) {
|
|
return null;
|
|
}
|
|
|
|
if (self::$redis === null) {
|
|
if (!class_exists('\Redis')) {
|
|
return null;
|
|
}
|
|
self::$redis = new \Redis();
|
|
|
|
// Connect to Redis (will be configured via environment)
|
|
$host = env('REDIS_HOST', '127.0.0.1');
|
|
$port = env('REDIS_PORT', 6379);
|
|
$socket = env('REDIS_SOCKET', null);
|
|
|
|
if ($socket && file_exists($socket)) {
|
|
$connected = self::$redis->connect($socket);
|
|
}
|
|
else {
|
|
$connected = self::$redis->connect($host, $port, 2.0);
|
|
}
|
|
|
|
if (!$connected) {
|
|
shouldnt_happen("Failed to connect to Redis for locking");
|
|
}
|
|
|
|
// Select the lock database (no eviction)
|
|
self::$redis->select(self::$lock_db);
|
|
|
|
// Register shutdown handler to release locks on exit
|
|
register_shutdown_function([self::class, '_cleanup_locks']);
|
|
}
|
|
|
|
return self::$redis;
|
|
}
|
|
|
|
/**
|
|
* Get an advisory lock
|
|
*
|
|
* @param string $domain Lock domain (SERVER_LOCK or DATABASE_LOCK)
|
|
* @param string $name Lock identifier (e.g., MANIFEST_BUILD_LOCK_ID)
|
|
* @param string $type Lock type (READ_LOCK or WRITE_LOCK)
|
|
* @param int $timeout Maximum seconds to wait for lock acquisition
|
|
* @return string Lock token for release
|
|
*/
|
|
public static function get_lock(
|
|
string $domain,
|
|
string $name,
|
|
string $type = self::READ_LOCK,
|
|
int $timeout = 30
|
|
): string {
|
|
if (!in_array($domain, [self::SERVER_LOCK, self::DATABASE_LOCK])) {
|
|
shouldnt_happen("Invalid lock domain: {$domain}");
|
|
}
|
|
|
|
if (!in_array($type, [self::READ_LOCK, self::WRITE_LOCK])) {
|
|
shouldnt_happen("Invalid lock type: {$type}");
|
|
}
|
|
|
|
// Reentrant locking: Check if we already hold this lock
|
|
$count_key = "{$domain}:{$name}:{$type}";
|
|
if (isset(self::$lock_counts[$count_key])) {
|
|
// Already holding this lock, increment count and return same token
|
|
self::$lock_counts[$count_key]['count']++;
|
|
return self::$lock_counts[$count_key]['token'];
|
|
}
|
|
|
|
$redis = self::_ensure_redis();
|
|
|
|
// Skip lock in IDE context without Redis
|
|
if ($redis === null) {
|
|
$mock_token = 'ide-mock-token-' . uniqid();
|
|
self::$lock_counts[$count_key] = ['token' => $mock_token, 'count' => 1];
|
|
return $mock_token;
|
|
}
|
|
|
|
$lock_key = "lock:{$domain}:{$name}";
|
|
$lock_token = uniqid(gethostname() . ':' . getmypid() . ':', true);
|
|
$start_time = microtime(true);
|
|
|
|
// Use Lua script for atomic operations
|
|
if ($type === self::READ_LOCK) {
|
|
$acquired = self::_acquire_read_lock($redis, $lock_key, $lock_token, $timeout, $start_time);
|
|
} else {
|
|
$acquired = self::_acquire_write_lock($redis, $lock_key, $lock_token, $timeout, $start_time);
|
|
}
|
|
|
|
if (!$acquired) {
|
|
throw new RuntimeException(
|
|
"Failed to acquire {$type} lock for {$domain}:{$name} after {$timeout} seconds"
|
|
);
|
|
}
|
|
|
|
// Track lock for cleanup
|
|
self::$held_locks[$lock_token] = [
|
|
'domain' => $domain,
|
|
'name' => $name,
|
|
'type' => $type,
|
|
'key' => $lock_key,
|
|
'acquired_at' => time()
|
|
];
|
|
|
|
// Initialize lock count tracking
|
|
self::$lock_counts[$count_key] = ['token' => $lock_token, 'count' => 1];
|
|
|
|
return $lock_token;
|
|
}
|
|
|
|
/**
|
|
* Release an advisory lock
|
|
*
|
|
* @param string $lock_token Token returned from get_lock
|
|
*/
|
|
public static function release_lock(string $lock_token): void
|
|
{
|
|
// Skip mock tokens from IDE context
|
|
if (str_starts_with($lock_token, 'ide-mock-token-')) {
|
|
// Still need to handle reentrant count for IDE mock tokens
|
|
// Find the count_key by matching token in lock_counts
|
|
foreach (self::$lock_counts as $key => $data) {
|
|
if ($data['token'] === $lock_token) {
|
|
if ($data['count'] > 1) {
|
|
self::$lock_counts[$key]['count']--;
|
|
} else {
|
|
unset(self::$lock_counts[$key]);
|
|
}
|
|
return;
|
|
}
|
|
}
|
|
return;
|
|
}
|
|
|
|
if (!isset(self::$held_locks[$lock_token])) {
|
|
// Lock already released or not owned by this process
|
|
return;
|
|
}
|
|
|
|
$lock_info = self::$held_locks[$lock_token];
|
|
|
|
// Build count_key from lock_info for reentrant locking
|
|
$count_key = "{$lock_info['domain']}:{$lock_info['name']}:{$lock_info['type']}";
|
|
|
|
// Reentrant locking: Check if count > 1
|
|
if (isset(self::$lock_counts[$count_key]) && self::$lock_counts[$count_key]['count'] > 1) {
|
|
// Decrement count and return without releasing Redis lock
|
|
self::$lock_counts[$count_key]['count']--;
|
|
return;
|
|
}
|
|
|
|
$redis = self::_ensure_redis();
|
|
|
|
// Skip if Redis not available in IDE
|
|
if ($redis === null) {
|
|
unset(self::$held_locks[$lock_token]);
|
|
unset(self::$lock_counts[$count_key]);
|
|
return;
|
|
}
|
|
$lock_key = $lock_info['key'];
|
|
|
|
if ($lock_info['type'] === self::READ_LOCK) {
|
|
self::_release_read_lock($redis, $lock_key, $lock_token);
|
|
} else {
|
|
self::_release_write_lock($redis, $lock_key, $lock_token);
|
|
}
|
|
|
|
// Remove from both tracking arrays when count reaches 0
|
|
unset(self::$held_locks[$lock_token]);
|
|
unset(self::$lock_counts[$count_key]);
|
|
}
|
|
|
|
/**
|
|
* Upgrade a read lock to a write lock atomically
|
|
*
|
|
* This is used for optimistic concurrency control where we start with a read lock
|
|
* and only upgrade to write if needed. The read lock is NOT released during upgrade.
|
|
*
|
|
* @param string $lock_token Existing read lock token
|
|
* @param int $timeout Maximum seconds to wait for upgrade
|
|
* @return string New write lock token
|
|
* @throws RuntimeException if upgrade fails or token is not a read lock
|
|
*/
|
|
public static function upgrade_lock(string $lock_token, int $timeout = 30): string
|
|
{
|
|
// Skip mock tokens from IDE context
|
|
if (str_starts_with($lock_token, 'ide-mock-token-')) {
|
|
$new_token = 'ide-mock-token-' . uniqid();
|
|
|
|
// Handle reentrant count for IDE mock tokens
|
|
foreach (self::$lock_counts as $key => $data) {
|
|
if ($data['token'] === $lock_token) {
|
|
$count = $data['count'];
|
|
unset(self::$lock_counts[$key]);
|
|
|
|
// Create new count entry with WRITE_LOCK type
|
|
// Parse the key to get domain and name
|
|
$parts = explode(':', $key);
|
|
if (count($parts) === 3) {
|
|
$new_count_key = "{$parts[0]}:{$parts[1]}:" . self::WRITE_LOCK;
|
|
self::$lock_counts[$new_count_key] = ['token' => $new_token, 'count' => $count];
|
|
}
|
|
break;
|
|
}
|
|
}
|
|
|
|
return $new_token;
|
|
}
|
|
|
|
if (!isset(self::$held_locks[$lock_token])) {
|
|
throw new RuntimeException("Cannot upgrade lock - token not found or already released");
|
|
}
|
|
|
|
$lock_info = self::$held_locks[$lock_token];
|
|
|
|
if ($lock_info['type'] !== self::READ_LOCK) {
|
|
throw new RuntimeException("Can only upgrade READ locks to WRITE locks");
|
|
}
|
|
|
|
// Handle reentrant lock count transfer from READ to WRITE
|
|
$old_count_key = "{$lock_info['domain']}:{$lock_info['name']}:" . self::READ_LOCK;
|
|
$new_count_key = "{$lock_info['domain']}:{$lock_info['name']}:" . self::WRITE_LOCK;
|
|
$current_count = self::$lock_counts[$old_count_key]['count'] ?? 1;
|
|
|
|
$redis = self::_ensure_redis();
|
|
|
|
// Skip if Redis not available in IDE
|
|
if ($redis === null) {
|
|
$new_token = 'ide-mock-token-' . uniqid();
|
|
unset(self::$lock_counts[$old_count_key]);
|
|
self::$lock_counts[$new_count_key] = ['token' => $new_token, 'count' => $current_count];
|
|
unset(self::$held_locks[$lock_token]);
|
|
self::$held_locks[$new_token] = [
|
|
'domain' => $lock_info['domain'],
|
|
'name' => $lock_info['name'],
|
|
'type' => self::WRITE_LOCK,
|
|
'key' => $lock_info['key'],
|
|
'acquired_at' => time(),
|
|
'upgraded_from' => $lock_token
|
|
];
|
|
return $new_token;
|
|
}
|
|
$lock_key = $lock_info['key'];
|
|
$new_token = uniqid(gethostname() . ':' . getmypid() . ':write:', true);
|
|
$start_time = microtime(true);
|
|
|
|
// Attempt atomic upgrade
|
|
$upgraded = self::_upgrade_read_to_write_lock(
|
|
$redis,
|
|
$lock_key,
|
|
$lock_token,
|
|
$new_token,
|
|
$timeout,
|
|
$start_time
|
|
);
|
|
|
|
if (!$upgraded) {
|
|
throw new RuntimeException(
|
|
"Failed to upgrade read lock to write lock for {$lock_info['domain']}:{$lock_info['name']} after {$timeout} seconds"
|
|
);
|
|
}
|
|
|
|
// Remove old read lock from tracking
|
|
unset(self::$held_locks[$lock_token]);
|
|
unset(self::$lock_counts[$old_count_key]);
|
|
|
|
// Add new write lock to tracking with preserved count
|
|
self::$held_locks[$new_token] = [
|
|
'domain' => $lock_info['domain'],
|
|
'name' => $lock_info['name'],
|
|
'type' => self::WRITE_LOCK,
|
|
'key' => $lock_key,
|
|
'acquired_at' => time(),
|
|
'upgraded_from' => $lock_token
|
|
];
|
|
self::$lock_counts[$new_count_key] = ['token' => $new_token, 'count' => $current_count];
|
|
|
|
return $new_token;
|
|
}
|
|
|
|
/**
|
|
* Acquire a read lock
|
|
*/
|
|
private static function _acquire_read_lock(
|
|
Redis $redis,
|
|
string $lock_key,
|
|
string $lock_token,
|
|
int $timeout,
|
|
float $start_time
|
|
): bool {
|
|
$writer_queue_key = "{$lock_key}:writer_queue";
|
|
$readers_key = "{$lock_key}:readers";
|
|
$writer_active_key = "{$lock_key}:writer_active";
|
|
|
|
while ((microtime(true) - $start_time) < $timeout) {
|
|
// Lua script for atomic read lock acquisition
|
|
$lua = <<<'LUA'
|
|
local lock_key = KEYS[1]
|
|
local writer_queue_key = KEYS[2]
|
|
local readers_key = KEYS[3]
|
|
local writer_active_key = KEYS[4]
|
|
local lock_token = ARGV[1]
|
|
local ttl = ARGV[2]
|
|
|
|
-- Check if a writer is active
|
|
local writer_active = redis.call('EXISTS', writer_active_key)
|
|
if writer_active == 1 then
|
|
return 0 -- Writer is active, cannot acquire read lock
|
|
end
|
|
|
|
-- Check if writers are waiting
|
|
local writers_waiting = redis.call('LLEN', writer_queue_key)
|
|
if writers_waiting > 0 then
|
|
return 0 -- Writers are waiting, new readers must wait
|
|
end
|
|
|
|
-- Add this reader
|
|
redis.call('HSET', readers_key, lock_token, '1')
|
|
redis.call('EXPIRE', readers_key, ttl)
|
|
return 1 -- Successfully acquired read lock
|
|
LUA;
|
|
|
|
$result = $redis->eval(
|
|
$lua,
|
|
[$lock_key, $writer_queue_key, $readers_key, $writer_active_key, $lock_token, self::$default_timeout],
|
|
4
|
|
);
|
|
|
|
if ($result === 1) {
|
|
return true;
|
|
}
|
|
|
|
// Wait before retrying
|
|
usleep(self::$poll_interval * 1000000);
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
/**
|
|
* Acquire a write lock
|
|
*/
|
|
private static function _acquire_write_lock(
|
|
Redis $redis,
|
|
string $lock_key,
|
|
string $lock_token,
|
|
int $timeout,
|
|
float $start_time
|
|
): bool {
|
|
$writer_queue_key = "{$lock_key}:writer_queue";
|
|
$readers_key = "{$lock_key}:readers";
|
|
$writer_active_key = "{$lock_key}:writer_active";
|
|
|
|
// First, add ourselves to the writer queue
|
|
$redis->rPush($writer_queue_key, $lock_token);
|
|
$redis->expire($writer_queue_key, self::$default_timeout);
|
|
|
|
try {
|
|
while ((microtime(true) - $start_time) < $timeout) {
|
|
// Lua script for atomic write lock acquisition
|
|
$lua = <<<'LUA'
|
|
local lock_key = KEYS[1]
|
|
local writer_queue_key = KEYS[2]
|
|
local readers_key = KEYS[3]
|
|
local writer_active_key = KEYS[4]
|
|
local lock_token = ARGV[1]
|
|
local ttl = ARGV[2]
|
|
|
|
-- Check if we're next in the writer queue
|
|
local next_writer = redis.call('LINDEX', writer_queue_key, 0)
|
|
if next_writer ~= lock_token then
|
|
return 0 -- Not our turn yet
|
|
end
|
|
|
|
-- Check if any readers are active
|
|
local reader_count = redis.call('HLEN', readers_key)
|
|
if reader_count > 0 then
|
|
return 0 -- Readers still active, must wait
|
|
end
|
|
|
|
-- Check if another writer is active
|
|
local writer_active = redis.call('EXISTS', writer_active_key)
|
|
if writer_active == 1 then
|
|
return 0 -- Another writer is active
|
|
end
|
|
|
|
-- Acquire the write lock
|
|
redis.call('SET', writer_active_key, lock_token, 'EX', ttl)
|
|
redis.call('LPOP', writer_queue_key) -- Remove ourselves from queue
|
|
return 1 -- Successfully acquired write lock
|
|
LUA;
|
|
|
|
$result = $redis->eval(
|
|
$lua,
|
|
[$lock_key, $writer_queue_key, $readers_key, $writer_active_key, $lock_token, self::$default_timeout],
|
|
4
|
|
);
|
|
|
|
if ($result === 1) {
|
|
return true;
|
|
}
|
|
|
|
// Wait before retrying
|
|
usleep(self::$poll_interval * 1000000);
|
|
}
|
|
} catch (Exception $e) {
|
|
// Remove from queue on error
|
|
$redis->lRem($writer_queue_key, $lock_token, 0);
|
|
throw $e;
|
|
}
|
|
|
|
// Timeout - remove from queue
|
|
$redis->lRem($writer_queue_key, $lock_token, 0);
|
|
return false;
|
|
}
|
|
|
|
/**
|
|
* Upgrade a read lock to a write lock atomically
|
|
*
|
|
* IMPORTANT: To prevent deadlock, readers that are upgrading to writers
|
|
* are NOT counted as blocking readers. This allows multiple readers to
|
|
* upgrade without deadlocking each other.
|
|
*/
|
|
private static function _upgrade_read_to_write_lock(
|
|
Redis $redis,
|
|
string $lock_key,
|
|
string $old_token,
|
|
string $new_token,
|
|
int $timeout,
|
|
float $start_time
|
|
): bool {
|
|
$writer_queue_key = "{$lock_key}:writer_queue";
|
|
$readers_key = "{$lock_key}:readers";
|
|
$writer_active_key = "{$lock_key}:writer_active";
|
|
$upgrading_readers_key = "{$lock_key}:upgrading_readers";
|
|
|
|
// Mark ourselves as an upgrading reader (not blocking other upgrades)
|
|
$redis->sAdd($upgrading_readers_key, $old_token);
|
|
$redis->expire($upgrading_readers_key, self::$default_timeout);
|
|
|
|
// Add ourselves to the writer queue
|
|
$redis->rPush($writer_queue_key, $new_token); // Normal queue order
|
|
$redis->expire($writer_queue_key, self::$default_timeout);
|
|
|
|
try {
|
|
while ((microtime(true) - $start_time) < $timeout) {
|
|
// Lua script for atomic lock upgrade (deadlock-safe)
|
|
$lua = <<<'LUA'
|
|
local lock_key = KEYS[1]
|
|
local writer_queue_key = KEYS[2]
|
|
local readers_key = KEYS[3]
|
|
local writer_active_key = KEYS[4]
|
|
local upgrading_readers_key = KEYS[5]
|
|
local old_token = ARGV[1]
|
|
local new_token = ARGV[2]
|
|
local ttl = ARGV[3]
|
|
|
|
-- Verify we still hold the read lock
|
|
local has_read_lock = redis.call('HEXISTS', readers_key, old_token)
|
|
if has_read_lock == 0 then
|
|
return -1 -- Lost our read lock somehow
|
|
end
|
|
|
|
-- Check if we're next in the writer queue
|
|
local next_writer = redis.call('LINDEX', writer_queue_key, 0)
|
|
if next_writer ~= new_token then
|
|
return 0 -- Not our turn yet
|
|
end
|
|
|
|
-- Check if any OTHER readers are active (excluding upgrading readers)
|
|
-- This prevents deadlock where multiple readers wait for each other
|
|
local all_readers = redis.call('HKEYS', readers_key)
|
|
local upgrading = redis.call('SMEMBERS', upgrading_readers_key)
|
|
local upgrading_set = {}
|
|
for i, token in ipairs(upgrading) do
|
|
upgrading_set[token] = true
|
|
end
|
|
|
|
local blocking_readers = 0
|
|
for i, reader_token in ipairs(all_readers) do
|
|
-- Don't count ourselves or other upgrading readers as blockers
|
|
if reader_token ~= old_token and not upgrading_set[reader_token] then
|
|
blocking_readers = blocking_readers + 1
|
|
end
|
|
end
|
|
|
|
if blocking_readers > 0 then
|
|
return 0 -- Non-upgrading readers still active, must wait
|
|
end
|
|
|
|
-- Check if another writer is active
|
|
local writer_active = redis.call('EXISTS', writer_active_key)
|
|
if writer_active == 1 then
|
|
return 0 -- Another writer is active
|
|
end
|
|
|
|
-- Upgrade to write lock atomically
|
|
-- 1. Remove our read lock
|
|
redis.call('HDEL', readers_key, old_token)
|
|
-- 2. Remove from upgrading readers set
|
|
redis.call('SREM', upgrading_readers_key, old_token)
|
|
-- 3. Set write lock
|
|
redis.call('SET', writer_active_key, new_token, 'EX', ttl)
|
|
-- 4. Remove from writer queue
|
|
redis.call('LPOP', writer_queue_key)
|
|
|
|
return 1 -- Successfully upgraded
|
|
LUA;
|
|
|
|
$result = $redis->eval(
|
|
$lua,
|
|
[$lock_key, $writer_queue_key, $readers_key, $writer_active_key, $upgrading_readers_key, $old_token, $new_token, self::$default_timeout],
|
|
5
|
|
);
|
|
|
|
if ($result === 1) {
|
|
return true;
|
|
} elseif ($result === -1) {
|
|
// Lost our read lock - this shouldn't happen
|
|
$redis->lRem($writer_queue_key, $new_token, 0);
|
|
$redis->sRem($upgrading_readers_key, $old_token);
|
|
throw new RuntimeException("Lost read lock during upgrade attempt");
|
|
}
|
|
|
|
// Wait before retrying
|
|
usleep(self::$poll_interval * 1000000);
|
|
}
|
|
} catch (Exception $e) {
|
|
// Remove from queues on error
|
|
$redis->lRem($writer_queue_key, $new_token, 0);
|
|
$redis->sRem($upgrading_readers_key, $old_token);
|
|
throw $e;
|
|
}
|
|
|
|
// Timeout - remove from queues
|
|
$redis->lRem($writer_queue_key, $new_token, 0);
|
|
$redis->sRem($upgrading_readers_key, $old_token);
|
|
return false;
|
|
}
|
|
|
|
/**
|
|
* Release a read lock
|
|
*/
|
|
private static function _release_read_lock(Redis $redis, string $lock_key, string $lock_token): void
|
|
{
|
|
$readers_key = "{$lock_key}:readers";
|
|
$redis->hDel($readers_key, $lock_token);
|
|
}
|
|
|
|
/**
|
|
* Release a write lock
|
|
*/
|
|
private static function _release_write_lock(Redis $redis, string $lock_key, string $lock_token): void
|
|
{
|
|
$writer_active_key = "{$lock_key}:writer_active";
|
|
|
|
// Only delete if we own it
|
|
$lua = <<<'LUA'
|
|
local writer_active_key = KEYS[1]
|
|
local lock_token = ARGV[1]
|
|
|
|
local current_writer = redis.call('GET', writer_active_key)
|
|
if current_writer == lock_token then
|
|
redis.call('DEL', writer_active_key)
|
|
return 1
|
|
end
|
|
return 0
|
|
LUA;
|
|
|
|
$redis->eval($lua, [$writer_active_key, $lock_token], 1);
|
|
}
|
|
|
|
/**
|
|
* Cleanup locks on process exit
|
|
* Called by shutdown handler
|
|
*/
|
|
public static function _cleanup_locks(): void
|
|
{
|
|
foreach (array_keys(self::$held_locks) as $lock_token) {
|
|
try {
|
|
self::release_lock($lock_token);
|
|
} catch (Exception $e) {
|
|
// Ignore errors during cleanup
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Force clear all locks for a given name (emergency use only)
|
|
*
|
|
* @param string $domain Lock domain
|
|
* @param string $name Lock name
|
|
*/
|
|
public static function force_clear_lock(string $domain, string $name): void
|
|
{
|
|
$redis = self::_ensure_redis();
|
|
|
|
// Skip if Redis not available in IDE
|
|
if ($redis === null) {
|
|
return;
|
|
}
|
|
$lock_key = "lock:{$domain}:{$name}";
|
|
|
|
$redis->del([
|
|
$lock_key,
|
|
"{$lock_key}:writer_queue",
|
|
"{$lock_key}:readers",
|
|
"{$lock_key}:writer_active"
|
|
]);
|
|
}
|
|
|
|
/**
|
|
* Get lock statistics for monitoring
|
|
*
|
|
* @param string $domain Lock domain
|
|
* @param string $name Lock name
|
|
* @return array Lock statistics
|
|
*/
|
|
public static function get_lock_stats(string $domain, string $name): array
|
|
{
|
|
$redis = self::_ensure_redis();
|
|
|
|
// Skip if Redis not available in IDE
|
|
if ($redis === null) {
|
|
return [
|
|
'readers_active' => 0,
|
|
'writers_waiting' => 0,
|
|
'writer_active' => false,
|
|
'writer_token' => null
|
|
];
|
|
}
|
|
$lock_key = "lock:{$domain}:{$name}";
|
|
|
|
$writer_queue = $redis->lLen("{$lock_key}:writer_queue");
|
|
$reader_count = $redis->hLen("{$lock_key}:readers");
|
|
$writer_active = $redis->get("{$lock_key}:writer_active");
|
|
|
|
return [
|
|
'readers_active' => $reader_count,
|
|
'writers_waiting' => $writer_queue,
|
|
'writer_active' => $writer_active !== false,
|
|
'writer_token' => $writer_active ?: null
|
|
];
|
|
}
|
|
} |