PHP SP

[React PHP] Streams e Sockets

Dando continuidade ao primeiro artigo publicado sobre o tema este texto irá discutir sobre estas duas ferramentas incríveis e facilitadores presentes no pacote React PHP: streamsockets.

 

Visão Geral

Como vimos anteriormente, o EventLoop de React PHP é capaz de organizar quais funções serão executadas no loop e também de trabalhar com streams.

Streams são um tipo especial de resource no php que abstraem operações com arquivos, compressão de dados, rede e afins. (Vide manual)

Os pontos principais a se saber sobre um stream são: (1) ele não depende da aplicação em que é chamado para funcionar e (2) são passíveis de leitura e/ou gravação.

Um bom exemplo de stream é o resource retornado pela função fopen().

 

Nota interessante

O pacote ReactStream, até o momento em que escrevo este texto, trabalha somente com resources do tipo stream. Isto pode ser conferido através da função get_resource_type().

A título de curiosidade, há uma tabela com todos os tipos de resource existentes em PHP.

<?php

$handle = fopen('http://phpsp.org.br', 'r');

var_dump(get_resource_type($handle)); // string (6) "stream"

 

A que veio ReactStream

Em php, streams não são estruturas muito simples de trabalhar quando queremos tratar de paralelismo: um stream quando criado em php, por padrão, é bloqueante. Trabalhar com eles de outra maneira pode ficar um tanto confuso. Note o exemplo utilizando stream_select().

<?php

$descriptors = [
    0 => ['pipe', 'r'], // STDIN do subprocesso
    1 => ['pipe', 'w'], // STDOUT do subprocesso
    2 => ['pipe', 'w'] // stream de erro do subprocesso
];

// Inicia um processo com o comando descrito.
// Resources serão postos num array chamado $pipes
$process = proc_open('sleep 2; echo "nawarian";', $descriptors, $pipes);

stream_set_blocking($pipes[0], 0); // writeable não bloqueante
stream_set_blocking($pipes[1], 0); // readable não bloqueante

while (true) { // Loop infinito para ler quando pronto
    echo ".";
    $toRead = [$pipes[1]];
    $toWrite = null;
    $toExcept = null;

    // Estes resources serão monitorados para saber sobre alterações
    if ($numberOfStreamsChanged = stream_select($toRead, $toWrite, $toExcept, 0)) {
        foreach ($toRead as $r) {
            var_dump(fread($r, 1024)); // string (9) "nawariann"
            break(2); // Quebra o foreach e while
        }
    }
}

Neste exemplo temos $process como resultado de proc_open(), cuja principal funcionalidade é executar um subprocesso independente da aplicação PHP. Como comando definimos a string ‘sleep 2; echo “nawarian”;’, que se executada em linha de comando deverá imprimir na tela o texto “nawarian” após 2 segundos de seu início, porém durante a sua execução a linha de comando ficaria inutilizada.

Através da função stream_set_blocking() dizemos que tanto o STDIN quanto STDOUT deste processo não serão bloqueantes. A partir deste momento a aplicação PHP torna sua execução independente do andamento de $process.

Para mostrar como o script php está independente temos um while (true) que, a cada iteração, imprime um ponto (“.”) na tela. Neste laço também verificamos, utilizando stream_select() se a linha de comando que iniciamos gerou alguma saída e, quando gerar, obtemos seu conteúdo e quebra o laço infinito.

centenas de pontos e um string(9) "nawariann" ao fim.

Resultado esperado do código acima

 

ReactStream

Felizmente o pacote ReactStream nos traz uma ótima abstração a ser utilizada em conjunto com o EventLoop.

Sua principal classe é ReactStreamStream, que recebe em seu construtor um resource do tipo stream e uma instância de ReactEventLoopLoopInterface.

O exemplo acima seria reescrito da seguinte maneira:

<?php

include_once 'vendor/autoload.php';

$loop = ReactEventLoopFactory::create();

