Sequence wrapper in stream in F #

I have a function that accepts a Stream. My data is on a large list of millions of items.

Is there an easy way to wrap a sequence in a stream by returning chunks of my sequence in a stream? One obvious approach is to implement a custom thread class that returns pieces of a sequence. Sort of:

type SeqStream(sequence:seq<'a>) = 
    inherit Stream()
    default x.Read(buf, offset, count) =
        // get next chunk
        // yield chunk

Is there an easier way to do this? I don't have the means to change the objective function that the stream accepts, though.

+3
source share
1 answer

, . , Stream , , , , - , , . , Read, :

type SeqStream<'a>(sequence:seq<'a>, formatter:'a -> byte[]) =
  inherit Stream()

  // Keeps bytes that were read previously, but were not used    
  let temp = ResizeArray<_>() 
  // Enumerator for reading data from the sequence
  let en = sequence.GetEnumerator()

  override x.Read(buffer, offset, size) = 
    // Read next element and add it to temp until we have enough
    // data or until we reach the end of the sequence
    while temp.Count < size && en.MoveNext() do
      temp.AddRange(formatter(en.Current))

    // Copy data to the output & return count (may be less then 
    // required (at the end of the sequence)
    let ret = min size temp.Count
    temp.CopyTo(0, buffer, offset, ret)
    temp.RemoveRange(0, ret)
    ret

  override x.Seek(offset, dir) = invalidOp "Seek"
  override x.Flush() = invalidOp "Flush"
  override x.SetLength(l) = invalidOp "SetLength"
  override x.Length = invalidOp "Length"
  override x.Position 
    with get() = invalidOp "Position"
    and set(p) = invalidOp "Position"
  override x.Write(buffer, offset, size) = invalidOp "Write"
  override x.CanWrite = false
  override x.CanSeek = false
  override x.CanRead = true

, - . , - ( ), , , . , :

let stream = new SeqStream<_>([ 1 .. 5 ], System.BitConverter.GetBytes)
+6

All Articles