Files
DomFramework/src/Queuefile.php

229 lines
6.2 KiB
PHP

<?php
/** DomFramework
* @package domframework
* @author Dominique Fournier <dominique@fournier38.fr>
* @license BSD
*/
namespace Domframework;
/** Manage a queue stored in a file
* A process can add entries to the end of the queue
* A process can get all the entries of the queue
* A process can clear all the entries of the queue
* A process can get the first entry of the queue (FIFO), optionally
* removing that entry
* A process can get the last entry of the queue (LIFO), optionally
* removing that entry
* A process can get X entries starting at position Y
*/
class Queuefile extends Queue
{
  /** The path of the file storing the queue (null until connect() is called).
   * The file holds one JSON-encoded entry per line, each line being
   * terminated by "\n".
   */
  private $queue = null;

  /** Connect to the queue. Create it if not exists
   * The parent directory and the file itself are created when missing.
   * @param string $dsn The DSN to connect to queue. Must look like
   * file:///tmp/queuefile.txt
   * @return $this;
   * @throws \Exception (code 500) if the DSN doesn't start by file:// or
   * doesn't contain a file path
   */
  public function connect ($dsn)
  // {{{
  {
    // The DSN format must be file:///tmp/queuefile.txt
    if (substr ($dsn, 0, 7) !== "file://")
      throw new \Exception ("Invalid DSN provided to queuefile : not starting ".
        "by file://", 500);
    $path = substr ($dsn, 7);
    // substr returns false (PHP < 8) or "" (PHP >= 8) when nothing follows
    // the scheme
    if ($path === false || $path === "")
      throw new \Exception ("Invalid DSN provided to queuefile : no file ".
        "path provided", 500);
    $this->queue = $path;
    $file = new \file ();
    if (! $file->file_exists (dirname ($this->queue)))
      $file->mkdir (dirname ($this->queue), 0777, true);
    if (! $file->file_exists ($this->queue))
      $file->touch ($this->queue);
    return $this;
  }
  // }}}

  /** Add a new entry to the end of the queue
   * @param mixed $entry The entry to add (must be JSON encodable)
   * @return $this;
   * @throws \Exception (code 500) if the entry can not be encoded in JSON
   */
  public function add ($entry)
  // {{{
  {
    // Encode before locking : a failed encoding must not write an empty
    // line, which would be read back later as a phantom null entry
    $json = json_encode ($entry);
    if ($json === false)
      throw new \Exception ("Can not add the entry to queuefile : ".
        json_last_error_msg (), 500);
    $file = new \file ();
    $this->lockQueue ($file, true);
    try
    {
      $file->file_put_contents ($this->queue, $json."\n", FILE_APPEND);
    }
    finally
    {
      $file->lockUN ($this->queue);
    }
    return $this;
  }
  // }}}

  /** Get all the entries of the queue
   * @param boolean|null $delete If true, empty the queue after reading
   * @return array The decoded entries, in queue order
   */
  public function getAll ($delete =false)
  // {{{
  {
    $file = new \file ();
    $this->lockQueue ($file, $delete);
    try
    {
      $entries = array ();
      foreach ($this->readRawEntries ($file) as $json)
        $entries[] = json_decode ($json);
      if ($delete)
        $file->file_put_contents ($this->queue, "");
      return $entries;
    }
    finally
    {
      $file->lockUN ($this->queue);
    }
  }
  // }}}

  /** Get the number of entries in the queue
   * @return integer The number of entries
   */
  public function count ()
  // {{{
  {
    $file = new \file ();
    $this->lockQueue ($file, false);
    try
    {
      return count ($this->readRawEntries ($file));
    }
    finally
    {
      $file->lockUN ($this->queue);
    }
  }
  // }}}

  /** Clear all the entries of the queue
   * @return $this;
   */
  public function clear ()
  // {{{
  {
    $file = new \file ();
    $this->lockQueue ($file, true);
    try
    {
      $file->file_put_contents ($this->queue, "");
    }
    finally
    {
      $file->lockUN ($this->queue);
    }
    return $this;
  }
  // }}}