$descriptors = [
    0 => ['pipe', 'r'], // STDIN do subprocesso
    1 => ['pipe', 'w'], // STDOUT do subprocesso
    2 => ['pipe', 'w'] // stream de erro do subprocesso
];

// Inicia um processo com o comando descrito. Resources serão postos num array chamado $pipes
$process = proc_open('sleep 2; echo "nawarian"', $descriptors, $pipes);

$stdout = new ReactStreamStream($pipes[1], $loop);
$stdout->on('data', function ($saida) {
  var_dump($saida);
  exit;
});

$loop->addPeriodicTimer(0, function (){
  echo ".";
});

$loop->run();

Desta maneira não há motivo para se preocupar em definir os streams como não bloqueantes ou em como controlar a prontidão para leitura do STDOUT do subprocesso criado, ReactStream já lida com tudo para você.

De maneira semelhante conseguimos tratar de escrita em resources. Voltemos ao bom e velho fopen() para exemplificar:

$phpsp = fopen('http://phpsp.org.br', 'r');
$saida = STDOUT;

$reader = new ReactStreamStream($phpsp, $loop);
$writer = new ReactStreamStream($saida, $loop);

// Quando $reader possuir algo pronto para leitura
$callback = function ($data) use ($writer) {
   // Escreva o conteúdo lido em $writer
   $writer->write($data);
};
$reader->on('data', $callback);
$reader->on('end', function () use ($writer) {
   $writer->end();
});

Sendo $writer a saída padrão, todo conteúdo que recebermos de fopen() será enviado à saída padrão. É claro que existe uma forma muito mais simples para se fazer isto:

$phpsp = fopen('http://phpsp.org.br', 'r');
$saida = STDOUT;

$reader = new ReactStreamStream($phpsp, $loop);
$writer = new ReactStreamStream($saida, $loop);

$reader->pipe($writer);

Onde ReactStreamStream::pipe() direciona todo conteúdo recebido da instância, escrevendo-o em seu stream de argumento. Tudo o que recebemos de $reader foi escrito em $writer. E, como ao final foi emitido um evento end no stream $reader, o mesmo foi devidamente propagado ao $writer.

 

ReactSocket

ReactSocket é uma saída para facilitar o que vimos neste exemplo do texto anterior. Este pacote se utiliza do pacote ReactStream para abstrair as leituras e escritas do socket.

O mesmo exemplo apresentado lá seria representado desta menira:

$server = new ReactSocketServer($loop);

$clients = [];
$server->on('connection', function ($client) use (&$clients) {
    $client->write('Diga-nos seu nome: ');
    $username = false;

    $client->on('data', function ($rcvdData) use (&$clients, $client, &$username) {
        if ($username && $text = trim($rcvdData)) {
            foreach ($clients as $c) {
                if ($c !== $client) {
                    $c->write($text);
                    $c->write(PHP_EOL);
                }
            }
        }

        if (!$username && $username = trim($rcvdData)) {
            $client->write("Bem-vindo, {$username}. Você está no chat maroto!nn");
            $clients[] = $client;
        }
    });
});

$server->listen(7171);

 

Não há muito o que se explorar por aqui além da sua imaginação, visto que ReactSocketServer nada mais é que um ReactStreamStream especializado em trabalhar com resources criados a partir da função stream_socket_server(). Portanto é um pacote especializado em trabalhar como um servidor TCP/IP.

A partir dele foram criadas diversas outras ferramentas, como o ReactHttp (componente servidor que segue o protocolo HTTP, portanto um servidor HTTP) e a famosa biblioteca Ratchet.

 

Conclusão

ReactStream é a implementação mais fundamental – depois de ReactEventLoop – dos pacotes React. Atavés dela é possível lidar com resources de arquivos, subprocessos, rede e quaisquer outros que sejam do tipo stream. Isto possibilitou a criação de outros pacotes como ReactSocket e ReactChildProcess.

Através destas abstrações somos capazes de utilizar diversas ferramentas externas à nossa aplicação de forma facilitada e controlada, aproveitando tudo o que o padrão reactor pode nos oferecer.

Powered by WPeMatico