Streams


A lightweight F#/C# library for efficient functional-style pipelines on streams of data.

Install via NuGet:
PM> Install-Package Streams
PM> Install-Package Streams.CSharp

The main design behind Streams is inspired by Java 8 Streams and is based on the observation that many functional pipelines follow the pattern:

source/generator |> lazy |> lazy |> lazy |> eager/reduce
  • Source/generator functions create Streams, e.g. Stream.ofArray and Stream.init.
  • Lazy functions take streams and return streams, e.g. Stream.map and Stream.filter; these operations are fused together for efficient iteration.
  • Eager/reduce functions, e.g. Stream.iter and Stream.sum, force the Stream to evaluate up to that point.

The main difference between LINQ/Seq and Streams is that LINQ/Seq compose external iterators (Enumerable/Enumerator), whereas Streams compose internal iterators in continuation-passing style.
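To make the internal-iterator idea concrete, here is a minimal sketch of continuation-passing composition. It is a toy model written for illustration, not the Nessos.Streams implementation: the `PushStream` type and its functions are hypothetical names. A stream here is just a function that pushes every element into a continuation, so the lazy stages only wrap that continuation and the eager stage drives the whole pipeline in a single pass.

```fsharp
// Toy model only: PushStream and its functions are hypothetical names,
// not part of the Nessos.Streams API.
// A stream is a function that pushes each element into a continuation.
type PushStream<'T> = ('T -> unit) -> unit

module PushStream =
    // Source: walks the array and feeds each element to the continuation.
    let ofArray (source: 'T[]) : PushStream<'T> =
        fun k -> for x in source do k x

    // Lazy stage: wraps the continuation; nothing runs until a consumer is supplied.
    let map (f: 'T -> 'R) (stream: PushStream<'T>) : PushStream<'R> =
        fun k -> stream (fun x -> k (f x))

    // Lazy stage: forwards only the elements that satisfy the predicate.
    let filter (predicate: 'T -> bool) (stream: PushStream<'T>) : PushStream<'T> =
        fun k -> stream (fun x -> if predicate x then k x)

    // Eager stage: supplies the final continuation and runs the pipeline once.
    let fold (folder: 'S -> 'T -> 'S) (state: 'S) (stream: PushStream<'T>) : 'S =
        let acc = ref state
        stream (fun x -> acc.Value <- folder acc.Value x)
        acc.Value

// Usage: the same source |> lazy |> lazy |> eager shape as above.
let total =
    [| 1L .. 10L |]
    |> PushStream.ofArray
    |> PushStream.filter (fun x -> x % 2L = 0L)
    |> PushStream.map (fun x -> x + 1L)
    |> PushStream.fold (+) 0L
// total = 35L
```

Because the source owns the loop and each stage is a plain function call, no intermediate collection or enumerator object is allocated between stages in this sketch.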

For simple pipelines we have observed performance improvements of a factor of four, and for more complex pipelines the gains are even greater. Important performance tip: make sure that FSI is running with the 64-bit option set to true and that the fsproj option "Prefer 32-bit" is unchecked.

open Nessos.Streams

let data = [|1..10000000|] |> Array.map int64

// Sequential

// Real: 00:00:00.044, CPU: 00:00:00.046, GC gen0: 0, gen1: 0, gen2: 0
data
|> Stream.ofArray
|> Stream.filter (fun x -> x % 2L = 0L)
|> Stream.map (fun x -> x + 1L)
|> Stream.sum

// Real: 00:00:00.264, CPU: 00:00:00.265, GC gen0: 0, gen1: 0, gen2: 0
data
|> Seq.filter (fun x -> x % 2L = 0L)
|> Seq.map (fun x -> x + 1L)
|> Seq.sum

// Real: 00:00:00.217, CPU: 00:00:00.202, GC gen0: 0, gen1: 0, gen2: 0
data
|> Array.filter (fun x -> x % 2L = 0L)
|> Array.map (fun x -> x + 1L)
|> Array.sum

// Parallel
open FSharp.Collections.ParallelSeq

// Real: 00:00:00.017, CPU: 00:00:00.078, GC gen0: 0, gen1: 0, gen2: 0
data
|> ParStream.ofArray
|> ParStream.filter (fun x -> x % 2L = 0L)
|> ParStream.map (fun x -> x + 1L)
|> ParStream.sum

// Real: 00:00:00.045, CPU: 00:00:00.187, GC gen0: 0, gen1: 0, gen2: 0
data
|> PSeq.filter (fun x -> x % 2L = 0L)
|> PSeq.map (fun x -> x + 1L)
|> PSeq.sum

Contributing and copyright

The project is hosted on GitHub where you can report issues, fork the project and submit pull requests.

The library is available under the Apache License. For more information see the License file in the GitHub repository.
