* @license BSD */ namespace Domframework; /** * Manage a queue in file * A process can add entries to the end of a queue * A process can get all the entries in a queue * A process can clear all the entries in a queue * A process can get the first entry in the queue (FIFO), with optional * removing of the entry * A process can get the last entry in the queue (LIFO), with optional * removing of the entry * A process can get X entries starting at position Y */ class Queuefile extends Queue { /** * The queue connected object */ private $queue = null; /** * Connect to the queue. Create it if not exists * @param string $dsn The DSN to connect to queue * @return $this; */ public function connect($dsn) { // The DSN format must be file:///tmp/queuefile.txt if (substr($dsn, 0, 7) !== "file://") { throw new \Exception("Invalid DSN provided to queuefile : not starting " . "by file://", 500); } $this->queue = substr($dsn, 7); $file = new File(); if (! $file->file_exists(dirname($this->queue))) { $file->mkdir(dirname($this->queue), 0777, true); } if (! $file->file_exists($this->queue)) { $file->touch($this->queue); } return $this; } /** * Add a new entry to the end of the queue * @param mixed $entry The entry to add * @return $this; */ public function add($entry) { $file = new File(); $file->lockEX($this->queue); $file->file_put_contents( $this->queue, json_encode($entry) . "\n", FILE_APPEND ); $file->lockUN($this->queue); return $this; } /** * Get all the entries of the queue * @param boolean|null $delete If true, delete the read entry * @return array */ public function getAll($delete = false) { $file = new File(); if ($delete) { $file->lockEX($this->queue); } else { $file->lockSH($this->queue); } $content = $file->file_get_contents($this->queue); $entries = explode("\n", $content); // Explode return always one empty entry at end. Remove it array_pop($entries); foreach ($entries as $key => $val) { $entries[$key] = json_decode($val); } if ($delete) { $file->file_put_contents($this->queue, ""); } $file->lockUN($this->queue); return $entries; } /** * Get the number of entries in the queue */ public function count() { $file = new File(); $file->lockSH($this->queue); $content = $file->file_get_contents($this->queue); $count = substr_count($content, "\n"); $file->lockUN($this->queue); return $count; } /** * Clear all the entries of the queue * @return $this; */ public function clear() { $file = new File(); $file->lockEX($this->queue); $file->file_put_contents($this->queue, ""); $file->lockUN($this->queue); return $this; } /** * Get the first entry in the queue with optional removing of the entry * @param boolean|null $delete If true, delete the read entry * @return mixed Return null if there is no entry, or the first entry in the * queue (FIFO) */ public function getFirst($delete = false) { $file = new File(); if ($delete) { $file->lockEX($this->queue); } else { $file->lockSH($this->queue); } $content = $file->file_get_contents($this->queue); $entries = explode("\n", $content); // Explode return always one empty entry at end. Remove it array_pop($entries); if (count($entries) === 0) { $res = null; } else { $json = array_shift($entries); $res = json_decode($json); if ($delete) { $file->file_put_contents( $this->queue, implode("\n", $entries) ); } } $file->lockUN($this->queue); return $res; } /** * Get the last entry in the queue with optional removing of the entry * @param boolean|null $delete If true, delete the read entry * @return mixed Return null if there is no entry, or the last entry in the * queue (LIFO) */ public function getLast($delete = false) { $file = new File(); if ($delete) { $file->lockEX($this->queue); } else { $file->lockSH($this->queue); } $content = $file->file_get_contents($this->queue); $entries = explode("\n", $content); // Explode return always one empty entry at end. Remove it array_pop($entries); if (count($entries) === 0) { $res = null; } else { $json = array_pop($entries); $res = json_decode($json); if ($delete) { $file->file_put_contents( $this->queue, implode("\n", $entries) ); } } $file->lockUN($this->queue); return $res; } /** * Get X entries starting at position Y * @param integer $start the starting position (the entries start at position * zero) * @param integer $number The number of entries to get * @param boolean|null $delete If true, delete the read entries * @return array An array of mixed entries */ public function getRange($start, $number, $delete = false) { $file = new File(); if ($delete) { $file->lockEX($this->queue); } else { $file->lockSH($this->queue); } $content = $file->file_get_contents($this->queue); $entries = explode("\n", $content); // Explode return always one empty entry at end. Remove it array_pop($entries); $res = []; if (count($entries) > 0) { for ($i = $start; $i < $start + $number; $i++) { if (! key_exists($i, $entries)) { $file->lockUN($this->queue); throw new \Exception(dgettext( "domframework", "Invalid entry requested" ), 406); } $json = $entries[$i]; $content = json_decode($json, true); if (json_last_error() === JSON_ERROR_NONE) { $res[] = $content; } else { $res[] = $json; } } if ($delete) { $file->file_put_contents( $this->queue, implode("\n", $entries) ); } } $file->lockUN($this->queue); return $res; } }