logger = VLogger::getInstance();
    $this->db = VDatabase::getInstance();

    try {
        $this->redis = VRedis::getInstance();
        $this->useDatabase = !$this->redis->isConnected();
    } catch (Exception $e) {
        $this->useDatabase = true;
        $this->logger->warning('Redis unavailable, using database queue', [
            'error' => $e->getMessage()
        ]);
    }
}

/**
 * Add job to queue with database fallback
 *
 * Uses the db_queue_jobs table when $this->useDatabase is set, otherwise
 * delegates to VQueue.
 *
 * @param string $jobClass Job class name
 * @param array $data Job data
 * @param string $queue Queue name
 * @param int $delay Delay in seconds
 * @param int $priority Priority (higher = more important)
 * @return string|false Job ID or false on failure
 */
public function enqueue($jobClass, $data = [], $queue = 'default', $delay = 0, $priority = 0)
{
    $jobId = $this->generateJobId();

    if ($this->useDatabase) {
        // Only the database backend needs an absolute timestamp.
        $availableAt = date('Y-m-d H:i:s', time() + $delay);

        return $this->enqueueToDB($jobId, $jobClass, $data, $queue, $delay, $priority, $availableAt);
    }

    return $this->enqueueToRedis($jobId, $jobClass, $data, $queue, $delay, $priority);
}

/**
 * Get next job from queue
 *
 * @param array $queues Queue names to check
 * @param int $timeout Timeout in seconds (only forwarded to the Redis backend;
 *                     the database backend does not receive it)
 * @return array|false Job data or false if no job
 */
public function dequeue($queues = ['default'], $timeout = 10)
{
    if ($this->useDatabase) {
        return $this->dequeueFromDB($queues);
    }

    return $this->dequeueFromRedis($queues, $timeout);
}

/**
 * Mark job as completed
 *
 * @param string $jobId Job ID
 * @param mixed $result Job result
 * @return bool Success status
 */
public function markCompleted($jobId, $result = null)
{
    if ($this->useDatabase) {
        return $this->markCompletedInDB($jobId, $result);
    }

    return $this->markCompletedInRedis($jobId, $result);
}

/**
 * Mark job as failed
 *
 * @param string $jobId Job ID
 * @param string $error Error message
 * @return bool Success status
 */
public function markFailed($jobId, $error)
{
    if ($this->useDatabase) {
        return $this->markFailedInDB($jobId, $error);
    }

    return $this->markFailedInRedis($jobId, $error);
}

/**
 * Database queue implementation: insert a new pending job row.
 *
 * @param string $jobId Pre-generated job ID
 * @param string $jobClass Job class name
 * @param array $data Job data (stored JSON-encoded)
 * @param string $queue Queue name
 * @param int $delay Delay in seconds (unused here; $availableAt already includes it)
 * @param int $priority Priority
 * @param string $availableAt 'Y-m-d H:i:s' timestamp from which the job may run
 * @return string|false Job ID, or false if encoding or the insert failed
 */
private function enqueueToDB($jobId, $jobClass, $data, $queue, $delay, $priority, $availableAt)
{
    try {
        // json_encode() returns false on failure; never store that as the payload.
        $payload = json_encode($data);
        if ($payload === false) {
            $this->logger->error('Failed to enqueue job to database', [
                'job_id' => $jobId,
                'class' => $jobClass,
                'error' => 'Unable to JSON-encode job data: ' . json_last_error_msg()
            ]);
            return false;
        }

        $now = date('Y-m-d H:i:s');

        $this->db->doInsert('db_queue_jobs', [
            'id' => $jobId,
            'queue' => $queue,
            'class' => $jobClass,
            'data' => $payload,
            'priority' => $priority,
            'attempts' => 0,
            'max_attempts' => 3,
            'status' => 'pending',
            'available_at' => $availableAt,
            'created_at' => $now,
            'updated_at' => $now
        ]);

        $this->logger->info('Job enqueued to database', [
            'job_id' => $jobId,
            'class' => $jobClass,
            'queue' => $queue
        ]);

        return $jobId;
    } catch (Exception $e) {
        $this->logger->error('Failed to enqueue job to database', [
            'job_id' => $jobId,
            'class' => $jobClass,
            'error' => $e->getMessage()
        ]);
        return false;
    }
}

