rx/stream

Async Stream for RxPHP

3.0.2 2020-05-05 14:08 UTC

Last update: 2025-01-06 00:41:14 UTC


README

Provides RxPHP Observables for PHP streams

This library is a wrapper around the ReactPHP stream library. It uses the Voryx event loop, which behaves like the JavaScript event loop: you don't need to start it yourself.

Usage

From File

    
    $source = new \Rx\React\FromFileObservable("example.csv");
    
    $source
        ->cut() //Cut the stream by PHP_EOL
        ->map('str_getcsv') //Convert csv row to an array
        ->map(function (array $row) {
            //Strip numbers from the first field
            $row[0] = preg_replace('/\d+/u', '', $row[0]);
            return $row;
        })
        ->subscribe(
            function ($data) {
                echo $data[0] . "\n";
            },
            function ($e) {
                echo "error\n";
            },
            function () {
                echo "done\n";
            }
        );
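
To make the `cut()` and `map()` steps above more concrete, here is a rough plain-PHP sketch of the same transformations applied synchronously to a few in-memory chunks. It does not use the library: `cutChunks` is a hypothetical helper written only for this illustration, and it merely assumes that cutting means splitting on `PHP_EOL` while carrying partial lines over from one chunk to the next. The sample data is made up.

```php
<?php
// Hypothetical helper, not part of rx/stream: splits arbitrary chunks into
// delimiter-separated records, carrying partial records across chunks.
function cutChunks(iterable $chunks, string $delimiter = PHP_EOL): array
{
    $buffer  = '';
    $records = [];
    foreach ($chunks as $chunk) {
        $buffer .= $chunk;
        $parts = explode($delimiter, $buffer);
        // The last part is incomplete until another delimiter arrives.
        $buffer = array_pop($parts);
        foreach ($parts as $part) {
            $records[] = $part;
        }
    }
    // Flush whatever is left once the input ends.
    if ($buffer !== '') {
        $records[] = $buffer;
    }
    return $records;
}

// Made-up input: the second line is split across two chunks.
$chunks = ["a1,x" . PHP_EOL . "b2", "2,y" . PHP_EOL . "c3,z"];

$rows = array_map(function (string $line): array {
    // Convert the csv row to an array (explicit delimiter, enclosure, escape).
    $row = str_getcsv($line, ',', '"', '\\');
    // Strip numbers from the first field.
    $row[0] = preg_replace('/\d+/u', '', $row[0]);
    return $row;
}, cutChunks($chunks));

foreach ($rows as $row) {
    echo $row[0] . "\n";
}
```

The point of the buffer is that stream chunks rarely line up with line boundaries, so a line such as `b22,y` may arrive in two pieces and still has to reach `str_getcsv` whole.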
    

Read and Write to File

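    $source = new \Rx\React\FromFileObservable("source.txt");
    $dest   = new \Rx\React\ToFileObserver("dest.txt");

    $source
        ->cut() //Cut the stream by PHP_EOL
        ->filter(function ($row) {
            //Keep rows containing 'foo'; strpos returns 0 for a match at the start, so compare strictly
            return strpos($row, 'foo') !== false;
        })
        ->map(function ($row) {
            return $row . 'bar';
        })
        ->subscribe($dest);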
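
A filter predicate built on `strpos` needs a strict comparison against `false`, because `strpos` returns `0` (which is falsy) when the match sits at the very start of the row. The following plain-PHP sketch, with made-up sample rows and no dependency on the library, shows the difference between the two forms of the predicate:

```php
<?php
// Made-up sample rows for illustration.
$rows = ['foo first', 'has foo inside', 'no match'];

// Loose predicate: a match at position 0 is treated as "not found".
$loose = array_values(array_filter($rows, function ($row) {
    return strpos($row, 'foo');
}));

// Strict predicate: only a real "not found" (false) is rejected.
$strict = array_values(array_filter($rows, function ($row) {
    return strpos($row, 'foo') !== false;
}));

// $loose  keeps only 'has foo inside'
// $strict keeps 'foo first' and 'has foo inside'
```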

Stream - echo example

    $read = new \Rx\React\StreamSubject(STDIN);

    $read
        ->takeWhile(function ($x) {
            //Echo input until "15" is entered; trim() drops the trailing newline
            return trim($x) != 15;
        })
        ->subscribe(new \Rx\React\StreamSubject(STDOUT));
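
As a rough illustration of when the echo stops, the sketch below applies the same predicate to a fixed list of lines in plain PHP. It does not use the library: `takeWhileItems` is a hypothetical helper written for this example, and it assumes each item stands in for one line read from STDIN.

```php
<?php
// Hypothetical helper, not part of rx/stream: collects items until the
// predicate first returns false, then stops.
function takeWhileItems(iterable $items, callable $predicate): array
{
    $taken = [];
    foreach ($items as $item) {
        if (!$predicate($item)) {
            break;
        }
        $taken[] = $item;
    }
    return $taken;
}

// Made-up input lines, each ending with a newline as typed input would.
$lines = ["hello\n", "7\n", "15\n", "after\n"];

$echoed = takeWhileItems($lines, function ($x) {
    return trim($x) != 15;
});

// "15\n" ends the sequence, so "after\n" is never echoed.
foreach ($echoed as $line) {
    echo $line;
}
```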