jablonka.czprosek.czf

freenet-router

Subversion Repositories:
[/] [trunk/] [freenet-router/] [var/] [www/] [freenet-router/] [Framework/] [Environment/] [Application.php] - Rev 2

Compare with Previous - Blame - Download


<?php

namespace Phem\Environment;

use DateTime;
use Phem\Core\Collection;
use Phem\Libraries\MessageBus\CometPusher;
use Ratchet\Wamp\Exception;

/**
 * @author kubapet
 */
class Application
{
    private static function injectFCGIRequest($query)
    {
              
        $cmd = "SCRIPT_NAME=".getcwd().DS."index.php " .
        "SCRIPT_FILENAME=".getcwd().DS."index.php " .
        "DOCUMENT_ROOT=".getcwd()." " .
        "HTTP_HOST=".FPM_INJECT_HTTP_HOST." " .
        "REQUEST_METHOD=GET " .
        "QUERY_STRING=\"".$query."\" " .
        "REQUEST_URI=\"/index.php".$query."\" " .
        "cgi-fcgi -bind -connect 127.0.0.1:".FPM_INJECT_PORT." > /tmp/app.log &";
        
        echo $cmd;
        
        echo "BEGIN".exec($cmd)."END";
        
        
    }
    
    public static function runParallelTask($controller,$task,$args = null)
    {
        $query = "controller=$controller&task=$task";
        foreach ($args as $key => $arg)
        {
            $query .= "&$key=$arg";
        }
        
        self::injectFCGIRequest($query);
    }

    /**
     * 
     * @return Collection
     */
    public static function getUsers()
    {
        if (self::getVar("Phem.".APP_NAMESPACE.".Users") == null)
        {
            self::setVar("Phem.".APP_NAMESPACE.".Users",new Collection());
        }
        return self::getVar("Phem.".APP_NAMESPACE.".Users");
    }
    
    public static function setUsers(Collection $users)
    {
         self::setVar("Phem.".APP_NAMESPACE.".Users",$users);
    }
    
    /**
     * 
     * @return Collection
     */
    public static function getRecentlyLoggedUsers()
    {
        if (self::getVar("Phem.".APP_NAMESPACE.".RLUsers") == null)
        {
            self::setVar("Phem.".APP_NAMESPACE.".RLUsers",new Collection());
        }
        return self::getVar("Phem.".APP_NAMESPACE.".RLUsers");
    }
    
    public static function setRecentlyLoggedUsers(Collection $users)
    {
         self::setVar("Phem.".APP_NAMESPACE.".RLUsers",$users);
    }    

    public static function onNewConnection($cid,$sid,$uid)
    {   
        
        $lock = self::lockVarsWait();
        
        $users = self::getUsers();
        $rlUsers = self::getRecentlyLoggedUsers();
        
        if (!($users->containsKey($uid)))
        {
            $users->put($uid,new Collection());
            if (!($rlUsers->containsKey($uid)))
            {
                $msgArgs = new MessageArgs();
                $msgArgs->setActionUrl(EnvironmentManager::getLinkBuilder()
                        ->navigate("Administration","loggedUsers"));
                $msgArgs->setActionType("replace");
                $msgArgs->setActionTarget("loggedUsers");

                $msg = new Message;
                $msg->setSubject('UĹživatel je online');
                $msg->getArgs()->add($msgArgs);   
                $msg->setDate(new DateTime(date("Y-m-d")));
                $msg->setFromUsr($uid);

                $msgArgs->setMessage($msg);

                    //$msg->setText($uid);
                Application::notify($msg);
                
            }
        }
        $sessions = $users->get($uid);
        
        if (!($sessions->containsKey($sid)))
        {
            $sessions->set($sid,new Collection());
        }        
        $connections = $sessions->get($sid);
        
        if (!($connections->containsKey($cid)))
        {
            $connections->set($cid,$cid);
        }      
        
        self::setUsers($users);
        $lock->unlock();
        
    }