/**
 * Fetch and claim the next runnable job from the database.
 *
 * Picks the pending job with the highest priority (oldest first within a
 * priority) whose available_at has passed, marks it as processing and
 * increments its attempt counter.
 *
 * @param array $queues Queue names to check
 * @return array|false Job row with 'data' decoded, or false when no job
 *                     could be claimed or an error occurred
 */
private function dequeueFromDB($queues)
{
    try {
        // Positional placeholders need a plain list of values.
        $queues = array_values((array) $queues);

        // Process delayed jobs first
        $this->processDelayedJobsInDB();

        if ($queues === []) {
            return false;
        }

        // Bind queue names instead of interpolating them into the SQL text.
        $placeholders = implode(',', array_fill(0, count($queues), '?'));

        // NOTE(review): FOR UPDATE only holds a lock inside a transaction and
        // none is opened here; the conditional UPDATE below is what prevents
        // two workers from claiming the same row.
        $query = "SELECT * FROM db_queue_jobs
                  WHERE queue IN ({$placeholders})
                  AND status = 'pending'
                  AND available_at <= NOW()
                  ORDER BY priority DESC, created_at ASC
                  LIMIT 1
                  FOR UPDATE";

        $result = $this->db->doQuery($query, $queues);
        $job = $this->db->doFetch($result);

        if (!$job) {
            return false;
        }

        // Claim the job only if it is still pending.
        $this->db->doQuery(
            "UPDATE db_queue_jobs
             SET status = 'processing', attempts = attempts + 1, updated_at = ?
             WHERE id = ? AND status = 'pending'",
            [date('Y-m-d H:i:s'), $job['id']]
        );

        if ($this->db->getAffectedRows() < 1) {
            // Another worker claimed it between the SELECT and the UPDATE.
            return false;
        }

        // Decode job data and mirror the incremented counter in the returned row.
        $job['data'] = json_decode($job['data'], true);
        $job['attempts'] = (int) $job['attempts'] + 1;

        return $job;
    } catch (Exception $e) {
        $this->logger->error('Failed to dequeue job from database', [
            'queues' => $queues,
            'error' => $e->getMessage()
        ]);
        return false;
    }
}

/**
 * Set a job's status to 'completed' in the database.
 *
 * @param string $jobId Job ID
 * @param mixed $result Job result (NOTE(review): accepted but not persisted)
 * @return bool True on success, false if the update threw
 */
private function markCompletedInDB($jobId, $result)
{
    try {
        $this->db->doUpdate('db_queue_jobs', 'id', [
            'status' => 'completed',
            'updated_at' => date('Y-m-d H:i:s')
        ], $jobId);

        return true;
    } catch (Exception $e) {
        $this->logger->error('Failed to mark job completed in database', [
            'job_id' => $jobId,
            'error' => $e->getMessage()
        ]);
        return false;
    }
}

/**
 * Record a job failure in the database.
 *
 * While attempts < max_attempts the job is put back to 'pending' with an
 * exponential backoff of 2^attempts minutes; otherwise it is marked 'failed'.
 *
 * @param string $jobId Job ID
 * @param string $error Error message
 * @return bool True when the row was updated, false if the job was not
 *              found or the update threw
 */
