Dando continuidade ao primeiro artigo publicado sobre o tema este texto irá discutir sobre estas duas ferramentas incríveis e facilitadores presentes no pacote React PHP: stream e sockets.
Visão Geral
Como vimos anteriormente, o EventLoop de React PHP é capaz de organizar quais funções serão executadas no loop e também de trabalhar com streams.
Streams são um tipo especial de resource no php que abstraem operações com arquivos, compressão de dados, rede e afins. (Vide manual)
Os pontos principais a se saber sobre um stream são: (1) ele não depende da aplicação em que é chamado para funcionar e (2) são passíveis de leitura e/ou gravação.
Um bom exemplo de stream é o resource retornado pela função fopen().
Nota interessante
O pacote ReactStream, até o momento em que escrevo este texto, trabalha somente com resources do tipo stream. Isto pode ser conferido através da função get_resource_type().
A título de curiosidade, há uma tabela com todos os tipos de resource existentes em PHP.
<?php $handle = fopen('http://phpsp.org.br', 'r'); var_dump(get_resource_type($handle)); // string (6) "stream"
A que veio ReactStream
Em php, streams não são estruturas muito simples de trabalhar quando queremos tratar de paralelismo: um stream quando criado em php, por padrão, é bloqueante. Trabalhar com eles de outra maneira pode ficar um tanto confuso. Note o exemplo utilizando stream_select().
<?php $descriptors = [ 0 => ['pipe', 'r'], // STDIN do subprocesso 1 => ['pipe', 'w'], // STDOUT do subprocesso 2 => ['pipe', 'w'] // stream de erro do subprocesso ]; // Inicia um processo com o comando descrito. // Resources serão postos num array chamado $pipes $process = proc_open('sleep 2; echo "nawarian";', $descriptors, $pipes); stream_set_blocking($pipes[0], 0); // writeable não bloqueante stream_set_blocking($pipes[1], 0); // readable não bloqueante while (true) { // Loop infinito para ler quando pronto echo "."; $toRead = [$pipes[1]]; $toWrite = null; $toExcept = null; // Estes resources serão monitorados para saber sobre alterações if ($numberOfStreamsChanged = stream_select($toRead, $toWrite, $toExcept, 0)) { foreach ($toRead as $r) { var_dump(fread($r, 1024)); // string (9) "nawariann" break(2); // Quebra o foreach e while } } }
Neste exemplo temos $process como resultado de proc_open(), cuja principal funcionalidade é executar um subprocesso independente da aplicação PHP. Como comando definimos a string ‘sleep 2; echo “nawarian”;’, que se executada em linha de comando deverá imprimir na tela o texto “nawarian” após 2 segundos de seu início, porém durante a sua execução a linha de comando ficaria inutilizada.
Através da função stream_set_blocking() dizemos que tanto o STDIN quanto STDOUT deste processo não serão bloqueantes. A partir deste momento a aplicação PHP torna sua execução independente do andamento de $process.
Para mostrar como o script php está independente temos um while (true) que, a cada iteração, imprime um ponto (“.”) na tela. Neste laço também verificamos, utilizando stream_select() se a linha de comando que iniciamos gerou alguma saída e, quando gerar, obtemos seu conteúdo e quebra o laço infinito.
ReactStream
Felizmente o pacote ReactStream nos traz uma ótima abstração a ser utilizada em conjunto com o EventLoop.
Sua principal classe é ReactStreamStream, que recebe em seu construtor um resource do tipo stream e uma instância de ReactEventLoopLoopInterface.
O exemplo acima seria reescrito da seguinte maneira:
<?php include_once 'vendor/autoload.php'; $loop = ReactEventLoopFactory::create(); $descriptors = [ 0 => ['pipe', 'r'], // STDIN do subprocesso 1 => ['pipe', 'w'], // STDOUT do subprocesso 2 => ['pipe', 'w'] // stream de erro do subprocesso ]; // Inicia um processo com o comando descrito. Resources serão postos num array chamado $pipes $process = proc_open('sleep 2; echo "nawarian"', $descriptors, $pipes); $stdout = new ReactStreamStream($pipes[1], $loop); $stdout->on('data', function ($saida) { var_dump($saida); exit; }); $loop->addPeriodicTimer(0, function (){ echo "."; }); $loop->run();
Desta maneira não há motivo para se preocupar em definir os streams como não bloqueantes ou em como controlar a prontidão para leitura do STDOUT do subprocesso criado, ReactStream já lida com tudo para você.
De maneira semelhante conseguimos tratar de escrita em resources. Voltemos ao bom e velho fopen() para exemplificar:
$phpsp = fopen('http://phpsp.org.br', 'r'); $saida = STDOUT; $reader = new ReactStreamStream($phpsp, $loop); $writer = new ReactStreamStream($saida, $loop); // Quando $reader possuir algo pronto para leitura $callback = function ($data) use ($writer) { // Escreva o conteúdo lido em $writer $writer->write($data); }; $reader->on('data', $callback); $reader->on('end', function () use ($writer) { $writer->end(); });
Sendo $writer a saída padrão, todo conteúdo que recebermos de fopen() será enviado à saída padrão. É claro que existe uma forma muito mais simples para se fazer isto:
$phpsp = fopen('http://phpsp.org.br', 'r'); $saida = STDOUT; $reader = new ReactStreamStream($phpsp, $loop); $writer = new ReactStreamStream($saida, $loop); $reader->pipe($writer);
Onde ReactStreamStream::pipe() direciona todo conteúdo recebido da instância, escrevendo-o em seu stream de argumento. Tudo o que recebemos de $reader foi escrito em $writer. E, como ao final foi emitido um evento end no stream $reader, o mesmo foi devidamente propagado ao $writer.
ReactSocket
ReactSocket é uma saída para facilitar o que vimos neste exemplo do texto anterior. Este pacote se utiliza do pacote ReactStream para abstrair as leituras e escritas do socket.
O mesmo exemplo apresentado lá seria representado desta menira:
$server = new ReactSocketServer($loop); $clients = []; $server->on('connection', function ($client) use (&$clients) { $client->write('Diga-nos seu nome: '); $username = false; $client->on('data', function ($rcvdData) use (&$clients, $client, &$username) { if ($username && $text = trim($rcvdData)) { foreach ($clients as $c) { if ($c !== $client) { $c->write($text); $c->write(PHP_EOL); } } } if (!$username && $username = trim($rcvdData)) { $client->write("Bem-vindo, {$username}. Você está no chat maroto!nn"); $clients[] = $client; } }); }); $server->listen(7171);
Não há muito o que se explorar por aqui além da sua imaginação, visto que ReactSocketServer nada mais é que um ReactStreamStream especializado em trabalhar com resources criados a partir da função stream_socket_server(). Portanto é um pacote especializado em trabalhar como um servidor TCP/IP.
A partir dele foram criadas diversas outras ferramentas, como o ReactHttp (componente servidor que segue o protocolo HTTP, portanto um servidor HTTP) e a famosa biblioteca Ratchet.
Conclusão
ReactStream é a implementação mais fundamental – depois de ReactEventLoop – dos pacotes React. Atavés dela é possível lidar com resources de arquivos, subprocessos, rede e quaisquer outros que sejam do tipo stream. Isto possibilitou a criação de outros pacotes como ReactSocket e ReactChildProcess.
Através destas abstrações somos capazes de utilizar diversas ferramentas externas à nossa aplicação de forma facilitada e controlada, aproveitando tudo o que o padrão reactor pode nos oferecer.
Powered by WPeMatico