You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
488 lines
10 KiB
488 lines
10 KiB
4 years ago
|
<?php
|
||
|
|
||
|
/*
|
||
|
|
||
|
Copyright (c) 2009-2019 F3::Factory/Bong Cosca, All rights reserved.
|
||
|
|
||
|
This file is part of the Fat-Free Framework (http://fatfreeframework.com).
|
||
|
|
||
|
This is free software: you can redistribute it and/or modify it under the
|
||
|
terms of the GNU General Public License as published by the Free Software
|
||
|
Foundation, either version 3 of the License, or later.
|
||
|
|
||
|
Fat-Free Framework is distributed in the hope that it will be useful,
|
||
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||
|
General Public License for more details.
|
||
|
|
||
|
You should have received a copy of the GNU General Public License along
|
||
|
with Fat-Free Framework. If not, see <http://www.gnu.org/licenses/>.
|
||
|
|
||
|
*/
|
||
|
|
||
|
namespace CLI;
|
||
|
|
||
|
//! RFC6455 WebSocket server
|
||
|
class WS {
|
||
|
|
||
|
const
|
||
|
//! UUID magic string
|
||
|
Magic='258EAFA5-E914-47DA-95CA-C5AB0DC85B11',
|
||
|
//! Max packet size
|
||
|
Packet=65536;
|
||
|
|
||
|
//@{ Mask bits for first byte of header
|
||
|
const
|
||
|
Text=0x01,
|
||
|
Binary=0x02,
|
||
|
Close=0x08,
|
||
|
Ping=0x09,
|
||
|
Pong=0x0a,
|
||
|
OpCode=0x0f,
|
||
|
Finale=0x80;
|
||
|
//@}
|
||
|
|
||
|
//@{ Mask bits for second byte of header
|
||
|
const
|
||
|
Length=0x7f;
|
||
|
//@}
|
||
|
|
||
|
protected
|
||
|
$addr,
|
||
|
$ctx,
|
||
|
$wait,
|
||
|
$sockets,
|
||
|
$protocol,
|
||
|
$agents=[],
|
||
|
$events=[];
|
||
|
|
||
|
/**
|
||
|
* Allocate stream socket
|
||
|
* @return NULL
|
||
|
* @param $socket resource
|
||
|
**/
|
||
|
function alloc($socket) {
|
||
|
if (is_bool($buf=$this->read($socket)))
|
||
|
return;
|
||
|
// Get WebSocket headers
|
||
|
$hdrs=[];
|
||
|
$EOL="\r\n";
|
||
|
$verb=NULL;
|
||
|
$uri=NULL;
|
||
|
foreach (explode($EOL,trim($buf)) as $line)
|
||
|
if (preg_match('/^(\w+)\s(.+)\sHTTP\/[\d.]{1,3}$/',
|
||
|
trim($line),$match)) {
|
||
|
$verb=$match[1];
|
||
|
$uri=$match[2];
|
||
|
}
|
||
|
else
|
||
|
if (preg_match('/^(.+): (.+)/',trim($line),$match))
|
||
|
// Standardize header
|
||
|
$hdrs[
|
||
|
strtr(
|
||
|
ucwords(
|
||
|
strtolower(
|
||
|
strtr($match[1],'-',' ')
|
||
|
)
|
||
|
),' ','-'
|
||
|
)
|
||
|
]=$match[2];
|
||
|
else {
|
||
|
$this->close($socket);
|
||
|
return;
|
||
|
}
|
||
|
if (empty($hdrs['Upgrade']) &&
|
||
|
empty($hdrs['Sec-Websocket-Key'])) {
|
||
|
// Not a WebSocket request
|
||
|
if ($verb && $uri)
|
||
|
$this->write(
|
||
|
$socket,
|
||
|
'HTTP/1.1 400 Bad Request'.$EOL.
|
||
|
'Connection: close'.$EOL.$EOL
|
||
|
);
|
||
|
$this->close($socket);
|
||
|
return;
|
||
|
}
|
||
|
// Handshake
|
||
|
$buf='HTTP/1.1 101 Switching Protocols'.$EOL.
|
||
|
'Upgrade: websocket'.$EOL.
|
||
|
'Connection: Upgrade'.$EOL;
|
||
|
if (isset($hdrs['Sec-Websocket-Protocol']))
|
||
|
$buf.='Sec-WebSocket-Protocol: '.
|
||
|
$hdrs['Sec-Websocket-Protocol'].$EOL;
|
||
|
$buf.='Sec-WebSocket-Accept: '.
|
||
|
base64_encode(
|
||
|
sha1($hdrs['Sec-Websocket-Key'].WS::Magic,TRUE)
|
||
|
).$EOL.$EOL;
|
||
|
if ($this->write($socket,$buf)) {
|
||
|
// Connect agent to server
|
||
|
$this->sockets[(int)$socket]=$socket;
|
||
|
$this->agents[(int)$socket]=
|
||
|
new Agent($this,$socket,$verb,$uri,$hdrs);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Close stream socket
|
||
|
* @return NULL
|
||
|
* @param $socket resource
|
||
|
**/
|
||
|
function close($socket) {
|
||
|
if (isset($this->agents[(int)$socket]))
|
||
|
unset($this->sockets[(int)$socket],$this->agents[(int)$socket]);
|
||
|
stream_socket_shutdown($socket,STREAM_SHUT_WR);
|
||
|
@fclose($socket);
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Read from stream socket
|
||
|
* @return string|FALSE
|
||
|
* @param $socket resource
|
||
|
* @param $len int
|
||
|
**/
|
||
|
function read($socket,$len=0) {
|
||
|
if (!$len)
|
||
|
$len=WS::Packet;
|
||
|
if (is_string($buf=@fread($socket,$len)) &&
|
||
|
strlen($buf) && strlen($buf)<$len)
|
||
|
return $buf;
|
||
|
if (isset($this->events['error']) &&
|
||
|
is_callable($func=$this->events['error']))
|
||
|
$func($this);
|
||
|
$this->close($socket);
|
||
|
return FALSE;
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Write to stream socket
|
||
|
* @return int|FALSE
|
||
|
* @param $socket resource
|
||
|
* @param $buf string
|
||
|
**/
|
||
|
function write($socket,$buf) {
|
||
|
for ($i=0,$bytes=0;$i<strlen($buf);$i+=$bytes) {
|
||
|
if (($bytes=@fwrite($socket,substr($buf,$i))) &&
|
||
|
@fflush($socket))
|
||
|
continue;
|
||
|
if (isset($this->events['error']) &&
|
||
|
is_callable($func=$this->events['error']))
|
||
|
$func($this);
|
||
|
$this->close($socket);
|
||
|
return FALSE;
|
||
|
}
|
||
|
return $bytes;
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Return socket agents
|
||
|
* @return array
|
||
|
* @param $uri string
|
||
|
***/
|
||
|
function agents($uri=NULL) {
|
||
|
return array_filter(
|
||
|
$this->agents,
|
||
|
/**
|
||
|
* @var $val Agent
|
||
|
* @return bool
|
||
|
*/
|
||
|
function($val) use($uri) {
|
||
|
return $uri?($val->uri()==$uri):TRUE;
|
||
|
}
|
||
|
);
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Return event handlers
|
||
|
* @return array
|
||
|
**/
|
||
|
function events() {
|
||
|
return $this->events;
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Bind function to event handler
|
||
|
* @return object
|
||
|
* @param $event string
|
||
|
* @param $func callable
|
||
|
**/
|
||
|
function on($event,$func) {
|
||
|
$this->events[$event]=$func;
|
||
|
return $this;
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Terminate server
|
||
|
**/
|
||
|
function kill() {
|
||
|
die;
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Execute the server process
|
||
|
**/
|
||
|
function run() {
|
||
|
// Assign signal handlers
|
||
|
declare(ticks=1);
|
||
|
pcntl_signal(SIGINT,[$this,'kill']);
|
||
|
pcntl_signal(SIGTERM,[$this,'kill']);
|
||
|
gc_enable();
|
||
|
// Activate WebSocket listener
|
||
|
$listen=stream_socket_server(
|
||
|
$this->addr,$errno,$errstr,
|
||
|
STREAM_SERVER_BIND|STREAM_SERVER_LISTEN,
|
||
|
$this->ctx
|
||
|
);
|
||
|
$socket=socket_import_stream($listen);
|
||
|
register_shutdown_function(function() use($listen) {
|
||
|
foreach ($this->sockets as $socket)
|
||
|
if ($socket!=$listen)
|
||
|
$this->close($socket);
|
||
|
$this->close($listen);
|
||
|
if (isset($this->events['stop']) &&
|
||
|
is_callable($func=$this->events['stop']))
|
||
|
$func($this);
|
||
|
});
|
||
|
if ($errstr)
|
||
|
user_error($errstr,E_USER_ERROR);
|
||
|
if (isset($this->events['start']) &&
|
||
|
is_callable($func=$this->events['start']))
|
||
|
$func($this);
|
||
|
$this->sockets=[(int)$listen=>$listen];
|
||
|
$empty=[];
|
||
|
$wait=$this->wait;
|
||
|
while (TRUE) {
|
||
|
$active=$this->sockets;
|
||
|
$mark=microtime(TRUE);
|
||
|
$count=@stream_select(
|
||
|
$active,$empty,$empty,(int)$wait,round(1e6*($wait-(int)$wait))
|
||
|
);
|
||
|
if (is_bool($count) && $wait) {
|
||
|
if (isset($this->events['error']) &&
|
||
|
is_callable($func=$this->events['error']))
|
||
|
$func($this);
|
||
|
die;
|
||
|
}
|
||
|
if ($count) {
|
||
|
// Process active connections
|
||
|
foreach ($active as $socket) {
|
||
|
if (!is_resource($socket))
|
||
|
continue;
|
||
|
if ($socket==$listen) {
|
||
|
if ($socket=@stream_socket_accept($listen,0))
|
||
|
$this->alloc($socket);
|
||
|
else
|
||
|
if (isset($this->events['error']) &&
|
||
|
is_callable($func=$this->events['error']))
|
||
|
$func($this);
|
||
|
}
|
||
|
else {
|
||
|
$id=(int)$socket;
|
||
|
if (isset($this->agents[$id]))
|
||
|
$this->agents[$id]->fetch();
|
||
|
}
|
||
|
}
|
||
|
$wait-=microtime(TRUE)-$mark;
|
||
|
while ($wait<1e-6) {
|
||
|
$wait+=$this->wait;
|
||
|
$count=0;
|
||
|
}
|
||
|
}
|
||
|
if (!$count) {
|
||
|
$mark=microtime(TRUE);
|
||
|
foreach ($this->sockets as $id=>$socket) {
|
||
|
if (!is_resource($socket))
|
||
|
continue;
|
||
|
if ($socket!=$listen &&
|
||
|
isset($this->agents[$id]) &&
|
||
|
isset($this->events['idle']) &&
|
||
|
is_callable($func=$this->events['idle']))
|
||
|
$func($this->agents[$id]);
|
||
|
}
|
||
|
$wait=$this->wait-microtime(TRUE)+$mark;
|
||
|
}
|
||
|
gc_collect_cycles();
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* @param $addr string
|
||
|
* @param $ctx resource
|
||
|
* @param $wait int
|
||
|
**/
|
||
|
function __construct($addr,$ctx=NULL,$wait=60) {
|
||
|
$this->addr=$addr;
|
||
|
$this->ctx=$ctx?:stream_context_create();
|
||
|
$this->wait=$wait;
|
||
|
$this->events=[];
|
||
|
}
|
||
|
|
||
|
}
|
||
|
|
||
|
//! RFC6455 remote socket
|
||
|
class Agent {
|
||
|
|
||
|
protected
|
||
|
$server,
|
||
|
$id,
|
||
|
$socket,
|
||
|
$flag,
|
||
|
$verb,
|
||
|
$uri,
|
||
|
$headers;
|
||
|
|
||
|
/**
|
||
|
* Return server instance
|
||
|
* @return WS
|
||
|
**/
|
||
|
function server() {
|
||
|
return $this->server;
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Return socket ID
|
||
|
* @return string
|
||
|
**/
|
||
|
function id() {
|
||
|
return $this->id;
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Return socket
|
||
|
* @return resource
|
||
|
**/
|
||
|
function socket() {
|
||
|
return $this->socket;
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Return request method
|
||
|
* @return string
|
||
|
**/
|
||
|
function verb() {
|
||
|
return $this->verb;
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Return request URI
|
||
|
* @return string
|
||
|
**/
|
||
|
function uri() {
|
||
|
return $this->uri;
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Return socket headers
|
||
|
* @return array
|
||
|
**/
|
||
|
function headers() {
|
||
|
return $this->headers;
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Frame and transmit payload
|
||
|
* @return string|FALSE
|
||
|
* @param $op int
|
||
|
* @param $data string
|
||
|
**/
|
||
|
function send($op,$data='') {
|
||
|
$server=$this->server;
|
||
|
$mask=WS::Finale | $op & WS::OpCode;
|
||
|
$len=strlen($data);
|
||
|
$buf='';
|
||
|
if ($len>0xffff)
|
||
|
$buf=pack('CCNN',$mask,0x7f,$len);
|
||
|
elseif ($len>0x7d)
|
||
|
$buf=pack('CCn',$mask,0x7e,$len);
|
||
|
else
|
||
|
$buf=pack('CC',$mask,$len);
|
||
|
$buf.=$data;
|
||
|
if (is_bool($server->write($this->socket,$buf)))
|
||
|
return FALSE;
|
||
|
if (!in_array($op,[WS::Pong,WS::Close]) &&
|
||
|
isset($this->server->events['send']) &&
|
||
|
is_callable($func=$this->server->events['send']))
|
||
|
$func($this,$op,$data);
|
||
|
return $data;
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Retrieve and unmask payload
|
||
|
* @return bool|NULL
|
||
|
**/
|
||
|
function fetch() {
|
||
|
// Unmask payload
|
||
|
$server=$this->server;
|
||
|
if (is_bool($buf=$server->read($this->socket)))
|
||
|
return FALSE;
|
||
|
while($buf) {
|
||
|
$op=ord($buf[0]) & WS::OpCode;
|
||
|
$len=ord($buf[1]) & WS::Length;
|
||
|
$pos=2;
|
||
|
if ($len==0x7e) {
|
||
|
$len=ord($buf[2])*256+ord($buf[3]);
|
||
|
$pos+=2;
|
||
|
}
|
||
|
else
|
||
|
if ($len==0x7f) {
|
||
|
for ($i=0,$len=0;$i<8;$i++)
|
||
|
$len=$len*256+ord($buf[$i+2]);
|
||
|
$pos+=8;
|
||
|
}
|
||
|
for ($i=0,$mask=[];$i<4;$i++)
|
||
|
$mask[$i]=ord($buf[$pos+$i]);
|
||
|
$pos+=4;
|
||
|
if (strlen($buf)<$len+$pos)
|
||
|
return FALSE;
|
||
|
for ($i=0,$data='';$i<$len;$i++)
|
||
|
$data.=chr(ord($buf[$pos+$i])^$mask[$i%4]);
|
||
|
// Dispatch
|
||
|
switch ($op & WS::OpCode) {
|
||
|
case WS::Ping:
|
||
|
$this->send(WS::Pong);
|
||
|
break;
|
||
|
case WS::Close:
|
||
|
$server->close($this->socket);
|
||
|
break;
|
||
|
case WS::Text:
|
||
|
$data=trim($data);
|
||
|
case WS::Binary:
|
||
|
if (isset($this->server->events['receive']) &&
|
||
|
is_callable($func=$this->server->events['receive']))
|
||
|
$func($this,$op,$data);
|
||
|
break;
|
||
|
}
|
||
|
$buf = substr($buf, $len+$pos);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Destroy object
|
||
|
**/
|
||
|
function __destruct() {
|
||
|
if (isset($this->server->events['disconnect']) &&
|
||
|
is_callable($func=$this->server->events['disconnect']))
|
||
|
$func($this);
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* @param $server WS
|
||
|
* @param $socket resource
|
||
|
* @param $verb string
|
||
|
* @param $uri string
|
||
|
* @param $hdrs array
|
||
|
**/
|
||
|
function __construct($server,$socket,$verb,$uri,array $hdrs) {
|
||
|
$this->server=$server;
|
||
|
$this->id=stream_socket_get_name($socket,TRUE);
|
||
|
$this->socket=$socket;
|
||
|
$this->verb=$verb;
|
||
|
$this->uri=$uri;
|
||
|
$this->headers=$hdrs;
|
||
|
|
||
|
if (isset($server->events['connect']) &&
|
||
|
is_callable($func=$server->events['connect']))
|
||
|
$func($this);
|
||
|
}
|
||
|
|
||
|
}
|