private function markFailedInDB($jobId, $error)
{
    try {
        $job = $this->getJobFromDB($jobId);
        if (!$job) {
            return false;
        }

        $attempts = (int) $job['attempts'];
        $maxAttempts = (int) $job['max_attempts'];
        $now = date('Y-m-d H:i:s');

        if ($attempts < $maxAttempts) {
            // Retry with exponential backoff. dequeueFromDB() has already
            // incremented attempts, so the first retry waits 2^1 * 60 seconds.
            $delay = (2 ** $attempts) * 60;

            $this->db->doUpdate('db_queue_jobs', 'id', [
                'status' => 'pending',
                'error_message' => $error,
                'available_at' => date('Y-m-d H:i:s', time() + $delay),
                'updated_at' => $now
            ], $jobId);

            $this->logger->warning('Job failed, retrying', [
                'job_id' => $jobId,
                'attempt' => $attempts,
                'retry_in' => $delay,
                'error' => $error
            ]);

            return true;
        }

        // Max attempts reached
        $this->db->doUpdate('db_queue_jobs', 'id', [
            'status' => 'failed',
            'error_message' => $error,
            'updated_at' => $now
        ], $jobId);

        $this->logger->error('Job failed permanently', [
            'job_id' => $jobId,
            'attempts' => $attempts,
            'error' => $error
        ]);

        return true;
    } catch (Exception $e) {
        $this->logger->error('Failed to mark job failed in database', [
            'job_id' => $jobId,
            'error' => $e->getMessage()
        ]);
        return false;
    }
}

/**
 * Touch pending jobs whose available_at has passed.
 *
 * NOTE(review): the statement sets status = 'pending' on rows that are
 * already 'pending', so its only effect is refreshing updated_at. Delayed
 * jobs are already picked up by the available_at filter in dequeueFromDB().
 * Confirm whether this write is still needed.
 *
 * Errors are logged and not propagated.
 */
private function processDelayedJobsInDB()
{
    try {
        $this->db->doQuery(
            "UPDATE db_queue_jobs
             SET status = 'pending', updated_at = NOW()
             WHERE status = 'pending' AND available_at <= NOW()"
        );
    } catch (Exception $e) {
        $this->logger->error('Failed to process delayed jobs in database', [
            'error' => $e->getMessage()
        ]);
    }
}

/**
 * Load a single job row by ID.
 *
 * @param string $jobId Job ID
 * @return array|false Whatever doFetch() yields for the row, or false if the query threw
 */
private function getJobFromDB($jobId)
{
    try {
        $query = "SELECT * FROM db_queue_jobs WHERE id = ?";
        $result = $this->db->doQuery($query, [$jobId]);

        return $this->db->doFetch($result);
    } catch (Exception $e) {
        // Log instead of silently swallowing, so lookup failures are visible.
        $this->logger->error('Failed to load job from database', [
            'job_id' => $jobId,
            'error' => $e->getMessage()
        ]);
        return false;
    }
}

/**
 * Redis queue implementation (delegated to existing VQueue)
 *
 * NOTE(review): $jobId is not forwarded; the returned ID is whatever
 * VQueue::enqueue() produces.
 */
private function enqueueToRedis($jobId, $jobClass, $data, $queue, $delay, $priority)
{
    $vqueue = new VQueue();

    return $vqueue->enqueue($jobClass, $data, $queue, $delay, $priority);
}

/**
 * Delegate dequeue to VQueue.
 */
private function dequeueFromRedis($queues, $timeout)
{
    $vqueue = new VQueue();

    return $vqueue->dequeue($queues, $timeout);
}

/**
 * Delegate completion to VQueue.
 */
private function markCompletedInRedis($jobId, $result)
{
    $vqueue = new VQueue();

    return $vqueue->markCompleted($jobId, $result);
}

/**
 * Delegate failure handling to VQueue.
 */
private function markFailedInRedis($jobId, $error)
{
    $vqueue = new VQueue();

    return $vqueue->markFailed($jobId, $error);
}

/**
 * Get comprehensive queue statistics
 *
 * @return array Statistics
 */
public function getQueueStatistics()
{
    if ($this->useDatabase) {
        return $this->getDBQueueStats();
    }

    return $this->getRedisQueueStats();
}

/**
 * Count jobs per queue and status in the database.
 *
 * @return array Map of queue name => [status => count]. Every queue present
 *               has the keys pending/processing/completed/failed; any other
 *               status found in the table is added under its own key.
 *               Empty array if the query threw.
 */
