freenet-router |
Subversion Repositories: |
Compare with Previous - Blame - Download
<?php
namespace Phem\Environment;
use DateTime;
use Phem\Core\Collection;
use Phem\Libraries\MessageBus\CometPusher;
use Ratchet\Wamp\Exception;
/**
* @author kubapet
*/
class Application
{
private static function injectFCGIRequest($query)
{
$cmd = "SCRIPT_NAME=".getcwd().DS."index.php " .
"SCRIPT_FILENAME=".getcwd().DS."index.php " .
"DOCUMENT_ROOT=".getcwd()." " .
"HTTP_HOST=".FPM_INJECT_HTTP_HOST." " .
"REQUEST_METHOD=GET " .
"QUERY_STRING=\"".$query."\" " .
"REQUEST_URI=\"/index.php".$query."\" " .
"cgi-fcgi -bind -connect 127.0.0.1:".FPM_INJECT_PORT." > /tmp/app.log &";
echo $cmd;
echo "BEGIN".exec($cmd)."END";
}
public static function runParallelTask($controller,$task,$args = null)
{
$query = "controller=$controller&task=$task";
foreach ($args as $key => $arg)
{
$query .= "&$key=$arg";
}
self::injectFCGIRequest($query);
}
/**
*
* @return Collection
*/
public static function getUsers()
{
if (self::getVar("Phem.".APP_NAMESPACE.".Users") == null)
{
self::setVar("Phem.".APP_NAMESPACE.".Users",new Collection());
}
return self::getVar("Phem.".APP_NAMESPACE.".Users");
}
public static function setUsers(Collection $users)
{
self::setVar("Phem.".APP_NAMESPACE.".Users",$users);
}
/**
*
* @return Collection
*/
public static function getRecentlyLoggedUsers()
{
if (self::getVar("Phem.".APP_NAMESPACE.".RLUsers") == null)
{
self::setVar("Phem.".APP_NAMESPACE.".RLUsers",new Collection());
}
return self::getVar("Phem.".APP_NAMESPACE.".RLUsers");
}
public static function setRecentlyLoggedUsers(Collection $users)
{
self::setVar("Phem.".APP_NAMESPACE.".RLUsers",$users);
}
public static function onNewConnection($cid,$sid,$uid)
{
$lock = self::lockVarsWait();
$users = self::getUsers();
$rlUsers = self::getRecentlyLoggedUsers();
if (!($users->containsKey($uid)))
{
$users->put($uid,new Collection());
if (!($rlUsers->containsKey($uid)))
{
$msgArgs = new MessageArgs();
$msgArgs->setActionUrl(EnvironmentManager::getLinkBuilder()
->navigate("Administration","loggedUsers"));
$msgArgs->setActionType("replace");
$msgArgs->setActionTarget("loggedUsers");
$msg = new Message;
$msg->setSubject('UĹživatel je online');
$msg->getArgs()->add($msgArgs);
$msg->setDate(new DateTime(date("Y-m-d")));
$msg->setFromUsr($uid);
$msgArgs->setMessage($msg);
//$msg->setText($uid);
Application::notify($msg);
}
}
$sessions = $users->get($uid);
if (!($sessions->containsKey($sid)))
{
$sessions->set($sid,new Collection());
}
$connections = $sessions->get($sid);
if (!($connections->containsKey($cid)))
{
$connections->set($cid,$cid);
}
self::setUsers($users);
$lock->unlock();
}
public static function onConnectionClose($cid)
{
//$lb = EnvironmentManager::getLinkBuilder();
//$lb->externalLink(array("controller"=>"dispatcher",""));
$uid = null;
$sid = null;
$lock = self::lockVarsWait();
$users = self::getUsers();
foreach ($users as $user => $sessions)
{
foreach ($sessions as $session => $connections)
{
foreach ($connections as $connection)
{
if ($connection == $cid)
{
$uid = $user;
$sid = $session;
break 3;
}
}
}
}
$sessions = $users->get($uid);
$connections = $sessions->get($sid);
$connections->removeKey($cid);
if ($connections->isEmpty())
{
$sessions->removeKey($sid);
}
if ($sessions->isEmpty())
{
$users->removeKey($uid);
self::setUsers($users);
$rlUsers = self::getRecentlyLoggedUsers();
$rlUsers->put($uid,$uid);
self::setRecentlyLoggedUsers($rlUsers);
$lock->unlock();
//exec("php ".getcwd().DS.FRAMEWORK_DIR.DS."Static".DS."userWentOffline.php ".$uid." &");
//exec("php ".getcwd().DS.FRAMEWORK_DIR.DS."Static".DS."userWentOffline.sh ".$uid." &");
self::runParallelTask("Notification", "userWentOffline", array("uid"=>$uid));
return;
}
self::setUsers($users);
$lock->unlock();
}
public static function setVar($key, $value, $ttl = 0)
{
switch (strtolower(APPLICATION_SCOPE_STORAGE))
{
case 'xcache':
if (!function_exists('xcache_set'))
{
throw new ApplicationException('XCache support is missing');
}
xcache_set($key, serialize($value), $ttl);
break;
case 'apc':
if (!function_exists('apc_store'))
{
throw new ApplicationException('APC support is missing');
}
apc_store($key, $value, $ttl);
break;
default:
case 'disabled':
throw new ApplicationException(
'Application scope backend not configured'
);
}
}
public static function getVar($key)
{
switch (strtolower(APPLICATION_SCOPE_STORAGE))
{
case 'xcache':
if (!function_exists('xcache_set'))
{
throw new ApplicationException('XCache support is missing');
}
return unserialize(\xcache_get($key));
case 'apc':
if (!function_exists('apc_store'))
{
throw new ApplicationException('APC support is missing');
}
return apc_fetch($key);
default:
case 'disabled':
throw new ApplicationException(
'Application scope backend not configured'
);
}
}
public static function deleteVar($key)
{
switch (strtolower(APPLICATION_SCOPE_STORAGE))
{
case 'xcache':
if (!function_exists('xcache_set'))
{
throw new ApplicationException('XCache support is missing');
}
xcache_unset($key);
break;
case 'apc':
if (!function_exists('apc_store'))
{
throw new ApplicationException('APC support is missing');
}
apc_delete($key);
break;
default:
case 'disabled':
throw new ApplicationException(
'Application scope backend not configured'
);
}
}
public static function lockVarsWait()
{
$lock = new \Phem\Core\ExclusiveLock("appscope");
while(!$lock->lock())
{
sleep(0.01);
}
return $lock;
}
public static function getUserSessions($username)
{
$sessions = new Collection();
//$iter = new \APCIterator('user', '/user.name/');
$iter = \xcache_list(XC_TYPE_VAR, 0)['cache_list'];
//print_r($iter);
foreach ($iter as $item)
{
if (strpos($item["name"], 'user.name') === false)
continue;
if (self::getVar($item['name']) == $username)
{
$session = str_replace('user.name.', '', $item['name']);
$sessions->add($session);
}
}
return $sessions;
}
public static function getLoggedUsers()
{
$em = EnvironmentManager::getEntityManager();
$usernames = self::getUsers()->getKeys();
$usersCollection = new Collection();
if ((is_array($usernames))&&(count($usernames)>0))
{
$users = $em->getRepository('Phem\Libraries\Security\Model\MySQL\User')
->findBy(array('username' => self::getUsers()->getKeys()));
}
else
{
return $usersCollection;
}
foreach ($users as $user)
{
$usersCollection->put($user->getUsername(), $user);
}
return $usersCollection;
}
/* public static function getLoggedUsers()
{
// if (!class_exists("\APCIterator"))
// throw new ApplicationException('APC support is missing');
$users = new Collection();
//$iter = new \APCIterator('user', '/user.name/');
$iter = \xcache_list(XC_TYPE_VAR, 0)["cache_list"];
foreach ($iter as $item)
{
if (strpos($item["name"], 'user.name') === false)
continue;
//echo $item['key'].'<br/>';
$usrOrig = EnvironmentManager::getUserManager()->getUser(self::getVar($item["name"]));
if (!is_a($usrOrig, '\Phem\Libraries\Security\Model\Common\User'))
continue;
$usr = clone $usrOrig;
$session = str_replace('user.name.', '', $item['name']);
$usr->setSession($session);
$users->set($usr->getUsername(), $usr);
}
return $users;
}
*/
public static function notifyRaw($data)
{
if ((MESSAGEBUS_MODE == "Both") || (MESSAGEBUS_MODE == "WebSocket"))
{
$pusherClassName = ROOT_NAMESPACE."\\Libraries\\MessageBus\\"
. WEBSOCKET_PULLER_MODE."Pusher";
if (class_exists($pusherClassName))
{
$pusher = new $pusherClassName();
$pusher->push($data);
}
else if (strtolower(WEBSOCKET_PULLER_MODE) != "disabled")
{
throw new Exception
("Unsupported mode given by WEBSOCKET_PULLER_MODE");
}
}
if ((MESSAGEBUS_MODE == "Both") || (MESSAGEBUS_MODE == "Comet"))
{
$pusher = new CometPusher();
$pusher->push($data);
}
}
public static function notify(Message $message)
{
$em = EnvironmentManager::getEntityManager();
$em->persist($message);
$em->flush();
$em->clear();
self::notifyRaw($message);
}
public static function heartbeat($username)
{
self::setVar('user.name.' . EnvironmentManager::getSession()->getId(), $username);
}
public static function waitForMessages($timeout)
{
self::setVar('user.pid.' . EnvironmentManager::getSession()->getId(), getmypid());
if (!function_exists('pcntl_sigtimedwait'))
{
throw new ApplicationException('PCNTL support is missing (only available on POSIX platform)');
}
//suspend session and db connection before waiting for signal
EnvironmentManager::getSession()->writeClose();
$info = array();
\pcntl_sigtimedwait(array(SIGUSR1), $info, $timeout);
EnvironmentManager::getSession()->reopen();
self::deleteVar('user.pid.' . EnvironmentManager::getSession()->getId());
$messages = new Collection();
//echo 'user.queue.' . EnvironmentManager::getSession()->getId();
//$iter = new \APCIterator('user', '/user.queue.' . EnvironmentManager::getSession()->getId() . '/');
$iter = \xcache_list(XC_TYPE_VAR, 0)['cache_list'];
$varsToDelete = new Collection();
foreach ($iter as $item)
{
if (strpos($item["name"], 'user.queue') === false)
continue;
//echo $item["name"]."<br />";
$messages->add(self::getVar($item['name']));
$varsToDelete->add($item['name']);
}
foreach ($varsToDelete as $var)
{
self::deleteVar($var);
}
return $messages;
}
}