- Created complete documentation in docs/ directory - Added PROJECT_OVERVIEW.md with feature highlights and getting started guide - Added ARCHITECTURE.md with system design and technical details - Added SECURITY.md with comprehensive security implementation guide - Added DEVELOPMENT.md with development workflows and best practices - Added DEPLOYMENT.md with production deployment instructions - Added API.md with complete REST API documentation - Added CONTRIBUTING.md with contribution guidelines - Added CHANGELOG.md with version history and migration notes - Reorganized all documentation files into docs/ directory for better organization - Updated README.md with proper documentation links and quick navigation - Enhanced project structure with professional documentation standards
275 lines
8.7 KiB
PHP
275 lines
8.7 KiB
PHP
<?php
|
|
/*******************************************************************************************************************
|
|
| Enhanced Queue Worker
|
|
| Processes background jobs with proper error handling and monitoring
|
|
|*******************************************************************************************************************/
|
|
|
|
define('_ISVALID', true);
|
|
require_once __DIR__ . '/../f_core/config.core.php';
|
|
|
|
// Auto-load job classes
|
|
spl_autoload_register(function ($className) {
|
|
$jobFile = __DIR__ . '/../f_jobs/' . $className . '.php';
|
|
if (file_exists($jobFile)) {
|
|
require_once $jobFile;
|
|
}
|
|
});
|
|
|
|
class QueueWorker
|
|
{
|
|
private $logger;
|
|
private $queue;
|
|
private $running = true;
|
|
private $processedJobs = 0;
|
|
private $failedJobs = 0;
|
|
private $startTime;
|
|
private $maxJobs;
|
|
private $maxMemory;
|
|
private $queues;
|
|
|
|
public function __construct($queues = ['default'], $maxJobs = 1000, $maxMemory = '256M')
|
|
{
|
|
$this->logger = VLogger::getInstance();
|
|
$this->queue = new VQueue();
|
|
$this->queues = (array)$queues;
|
|
$this->maxJobs = $maxJobs;
|
|
$this->maxMemory = $this->parseMemoryLimit($maxMemory);
|
|
$this->startTime = time();
|
|
|
|
// Set up signal handlers for graceful shutdown
|
|
if (function_exists('pcntl_signal')) {
|
|
pcntl_signal(SIGTERM, [$this, 'handleShutdown']);
|
|
pcntl_signal(SIGINT, [$this, 'handleShutdown']);
|
|
}
|
|
|
|
$this->logger->info('Queue worker started', [
|
|
'queues' => $this->queues,
|
|
'max_jobs' => $this->maxJobs,
|
|
'max_memory' => $maxMemory,
|
|
'pid' => getmypid()
|
|
]);
|
|
}
|
|
|
|
/**
|
|
* Main worker loop
|
|
*/
|
|
public function run()
|
|
{
|
|
while ($this->running) {
|
|
try {
|
|
// Check memory usage
|
|
if ($this->shouldStop()) {
|
|
$this->logger->info('Worker stopping due to limits', [
|
|
'processed_jobs' => $this->processedJobs,
|
|
'failed_jobs' => $this->failedJobs,
|
|
'memory_usage' => memory_get_usage(true),
|
|
'uptime' => time() - $this->startTime
|
|
]);
|
|
break;
|
|
}
|
|
|
|
// Process signals
|
|
if (function_exists('pcntl_signal_dispatch')) {
|
|
pcntl_signal_dispatch();
|
|
}
|
|
|
|
// Get next job
|
|
$job = $this->queue->dequeue($this->queues, 5);
|
|
|
|
if (!$job) {
|
|
continue; // No job available, continue polling
|
|
}
|
|
|
|
$this->processJob($job);
|
|
|
|
} catch (Exception $e) {
|
|
$this->logger->error('Worker error', [
|
|
'error' => $e->getMessage(),
|
|
'trace' => $e->getTraceAsString()
|
|
]);
|
|
|
|
// Brief pause to prevent tight error loops
|
|
sleep(1);
|
|
}
|
|
}
|
|
|
|
$this->logger->info('Queue worker stopped', [
|
|
'processed_jobs' => $this->processedJobs,
|
|
'failed_jobs' => $this->failedJobs,
|
|
'uptime' => time() - $this->startTime
|
|
]);
|
|
}
|
|
|
|
/**
|
|
* Process a single job
|
|
* @param array $job Job data
|
|
*/
|
|
private function processJob($job)
|
|
{
|
|
$jobId = $job['id'];
|
|
$jobClass = $job['class'];
|
|
$jobData = $job['data'];
|
|
|
|
$this->logger->info('Processing job', [
|
|
'job_id' => $jobId,
|
|
'class' => $jobClass,
|
|
'attempt' => $job['attempts']
|
|
]);
|
|
|
|
$startTime = microtime(true);
|
|
$startMemory = memory_get_usage(true);
|
|
|
|
try {
|
|
// Validate job class exists
|
|
if (!class_exists($jobClass)) {
|
|
throw new Exception("Job class not found: {$jobClass}");
|
|
}
|
|
|
|
// Create job instance
|
|
$jobInstance = new $jobClass();
|
|
|
|
if (!method_exists($jobInstance, 'handle')) {
|
|
throw new Exception("Job class {$jobClass} does not have handle method");
|
|
}
|
|
|
|
// Execute job
|
|
$result = $jobInstance->handle($jobData);
|
|
|
|
// Mark job as completed
|
|
$this->queue->markCompleted($jobId, $result);
|
|
|
|
$processingTime = microtime(true) - $startTime;
|
|
$memoryUsed = memory_get_usage(true) - $startMemory;
|
|
|
|
$this->processedJobs++;
|
|
|
|
$this->logger->info('Job completed', [
|
|
'job_id' => $jobId,
|
|
'class' => $jobClass,
|
|
'processing_time' => round($processingTime, 3),
|
|
'memory_used' => $memoryUsed,
|
|
'result' => is_array($result) ? array_keys($result) : gettype($result)
|
|
]);
|
|
|
|
// Record job statistics
|
|
$this->recordJobStatistics($jobClass, 'completed', $processingTime, $memoryUsed);
|
|
|
|
} catch (Exception $e) {
|
|
$processingTime = microtime(true) - $startTime;
|
|
$memoryUsed = memory_get_usage(true) - $startMemory;
|
|
|
|
$this->failedJobs++;
|
|
|
|
$this->logger->error('Job failed', [
|
|
'job_id' => $jobId,
|
|
'class' => $jobClass,
|
|
'attempt' => $job['attempts'],
|
|
'error' => $e->getMessage(),
|
|
'processing_time' => round($processingTime, 3),
|
|
'memory_used' => $memoryUsed
|
|
]);
|
|
|
|
// Mark job as failed (will retry if attempts < max_attempts)
|
|
$this->queue->markFailed($jobId, $e->getMessage());
|
|
|
|
// Record job statistics
|
|
$this->recordJobStatistics($jobClass, 'failed', $processingTime, $memoryUsed, $e->getMessage());
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Check if worker should stop
|
|
* @return bool True if worker should stop
|
|
*/
|
|
private function shouldStop()
|
|
{
|
|
// Check job limit
|
|
if ($this->processedJobs >= $this->maxJobs) {
|
|
return true;
|
|
}
|
|
|
|
// Check memory limit
|
|
if (memory_get_usage(true) >= $this->maxMemory) {
|
|
return true;
|
|
}
|
|
|
|
// Check if running flag is still true
|
|
return !$this->running;
|
|
}
|
|
|
|
/**
|
|
* Handle shutdown signals
|
|
* @param int $signal Signal number
|
|
*/
|
|
public function handleShutdown($signal)
|
|
{
|
|
$this->logger->info('Received shutdown signal', ['signal' => $signal]);
|
|
$this->running = false;
|
|
}
|
|
|
|
/**
|
|
* Parse memory limit string to bytes
|
|
* @param string $limit Memory limit (e.g., '256M', '1G')
|
|
* @return int Memory limit in bytes
|
|
*/
|
|
private function parseMemoryLimit($limit)
|
|
{
|
|
$limit = trim($limit);
|
|
$unit = strtoupper(substr($limit, -1));
|
|
$value = (int)substr($limit, 0, -1);
|
|
|
|
switch ($unit) {
|
|
case 'G':
|
|
return $value * 1024 * 1024 * 1024;
|
|
case 'M':
|
|
return $value * 1024 * 1024;
|
|
case 'K':
|
|
return $value * 1024;
|
|
default:
|
|
return (int)$limit;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Record job statistics
|
|
* @param string $jobType Job type
|
|
* @param string $status Status (completed/failed)
|
|
* @param float $processingTime Processing time in seconds
|
|
* @param int $memoryUsage Memory usage in bytes
|
|
* @param string $errorMessage Error message (if failed)
|
|
*/
|
|
private function recordJobStatistics($jobType, $status, $processingTime, $memoryUsage, $errorMessage = null)
|
|
{
|
|
try {
|
|
$db = VDatabase::getInstance();
|
|
|
|
$data = [
|
|
'job_type' => $jobType,
|
|
'status' => $status,
|
|
'processing_time' => $processingTime,
|
|
'memory_usage' => $memoryUsage,
|
|
'error_message' => $errorMessage,
|
|
'created_at' => date('Y-m-d H:i:s')
|
|
];
|
|
|
|
$db->doInsert('db_job_statistics', $data);
|
|
|
|
} catch (Exception $e) {
|
|
$this->logger->error('Failed to record job statistics', [
|
|
'job_type' => $jobType,
|
|
'error' => $e->getMessage()
|
|
]);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Parse command line arguments
|
|
$queues = isset($argv[1]) ? explode(',', $argv[1]) : ['default'];
|
|
$maxJobs = isset($argv[2]) ? (int)$argv[2] : 1000;
|
|
$maxMemory = isset($argv[3]) ? $argv[3] : '256M';
|
|
|
|
// Create and run worker
|
|
$worker = new QueueWorker($queues, $maxJobs, $maxMemory);
|
|
$worker->run();
|
|
|