    public static function onConnectionClose($cid)
    {
        
        //$lb = EnvironmentManager::getLinkBuilder();
        //$lb->externalLink(array("controller"=>"dispatcher",""));    
        
        $uid = null;
        $sid = null;
        
        $lock = self::lockVarsWait();
        
        $users = self::getUsers();
        foreach ($users as $user => $sessions)
        {
            foreach ($sessions as $session => $connections)
            {
                foreach ($connections as $connection)
                {
                    if ($connection == $cid)
                    {
                        $uid = $user;
                        $sid = $session;
                        break 3;
                    }
                }
            }
        }
        $sessions = $users->get($uid);
        $connections = $sessions->get($sid);
        $connections->removeKey($cid);
        
        if ($connections->isEmpty())
        {
            $sessions->removeKey($sid);
        }
        
        if ($sessions->isEmpty())
        {
            $users->removeKey($uid);
            self::setUsers($users);
            
            $rlUsers = self::getRecentlyLoggedUsers();
            $rlUsers->put($uid,$uid);
            self::setRecentlyLoggedUsers($rlUsers);
            
            $lock->unlock();
            
            //exec("php ".getcwd().DS.FRAMEWORK_DIR.DS."Static".DS."userWentOffline.php ".$uid." &");
            //exec("php ".getcwd().DS.FRAMEWORK_DIR.DS."Static".DS."userWentOffline.sh ".$uid." &");            
            
            self::runParallelTask("Notification", "userWentOffline", array("uid"=>$uid));
            
            return;
        }
        
        self::setUsers($users);
        $lock->unlock();
    }

    public static function setVar($key, $value, $ttl = 0)
    {
        switch (strtolower(APPLICATION_SCOPE_STORAGE))
        {
            case 'xcache':
                if (!function_exists('xcache_set'))
                {
                    throw new ApplicationException('XCache support is missing');
                }
                xcache_set($key, serialize($value), $ttl);
                break;

            case 'apc':
                if (!function_exists('apc_store'))
                {
                    throw new ApplicationException('APC support is missing');
                }
                apc_store($key, $value, $ttl);
                break;

            default:
            case 'disabled':
                throw new ApplicationException(
                'Application scope backend not configured'
                );
        }
    }

    public static function getVar($key)
    {

        switch (strtolower(APPLICATION_SCOPE_STORAGE))
        {
            case 'xcache':
                if (!function_exists('xcache_set'))
                {
                    throw new ApplicationException('XCache support is missing');
                }
                return unserialize(\xcache_get($key));

            case 'apc':
                if (!function_exists('apc_store'))
                {
                    throw new ApplicationException('APC support is missing');
                }
                return apc_fetch($key);

            default:
            case 'disabled':
                throw new ApplicationException(
                'Application scope backend not configured'
                );
        }
    }

    public static function deleteVar($key)
    {
        switch (strtolower(APPLICATION_SCOPE_STORAGE))
        {
            case 'xcache':
                if (!function_exists('xcache_set'))
                {
                    throw new ApplicationException('XCache support is missing');
                }
                xcache_unset($key);
                break;

            case 'apc':
                if (!function_exists('apc_store'))
                {
                    throw new ApplicationException('APC support is missing');
                }
                apc_delete($key);
                break;

            default:
            case 'disabled':
                throw new ApplicationException(
                'Application scope backend not configured'
                );
        }
    }
    
    public static function lockVarsWait()
    {
        $lock = new \Phem\Core\ExclusiveLock("appscope");
        while(!$lock->lock())
        {
            sleep(0.01);
        }
        return $lock;
    }

    public static function getUserSessions($username)
    {
        $sessions = new Collection();

        //$iter = new \APCIterator('user', '/user.name/');
        $iter = \xcache_list(XC_TYPE_VAR, 0)['cache_list'];
        //print_r($iter);
        foreach ($iter as $item)
        {
            if (strpos($item["name"], 'user.name') === false)
                continue;
            if (self::getVar($item['name']) == $username)
            {
                $session = str_replace('user.name.', '', $item['name']);
                $sessions->add($session);
            }
        }

        return $sessions;
    }
    
