Streaming

Note

Experimental: This API is under development and subject to breaking changes in future releases.

Normally when a server receives or a client sends a message, the entire message must be in memory and ready to encode/decode all at once. This is not an issue for relatively small messages, but when messages start to get larger, anywhere from a few MB to multiple GB, the requirement for having the entire message available all at once has both memory and performance implications.

Colossus includes an efficient API for building streaming protocols, in which messages are sent and received in chunks rather than all at once.

The Streaming API

At the heart of streaming is a small set of composable types for building reactive streams of objects. The objects themselves can either be pieces of a larger object (such as chunks of an HTTP message), or full messages themselves. You can even have streams of streams.

In fact, Colossus itself uses these components in its own infrastructure for buffering and routing messages for service connections. Thus it should be no surprise that they are highly optimized for speed and efficient batch-processing.

Pipes, Sources, and Sinks

A Pipe[I,O] is a one-way stream of messages. Input messages of type I are pushed into the pipe and output messages of type O are pulled from it. In the simplest cases, I and O will be the same type and the pipe will act as a buffer, but in many situations a pipe can be backed by a complex transformation workflow such that the input and output types differ.

// The primary implementation of a Pipe is BufferedPipe, backed by a fixed-length buffer
val pipe = new BufferedPipe[Int](5)

//pushing to a Pipe returns a PushResult indicating if the push succeeded
val pushResult: PushResult = pipe.push(2) //PushResult.Ok

//pulling from a Pipe returns a PullResult
val pullResult: PullResult[Int] = pipe.pull() //PullResult.Item(2)

Often we want to share a pipe between a producer and consumer, such that the producer can only push messages and the consumer only pull them. The Sink[I] and Source[O] traits are used for this purpose, as Pipe[I,O] implements them both.

Both pushing and pulling are non-blocking operations.
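The producer/consumer split described above can be sketched with a small self-contained model. MiniSink, MiniSource and MiniPipe are hypothetical stand-ins written for this illustration, not the Colossus types, and their push and pull return plain Boolean and Option values rather than PushResult and PullResult.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for the push half and the pull half of a pipe.
trait MiniSink[I]   { def push(item: I): Boolean } // true if the item was accepted
trait MiniSource[O] { def pull(): Option[O] }      // None if nothing is buffered

// A bounded buffer exposing both halves, like a pipe whose I and O coincide.
final class MiniPipe[T](capacity: Int) extends MiniSink[T] with MiniSource[T] {
  private val buffer = mutable.Queue.empty[T]

  def push(item: T): Boolean =
    if (buffer.size < capacity) { buffer.enqueue(item); true } else false

  def pull(): Option[T] =
    if (buffer.isEmpty) None else Some(buffer.dequeue())
}

object SinkSourceDemo {
  // The producer only sees the sink half. It stops at the first rejected
  // push and returns how many items were accepted.
  def produce(sink: MiniSink[Int], items: Seq[Int]): Int =
    items.takeWhile(item => sink.push(item)).size

  // The consumer only sees the source half and drains whatever is buffered.
  def drain(source: MiniSource[Int]): List[Int] =
    Iterator.continually(source.pull()).takeWhile(_.isDefined).map(_.get).toList
}
```

Neither call ever waits: a full or empty buffer is reported immediately through the return value, which is the non-blocking behaviour this model is meant to show.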

Warning

Pipes and other types in the streaming API are not thread-safe. They are designed to efficiently handle streaming network I/O (which is always single-threaded in Colossus) and are not intended to be used for general stream-processing purposes.

Pipe Transport States

A Pipe/Source/Sink can be in one of three transport states:

  • Open : Able to push/pull messages
  • Closed : The pipe has been shut down without error and no further messages can be pushed/pulled.
  • Terminated : The pipe has been shut down due to an error.

The only possible state transitions are open to closed and open to terminated.

In general, the closed state is for pipes that represent a stream of a single object, such as an HTTP message. Closing the pipe indicates the message has been fully sent/received. Terminating a pipe indicates that an irrecoverable error has occurred, such as a connection closing mid-stream.
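The three states and their allowed transitions can be modelled as a small state machine. ModelState and canTransition are names invented for this sketch and are not taken from the Colossus source. The reason field on Terminated is likewise an assumption about how an error might be carried.

```scala
// Hypothetical model of the transport states described above.
sealed trait ModelState

object ModelState {
  case object Open extends ModelState
  case object Closed extends ModelState
  // Assumption: a terminated pipe remembers the error that caused it.
  final case class Terminated(reason: Throwable) extends ModelState

  // Only an open pipe may change state; Closed and Terminated are final.
  def canTransition(from: ModelState, to: ModelState): Boolean =
    (from, to) match {
      case (Open, Closed)        => true
      case (Open, Terminated(_)) => true
      case _                     => false
    }
}
```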

Back/Forward-Pressure

Aside from acting as buffers, the primary purpose of pipes is to efficiently mediate the asynchronous interactions between producers and consumers of a stream. Pipes handle two situations. Back-pressure occurs when consumers cannot keep up and need to signal producers to back off. Forward-pressure occurs when consumers are waiting for work from a producer. Both situations require signaling mechanisms: producers need to know when consumers are ready to take on more work, and consumers need to know when more work is available.
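Both signaling directions can be illustrated with a simplified, single-threaded model. MiniSignal, MiniPush and SignalPipe are hypothetical types written for this sketch. They imitate the idea of returning a signal when a push or pull cannot proceed, but they are not the Colossus implementation.

```scala
import scala.collection.mutable

// Hypothetical one-shot signal: holds one callback and fires it at most once.
final class MiniSignal {
  private var callback: Option[() => Unit] = None
  def listen(cb: => Unit): Unit = callback = Some(() => cb)
  def trigger(): Unit = {
    val pending = callback
    callback = None
    pending.foreach(run => run())
  }
}

sealed trait MiniPush
object MiniPush {
  case object Ok extends MiniPush
  final case class Full(signal: MiniSignal) extends MiniPush
}

final class SignalPipe[T](capacity: Int) {
  private val buffer         = mutable.Queue.empty[T]
  private val spaceAvailable = new MiniSignal // back-pressure: wakes producers
  private val itemAvailable  = new MiniSignal // forward-pressure: wakes consumers

  def push(item: T): MiniPush =
    if (buffer.size >= capacity) MiniPush.Full(spaceAvailable)
    else { buffer.enqueue(item); itemAvailable.trigger(); MiniPush.Ok }

  // Right(item) when something is buffered, Left(signal) when the pipe is empty.
  def pull(): Either[MiniSignal, T] =
    if (buffer.isEmpty) Left(itemAvailable)
    else { val item = buffer.dequeue(); spaceAvailable.trigger(); Right(item) }
}

object PressureDemo {
  def run(): List[String] = {
    val log  = mutable.ListBuffer.empty[String]
    val pipe = new SignalPipe[Int](1)

    // Forward-pressure: the consumer finds the pipe empty and asks to be told
    // when an item arrives.
    pipe.pull() match {
      case Left(signal) => signal.listen { log += "item available" }
      case Right(item)  => log += s"pulled $item"
    }
    pipe.push(1) // fires the consumer's callback

    // Back-pressure: the single slot is taken, so the producer asks to be told
    // when space frees up.
    pipe.push(2) match {
      case MiniPush.Full(signal) => signal.listen { log += "space available" }
      case MiniPush.Ok           => log += "pushed 2"
    }
    pipe.pull() // frees the slot and fires the producer's callback

    log.toList
  }
}
```

Calling PressureDemo.run() returns the two log entries in the order the callbacks fired: first the consumer is woken by a push, then the producer is woken by a pull.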

Pushing to Sinks

The primary method of Sink[I] is push(input: I): PushResult. The returned PushResult indicates if the message was successfully pushed into the pipe or what should be done if it was not pushed. When a Sink is full, it is currently unable to accept any more items and will return a PushResult.Full(signal). The contained Signal provides a way for the caller to be notified when the sink is able to accept items. The caller simply supplies the signal with a callback function via the notify method.

//create a pipe with a buffer size of 1
val pipe = new BufferedPipe[Int](1)

pipe.push(10) //PushResult.Ok

//the pipe can only buffer one item, so the next push fails and returns a
//PushResult.Full
val fullResult = pipe.push(12).asInstanceOf[PushResult.Full]

//provide a callback function for the returned signal
fullResult.onReady.notify {
  println("ready to push")
}

//signal is triggered as soon as the item is pulled.  "ready to push" is
//printed before pull() returns
val item = pipe.pull()

See the docs for Sink for more information on how to push to sinks and also some built-in functions to work with them.

Pulling from Sources

Similar to sinks, calling pull() on a Source returns a PullResult[T]. This may be a PullResult.Item(item) containing the item pulled, or it may be PullResult.Empty(signal). This signal can be given a callback which will get called when there is at least one item to pull from the Source.

val pipe = new BufferedPipe[Int](10)

//the pipe is empty so it returns a PullResult.Empty
val result = pipe.pull().asInstanceOf[PullResult.Empty]

//provide the returned signal with a callback function
result.whenReady.notify {
  println("items available to pull")
}

//the signal is triggered as soon as an item is pushed into the pipe
pipe.push(1)

Another highly efficient way to work with sources is through the pullWhile method which allows you to supply a function that is called whenever an item is ready to pull. It also gives you control over whether to continue using the callback function.
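The idea behind a callback-driven pull can be sketched with a toy source. CallbackSource and its Boolean-returning handler are assumptions made for this illustration. The real pullWhile signature may differ, so treat this only as a model of "keep calling me until I say stop".

```scala
import scala.collection.mutable

// Hypothetical source that feeds items to a handler for as long as the
// handler keeps returning true.
final class CallbackSource[T] {
  private val buffer = mutable.Queue.empty[T]
  private var handler: Option[T => Boolean] = None

  // Register the handler and immediately hand it anything already buffered.
  def pullWhile(f: T => Boolean): Unit = { handler = Some(f); drain() }

  // New items go straight to the handler while one is registered.
  def push(item: T): Unit = { buffer.enqueue(item); drain() }

  // Number of items still waiting in the buffer.
  def buffered: Int = buffer.size

  private def drain(): Unit =
    while (handler.nonEmpty && buffer.nonEmpty) {
      val item      = buffer.dequeue()
      val keepGoing = handler.exists(f => f(item))
      if (!keepGoing) handler = None // the handler asked to stop
    }
}
```

Once the handler returns false it is dropped, and later items stay buffered until something else pulls them. This avoids re-registering a signal callback for every item.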

See the documentation for Source for more ways to handle pulling items.

Transforming and Composing Pipes

There are many methods available to build complex pipes by transforming and piecing together existing pipes.

val pipe1: Pipe[Int, Int]       = new BufferedPipe[Int](10)
val pipe2: Pipe[String, String] = new BufferedPipe[String](10)

val pipe3: Pipe[Int, String] = pipe1
  .map { i =>
    (i * 2).toString
  }
  .weld(pipe2)

pipe3.push(2)
pipe3.pull() // PullResult.Item("4")

Streaming HTTP

Currently the only streaming protocol built into Colossus is Streaming HTTP. While it shares some types with the standard HTTP protocol, it is a separate protocol with its own message types.

There are actually two different streaming HTTP APIs: a low-level API that gives you direct control over streams of HTTP messages and message chunks, and a high-level API that is more like a standard HTTP service.

Streaming HTTP Service

The high-level API is more or less a standard HTTP service, except that the body of the request/response is a stream of message chunks.


class MyRequestHandler(serverContext: ServerContext) extends GenRequestHandler[StreamingHttp](serverContext) {

  def handle = {
    case StreamingHttpRequest(head, source) if (head.url == "/chunked") => {
      source.collected.map { sourceBody =>
        val responseBody = Source.fromIterator(List("this is ", "a chunked ", "response").toIterator.map { s =>
          Data(DataBlock(s))
        })
        StreamingHttpResponse(
          HttpResponseHead(head.version, HttpCodes.OK, Some(TransferEncoding.Chunked), None, None, None, HttpHeaders.Empty),
          responseBody
        )
      }
    }
  }

  def unhandledError = {
    case err => StreamingHttpResponse(HttpResponse.error(s"error: $err"))
  }
}

def start(port: Int)(implicit sys: IOSystem) = {
  StreamingHttpServer.basic("stream-service", port, serverContext => new MyRequestHandler(serverContext))
}

implicit val actorSystem = ActorSystem("stream")
start(9000)(IOSystem())

Compression

Colossus supports Gzip and Deflate compression out of the box using the request handler filter mechanism. To enable it, add HttpStreamCustomFilters.CompressionFilter to the request handler filters.