private function getDBQueueStats()
{
    try {
        $query = "SELECT queue, status, COUNT(*) as count
                  FROM db_queue_jobs
                  GROUP BY queue, status";

        $result = $this->db->doQuery($query);
        $stats = [];

        while ($row = $this->db->doFetch($result)) {
            $queue = $row['queue'];

            if (!isset($stats[$queue])) {
                $stats[$queue] = [
                    'pending' => 0,
                    'processing' => 0,
                    'completed' => 0,
                    'failed' => 0
                ];
            }

            $stats[$queue][$row['status']] = (int) $row['count'];
        }

        return $stats;
    } catch (Exception $e) {
        $this->logger->error('Failed to get database queue stats', [
            'error' => $e->getMessage()
        ]);
        return [];
    }
}

/**
 * Delegate statistics to VQueue.
 */
private function getRedisQueueStats()
{
    $vqueue = new VQueue();

    return $vqueue->getQueueStats();
}

/**
 * Clean up old completed/failed jobs
 *
 * @param int $olderThanHours Hours to keep jobs
 * @return int Number of jobs cleaned
 */
public function cleanupOldJobs($olderThanHours = 24)
{
    if ($this->useDatabase) {
        return $this->cleanupDBJobs($olderThanHours);
    }

    return $this->cleanupRedisJobs($olderThanHours);
}

/**
 * Delete completed/failed rows last updated before the cutoff.
 *
 * NOTE(review): the cutoff is computed with PHP's clock and timezone while
 * other queries in this class use SQL NOW(); confirm both agree.
 *
 * @param int $olderThanHours Hours to keep jobs
 * @return int Number of deleted rows, 0 if the query threw
 */
private function cleanupDBJobs($olderThanHours)
{
    try {
        $cutoffTime = date('Y-m-d H:i:s', time() - ($olderThanHours * 3600));

        $query = "DELETE FROM db_queue_jobs
                  WHERE status IN ('completed', 'failed')
                  AND updated_at < ?";

        $this->db->doQuery($query, [$cutoffTime]);
        $deletedCount = $this->db->getAffectedRows();

        $this->logger->info('Cleaned up old jobs from database', [
            'deleted_count' => $deletedCount,
            'cutoff_time' => $cutoffTime
        ]);

        return $deletedCount;
    } catch (Exception $e) {
        $this->logger->error('Failed to cleanup database jobs', [
            'error' => $e->getMessage()
        ]);
        return 0;
    }
}

/**
 * No-op for the Redis backend.
 *
 * @param int $olderThanHours Unused
 * @return int Always 0
 */
private function cleanupRedisJobs($olderThanHours)
{
    // Redis jobs have TTL, so they clean up automatically
    return 0;
}

/**
 * Get failed jobs for monitoring
 *
 * @param int $limit Number of jobs to return
 * @return array Failed jobs
 */
public function getFailedJobs($limit = 50)
{
    if ($this->useDatabase) {
        return $this->getFailedJobsFromDB($limit);
    }

    return $this->getFailedJobsFromRedis($limit);
}

/**
 * List the most recently updated failed jobs from the database.
 *
 * @param int $limit Maximum number of rows (negative values are treated as 0)
 * @return array Job rows with 'data' decoded; empty array if the query threw
 */
private function getFailedJobsFromDB($limit)
{
    try {
        // Inline the limit as a sanitized integer: a bound LIMIT value may be
        // quoted as a string by the driver, which is invalid SQL.
        $limit = max(0, (int) $limit);

        $query = "SELECT * FROM db_queue_jobs
                  WHERE status = 'failed'
                  ORDER BY updated_at DESC
                  LIMIT {$limit}";

        $result = $this->db->doQuery($query);
        $jobs = [];

        while ($row = $this->db->doFetch($result)) {
            $row['data'] = json_decode($row['data'], true);
            $jobs[] = $row;
        }

        return $jobs;
    } catch (Exception $e) {
        $this->logger->error('Failed to get failed jobs from database', [
            'error' => $e->getMessage()
        ]);
        return [];
    }
}