    public static function getLoggedUsers()
    {  
        
        $em = EnvironmentManager::getEntityManager();
        
        $usernames = self::getUsers()->getKeys();
        $usersCollection = new Collection();
        
        if ((is_array($usernames))&&(count($usernames)>0))
        {
            $users = $em->getRepository('Phem\Libraries\Security\Model\MySQL\User')
                ->findBy(array('username' => self::getUsers()->getKeys()));
        }
        else
        {
            return $usersCollection;
        }
        

        foreach ($users as $user)
        {
            $usersCollection->put($user->getUsername(), $user);
        }
        
        return $usersCollection;
    }

 /*   public static function getLoggedUsers()
    {
        
        //  if (!class_exists("\APCIterator"))
        //  throw new ApplicationException('APC support is missing');
        
        $users = new Collection();

        //$iter = new \APCIterator('user', '/user.name/');
        $iter = \xcache_list(XC_TYPE_VAR, 0)["cache_list"];
        foreach ($iter as $item)
        {
            if (strpos($item["name"], 'user.name') === false)
                continue;
            //echo $item['key'].'<br/>';
            $usrOrig = EnvironmentManager::getUserManager()->getUser(self::getVar($item["name"]));
            if (!is_a($usrOrig, '\Phem\Libraries\Security\Model\Common\User'))
                continue;

            $usr = clone $usrOrig;
            $session = str_replace('user.name.', '', $item['name']);
            $usr->setSession($session);

            $users->set($usr->getUsername(), $usr);
        }
        return $users;
    }
    */
    
    public static function notifyRaw($data)
    {
        if ((MESSAGEBUS_MODE == "Both") || (MESSAGEBUS_MODE == "WebSocket"))
        {
            $pusherClassName = ROOT_NAMESPACE."\\Libraries\\MessageBus\\"
                    . WEBSOCKET_PULLER_MODE."Pusher";
            if (class_exists($pusherClassName))
            {
                $pusher = new $pusherClassName();
                $pusher->push($data);
            }
            else if (strtolower(WEBSOCKET_PULLER_MODE) != "disabled")
            {
                throw new Exception
                    ("Unsupported mode given by WEBSOCKET_PULLER_MODE");
            }        
        }
        
        if ((MESSAGEBUS_MODE == "Both") || (MESSAGEBUS_MODE == "Comet"))
        {
            $pusher = new CometPusher();
            $pusher->push($data);
        }            
    }

    public static function notify(Message $message)
    {
        $em = EnvironmentManager::getEntityManager();
        $em->persist($message);
        $em->flush();
        $em->clear();
        
    
        self::notifyRaw($message);
    }

    public static function heartbeat($username)
    {
        self::setVar('user.name.' . EnvironmentManager::getSession()->getId(), $username);
    }

    public static function waitForMessages($timeout)
    {
        self::setVar('user.pid.' . EnvironmentManager::getSession()->getId(), getmypid());

        if (!function_exists('pcntl_sigtimedwait'))
        {
            throw new ApplicationException('PCNTL support is missing (only available on POSIX platform)');
        }


        //suspend session and db connection before waiting for signal
        EnvironmentManager::getSession()->writeClose();

        $info = array();

       \pcntl_sigtimedwait(array(SIGUSR1), $info, $timeout);

        EnvironmentManager::getSession()->reopen();
        self::deleteVar('user.pid.' . EnvironmentManager::getSession()->getId());

        $messages = new Collection();
        //echo 'user.queue.' . EnvironmentManager::getSession()->getId();
        //$iter = new \APCIterator('user', '/user.queue.' . EnvironmentManager::getSession()->getId() . '/');

        $iter = \xcache_list(XC_TYPE_VAR, 0)['cache_list'];
        $varsToDelete = new Collection();
        foreach ($iter as $item)
        {
            if (strpos($item["name"], 'user.queue') === false)
                continue;
            //echo $item["name"]."<br />";
            $messages->add(self::getVar($item['name']));
            $varsToDelete->add($item['name']);
        }

        foreach ($varsToDelete as $var)
        {
            self::deleteVar($var);
        }

        return $messages;
    }

}

Powered by WebSVN 2.2.1