logger = VLogger::getInstance();
        $this->queue = new VQueue();
        $this->queues = (array)$queues;
        $this->maxJobs = $maxJobs;
        $this->maxMemory = $this->parseMemoryLimit($maxMemory);
        $this->startTime = time();

        // Set up signal handlers for graceful shutdown
        if (function_exists('pcntl_signal')) {
            pcntl_signal(SIGTERM, [$this, 'handleShutdown']);
            pcntl_signal(SIGINT, [$this, 'handleShutdown']);
        }

        $this->logger->info('Queue worker started', [
            'queues' => $this->queues,
            'max_jobs' => $this->maxJobs,
            'max_memory' => $maxMemory,
            'pid' => getmypid()
        ]);
    }

    /**
     * Main worker loop.
     *
     * Repeatedly dequeues jobs from the configured queues and processes them
     * until a shutdown signal is received or a job/memory limit is reached.
     * Exceptions raised inside an iteration are logged and followed by a
     * one-second pause so a persistent failure cannot spin the CPU.
     */
    public function run()
    {
        while ($this->running) {
            try {
                // Dispatch pending signals BEFORE the stop check. Otherwise a
                // SIGTERM/SIGINT handled in this iteration would only flip
                // $running after the check, and the worker would still pull
                // (and possibly process) one more job before exiting.
                if (function_exists('pcntl_signal_dispatch')) {
                    pcntl_signal_dispatch();
                }

                // Stop on job limit, memory limit or shutdown request
                if ($this->shouldStop()) {
                    $this->logger->info('Worker stopping due to limits', [
                        'processed_jobs' => $this->processedJobs,
                        'failed_jobs' => $this->failedJobs,
                        'memory_usage' => memory_get_usage(true),
                        'uptime' => time() - $this->startTime
                    ]);
                    break;
                }

                // Get next job.
                // NOTE(review): the second argument is presumably a wait
                // timeout in seconds - confirm against VQueue::dequeue().
                $job = $this->queue->dequeue($this->queues, 5);

                if (!$job) {
                    continue; // No job available, continue polling
                }

                $this->processJob($job);
            } catch (Exception $e) {
                $this->logger->error('Worker error', [
                    'error' => $e->getMessage(),
                    'trace' => $e->getTraceAsString()
                ]);

                // Brief pause to prevent tight error loops
                sleep(1);
            }
        }

        $this->logger->info('Queue worker stopped', [
            'processed_jobs' => $this->processedJobs,
            'failed_jobs' => $this->failedJobs,
            'uptime' => time() - $this->startTime
        ]);
    }

    /**
     * Process a single job.
     *
     * Instantiates the job class, calls its handle() method with the job data
     * and marks the job completed or failed on the queue. Outcome, timing and
     * memory delta are logged and recorded as job statistics.
     *
     * @param array $job Job data; the keys 'id', 'class', 'data' and
     *                   'attempts' are read
     */
    private function processJob($job)
    {
        $jobId = $job['id'];
        $jobClass = $job['class'];
        $jobData = $job['data'];
        $attempt = isset($job['attempts']) ? $job['attempts'] : null;

        $this->logger->info('Processing job', [
            'job_id' => $jobId,
            'class' => $jobClass,
            'attempt' => $attempt
        ]);

        $startTime = microtime(true);
        $startMemory = memory_get_usage(true);

        // Only job execution and the completion mark live inside the try.
        // Bookkeeping for a successful job happens afterwards, so an error
        // while logging/recording can no longer mark a completed job as failed.
        try {
            // Validate job class exists
            if (!class_exists($jobClass)) {
                throw new Exception("Job class not found: {$jobClass}");
            }

            // Create job instance
            $jobInstance = new $jobClass();

            if (!method_exists($jobInstance, 'handle')) {
                throw new Exception("Job class {$jobClass} does not have handle method");
            }

            // Execute job
            $result = $jobInstance->handle($jobData);

            // Mark job as completed
            $this->queue->markCompleted($jobId, $result);
        } catch (\Throwable $e) {
            // \Throwable (PHP 7+) also covers Error/TypeError raised by job
            // code; catching only Exception would let those escape without
            // the job ever being marked as failed.
            $processingTime = microtime(true) - $startTime;
            $memoryUsed = memory_get_usage(true) - $startMemory;

            $this->failedJobs++;

            $this->logger->error('Job failed', [
                'job_id' => $jobId,
                'class' => $jobClass,
                'attempt' => $attempt,
                'error' => $e->getMessage(),
                'processing_time' => round($processingTime, 3),
                'memory_used' => $memoryUsed
            ]);

            // Mark job as failed (will retry if attempts < max_attempts)
            $this->queue->markFailed($jobId, $e->getMessage());

            // Record job statistics
            $this->recordJobStatistics($jobClass, 'failed', $processingTime, $memoryUsed, $e->getMessage());

            return;
        }

        $processingTime = microtime(true) - $startTime;
        $memoryUsed = memory_get_usage(true) - $startMemory;

        $this->processedJobs++;

        $this->logger->info('Job completed', [
            'job_id' => $jobId,
            'class' => $jobClass,
            'processing_time' => round($processingTime, 3),
            'memory_used' => $memoryUsed,
            // Log only the shape of the result, not its contents
            'result' => is_array($result) ? array_keys($result) : gettype($result)
        ]);

        // Record job statistics
        $this->recordJobStatistics($jobClass, 'completed', $processingTime, $memoryUsed);
    }

    /**
     * Check if worker should stop.
     *
     * The worker stops when the number of successfully processed jobs reaches
     * the job limit, when allocated memory reaches the memory limit, or when
     * the running flag has been cleared by a shutdown signal.
     *
     * @return bool True if worker should stop
     */
    private function shouldStop()
    {
        // Check job limit (failed jobs are not counted here)
        if ($this->processedJobs >= $this->maxJobs) {
            return true;
        }

        // Check memory limit
        if (memory_get_usage(true) >= $this->maxMemory) {
            return true;
        }

        // Check if running flag is still true
        return !$this->running;
    }

    /**
     * Handle shutdown signals.
     *
     * Only clears the running flag; the main loop notices it on its next
     * iteration, so a job that is currently executing is allowed to finish.
     *
     * @param int $signal Signal number
     */
    public function handleShutdown($signal)
    {
        $this->logger->info('Received shutdown signal', ['signal' => $signal]);
        $this->running = false;
    }

    /**
     * Parse memory limit string to bytes.
     *
     * Accepts a number followed by an optional K, M or G unit (any case),
     * optionally followed by "B" (e.g. '256M', '256MB', '1g'). A value
     * without a recognised unit is cast to int and taken as bytes.
     *
     * @param string $limit Memory limit (e.g., '256M', '1G')
     * @return int Memory limit in bytes
     */
    private function parseMemoryLimit($limit)
    {
        $limit = trim((string)$limit);

        // Treat "256MB" / "1gb" / "512Kb" like "256M" / "1g" / "512K". Without
        // this the trailing "B" is taken as the unit and "256MB" would parse
        // as 256 bytes, making the worker stop immediately.
        if (preg_match('/^(.*[KMG])B$/i', $limit, $matches)) {
            $limit = $matches[1];
        }

        $unit = strtoupper(substr($limit, -1));
        $value = (int)substr($limit, 0, -1);

        switch ($unit) {
            case 'G':
                return $value * 1024 * 1024 * 1024;
            case 'M':
                return $value * 1024 * 1024;
            case 'K':
                return $value * 1024;
            default:
                // No recognised unit: interpret the whole string as bytes
                return (int)$limit;
        }
    }

    /**
     * Record job statistics.
     *
     * Best-effort: inserts one row into db_job_statistics. Any failure is
     * logged and swallowed so statistics can never break job processing.
     *
     * @param string $jobType Job type
     * @param string $status Status (completed/failed)
     * @param float $processingTime Processing time in seconds
     * @param int $memoryUsage Memory usage in bytes
     * @param string $errorMessage Error message (if failed)
     */
    private function recordJobStatistics($jobType, $status, $processingTime, $memoryUsage, $errorMessage = null)
    {
        try {
            $db = VDatabase::getInstance();

            $data = [
                'job_type' => $jobType,
                'status' => $status,
                'processing_time' => $processingTime,
                'memory_usage' => $memoryUsage,
                'error_message' => $errorMessage,
                'created_at' => date('Y-m-d H:i:s')
            ];

            $db->doInsert('db_job_statistics', $data);
        } catch (\Throwable $e) {
            // \Throwable so that an Error in the database layer is swallowed
            // just like an Exception; this method is strictly best-effort.
            $this->logger->error('Failed to record job statistics', [
                'job_type' => $jobType,
                'error' => $e->getMessage()
            ]);
        }
    }
}

// Parse command line arguments: [queues (comma separated)] [max jobs] [max memory]
$queues = isset($argv[1]) ? explode(',', $argv[1]) : ['default'];
$maxJobs = isset($argv[2]) ? (int)$argv[2] : 1000;
$maxMemory = isset($argv[3]) ? $argv[3] : '256M';

// Create and run worker
$worker = new QueueWorker($queues, $maxJobs, $maxMemory);
$worker->run();