  /** Get the first entry in the queue with optional removing of the entry
   * @param boolean|null $delete If true, delete the read entry
   * @return mixed Return null if there is no entry, or the first entry in the
   * queue (FIFO)
   */
  public function getFirst ($delete = false)
  // {{{
  {
    $file = new \file ();
    $this->lockQueue ($file, $delete);
    try
    {
      $entries = $this->readRawEntries ($file);
      if (count ($entries) === 0)
        return null;
      $res = json_decode (array_shift ($entries));
      if ($delete)
        $this->writeRawEntries ($file, $entries);
      return $res;
    }
    finally
    {
      $file->lockUN ($this->queue);
    }
  }
  // }}}

  /** Get the last entry in the queue with optional removing of the entry
   * @param boolean|null $delete If true, delete the read entry
   * @return mixed Return null if there is no entry, or the last entry in the
   * queue (LIFO)
   */
  public function getLast ($delete = false)
  // {{{
  {
    $file = new \file ();
    $this->lockQueue ($file, $delete);
    try
    {
      $entries = $this->readRawEntries ($file);
      if (count ($entries) === 0)
        return null;
      $res = json_decode (array_pop ($entries));
      if ($delete)
        $this->writeRawEntries ($file, $entries);
      return $res;
    }
    finally
    {
      $file->lockUN ($this->queue);
    }
  }
  // }}}

  /** Get X entries starting at position Y
   * The entries are decoded as associative arrays. An entry which is not
   * valid JSON is returned as the raw string stored in the file.
   * @param integer $start the starting position (the entries start at position
   * zero)
   * @param integer $number The number of entries to get
   * @param boolean|null $delete If true, delete the read entries
   * @return array An array of mixed entries (empty if the queue is empty)
   * @throws \Exception (code 406) if the queue is not empty and one of the
   * requested positions doesn't exist. Nothing is deleted in this case.
   */
  public function getRange ($start, $number, $delete = false)
  // {{{
  {
    $file = new \file ();
    $this->lockQueue ($file, $delete);
    try
    {
      $entries = $this->readRawEntries ($file);
      $res = array ();
      if (count ($entries) === 0)
        return $res;
      for ($i = $start ; $i < $start + $number ; $i++)
      {
        if (! array_key_exists ($i, $entries))
          throw new \Exception (dgettext ("domframework",
            "Invalid entry requested"), 406);
        $json = $entries[$i];
        $decoded = json_decode ($json, true);
        if (json_last_error () === JSON_ERROR_NONE)
          $res[] = $decoded;
        else
          $res[] = $json;
      }
      // Remove exactly the entries which were read. count ($res) is used as
      // length so a zero or negative $number never removes anything
      if ($delete && count ($res) > 0)
      {
        array_splice ($entries, (int) $start, count ($res));
        $this->writeRawEntries ($file, $entries);
      }
      return $res;
    }
    finally
    {
      $file->lockUN ($this->queue);
    }
  }
  // }}}

  /** Lock the queue file before an operation
   * @param object $file The file object used to access the queue
   * @param boolean|null $exclusive If true, take the lock with lockEX (the
   * operation will modify the file), else take it with lockSH
   */
  private function lockQueue ($file, $exclusive)
  // {{{
  {
    if ($exclusive)
      $file->lockEX ($this->queue);
    else
      $file->lockSH ($this->queue);
  }
  // }}}

  /** Read the queue file and return the raw (still JSON encoded) entries
   * The caller must hold the lock.
   * @param object $file The file object used to access the queue
   * @return array The list of the JSON strings, one by entry
   */
  private function readRawEntries ($file)
  // {{{
  {
    $content = $file->file_get_contents ($this->queue);
    $lines = explode ("\n", (string) $content);
    // A well-formed file ends by "\n", so explode returns one empty entry at
    // end : remove it. A last line without "\n" is kept as a real entry.
    if (end ($lines) === "")
      array_pop ($lines);
    return $lines;
  }
  // }}}

  /** Overwrite the queue file with the provided raw entries
   * Each entry is terminated by "\n" (including the last one), so that the
   * next add() starts on a new line. The caller must hold the exclusive lock.
   * @param object $file The file object used to access the queue
   * @param array $entries The list of the JSON strings to store
   */
  private function writeRawEntries ($file, $entries)
  // {{{
  {
    $content = "";
    if (count ($entries) > 0)
      $content = implode ("\n", $entries)."\n";
    $file->file_put_contents ($this->queue, $content);
  }
  // }}}
}