/**
 * Not implemented for the Redis backend.
 *
 * @param int $limit Unused
 * @return array Always empty
 */
private function getFailedJobsFromRedis($limit)
{
    // Implementation would scan Redis for failed jobs
    // This is more complex with Redis, so returning empty for now
    return [];
}

/**
 * Retry failed job
 *
 * @param string $jobId Job ID
 * @return bool Success status
 */
public function retryFailedJob($jobId)
{
    if ($this->useDatabase) {
        return $this->retryFailedJobInDB($jobId);
    }

    return $this->retryFailedJobInRedis($jobId);
}

/**
 * Reset a failed job so it becomes runnable again.
 *
 * Only rows that exist and currently have status 'failed' are reset; this
 * avoids re-queuing jobs that are processing or already completed, and
 * avoids reporting success for unknown IDs.
 *
 * @param string $jobId Job ID
 * @return bool True if the job was reset, false otherwise
 */
private function retryFailedJobInDB($jobId)
{
    try {
        $job = $this->getJobFromDB($jobId);

        if (!$job || $job['status'] !== 'failed') {
            $this->logger->warning('Job is not in failed state, retry skipped', [
                'job_id' => $jobId
            ]);
            return false;
        }

        $now = date('Y-m-d H:i:s');

        $this->db->doUpdate('db_queue_jobs', 'id', [
            'status' => 'pending',
            'attempts' => 0,
            'error_message' => null,
            'available_at' => $now,
            'updated_at' => $now
        ], $jobId);

        $this->logger->info('Failed job retried', ['job_id' => $jobId]);

        return true;
    } catch (Exception $e) {
        $this->logger->error('Failed to retry job', [
            'job_id' => $jobId,
            'error' => $e->getMessage()
        ]);
        return false;
    }
}
/**
 * Retry a failed job on the Redis backend.
 *
 * Not implemented: always reports failure.
 *
 * @param string $jobId Job ID (unused)
 * @return bool Always false
 */
private function retryFailedJobInRedis($jobId)
{
    // Implementation for Redis retry
    return false;
}

/**
 * Generate unique job ID
 *
 * Format: uniqid() output with the 'job_' prefix and extra entropy,
 * followed by '_' and the current Unix timestamp.
 *
 * @return string Job ID
 */
private function generateJobId()
{
    return uniqid('job_', true) . '_' . time();
}

/**
 * Health check for queue system
 *
 * Reports the active backend, the queue statistics and, on the database
 * backend, jobs that have been in 'processing' for more than one hour.
 *
 * @return array Health status with keys 'status' ('healthy', 'warning' or
 *               'unhealthy'), 'backend', 'issues' and, unless an exception
 *               occurred, 'statistics'
 */
public function healthCheck()
{
    $health = [
        'status' => 'healthy',
        'backend' => $this->useDatabase ? 'database' : 'redis',
        'issues' => []
    ];

    try {
        $stats = $this->getQueueStatistics();

        // Check for stuck processing jobs (processing for more than 1 hour)
        if ($this->useDatabase) {
            $stuckQuery = "SELECT COUNT(*) as count
                           FROM db_queue_jobs
                           WHERE status = 'processing'
                           AND updated_at < DATE_SUB(NOW(), INTERVAL 1 HOUR)";

            $result = $this->db->doQuery($stuckQuery);
            $row = $this->db->doFetch($result);

            // Guard against a missing row before indexing into it.
            $stuckCount = ($row && isset($row['count'])) ? (int) $row['count'] : 0;

            if ($stuckCount > 0) {
                $health['issues'][] = "Found {$stuckCount} stuck processing jobs";
                $health['status'] = 'warning';
            }
        }

        $health['statistics'] = $stats;
    } catch (Exception $e) {
        $health['status'] = 'unhealthy';
        $health['issues'][] = 'Failed to get queue statistics: ' . $e->getMessage();
    }

    return $health;
}
}