Services

A service receives requests and returns responses. Colossus has built-in support for HTTP, Memcache, Redis, and Telnet.

Http

An HTTP service takes the following form:

import akka.actor.ActorSystem
import colossus.core.IOSystem
import colossus.protocols.http.Http
import colossus.protocols.http.HttpMethod._
import colossus.protocols.http.UrlParsing._
import colossus.protocols.http.{HttpServer, Initializer, RequestHandler}
import colossus.service.Callback
import colossus.service.GenRequestHandler.PartialHandler

object HttpServiceExample extends App {

  implicit val actorSystem = ActorSystem()
  implicit val ioSystem    = IOSystem()

  HttpServer.start("example-server", 9000) { initContext =>
    new Initializer(initContext) {
      override def onConnect: RequestHandlerFactory =
        serverContext =>
          new RequestHandler(serverContext) {
            override def handle: PartialHandler[Http] = {
              case request @ Get on Root / "hello" =>
                Callback.successful(request.ok("Hello world!"))
            }
          }
    }
  }
}

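ok is a helper method on HttpRequest that returns an HttpResponse, which the example wraps in a Callback[HttpResponse] with Callback.successful. Alternatively, importing the implicit colossus.service.Callback.Implicits.objectToSuccessfulCallback converts the response into a Callback[HttpResponse] automatically.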

The following helper methods are available. Each one defaults the content type to text and returns the corresponding HTTP status code.

  • ok
  • notFound
  • error
  • badRequest
  • unauthorized
  • forbidden

There are several different ways to set headers on the response.

request.ok("hello").withHeader("header-name", "header-value")
request.ok("hello", HttpHeaders(HttpHeader("header-name", "header-value")))
request.ok("hello", HttpHeaders(HttpHeader(HttpHeaders.CookieHeader, "header-value")))

The content type header is a little special since it is usually set automatically but can also be set manually. It can be set just like any other header.

request.ok("hello", HttpHeaders(HttpHeader("Content-Type", "header-value")))

You can also use .withContentType() to set the content type header. There are no helper methods for other HTTP status codes; use the respond method instead.

request
  .respond(
    HttpCodes.CONFLICT,
    """{"name":"value"}"""
  )
  .withContentType(ContentType.ApplicationJson)

On the incoming request, the body is on the HttpBody, while the headers (including the content type) and the parameters are on the HttpHead.

val body: String                = request.body.bytes.utf8String
val contentType: Option[String] = request.head.headers.contentType
val headers: HttpHeaders        = request.head.headers
val parameter: Option[String]   = request.head.parameters.getFirst("key")

There is no built-in middleware, but the same effect can be achieved by wrapping routes in functions. For example, if you wanted easy access to the request body, you might write:

def withBody(req: HttpRequest)(f: String => Callback[HttpResponse]): Callback[HttpResponse] = {
  val bytes = req.body.bytes
  if (bytes.isEmpty) {
    Callback.successful(req.badRequest("Missing body"))
  } else {
    f(bytes.utf8String)
  }
}

And then use it like so:

case request @ Post on Root / "shout" =>
  withBody(request) { body =>
    Callback.successful(request.ok(body.toUpperCase))
  }
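
Wrappers of this kind can also be nested. The following is a minimal sketch of that idea rather than Colossus code: Req and Resp are stand-in types so that it runs without the library, and withHeader and the X-User header are hypothetical names introduced only for illustration.

```scala
// Stand-in types so the sketch runs without Colossus; they are NOT the library's classes.
final case class Req(headers: Map[String, String], body: String)
final case class Resp(code: Int, body: String)

object Wrappers {
  // Rejects requests with an empty body, otherwise passes the body to the route.
  def withBody(req: Req)(f: String => Resp): Resp =
    if (req.body.isEmpty) Resp(400, "Missing body") else f(req.body)

  // Hypothetical second wrapper: requires a header and passes its value to the route.
  def withHeader(req: Req, name: String)(f: String => Resp): Resp =
    req.headers.get(name) match {
      case Some(value) => f(value)
      case None        => Resp(400, s"Missing header $name")
    }

  // Nesting the wrappers gives the route access to both values.
  def shout(req: Req): Resp =
    withHeader(req, "X-User") { user =>
      withBody(req) { body =>
        Resp(200, s"$user: ${body.toUpperCase}")
      }
    }
}
```

Because each wrapper returns a response directly when its check fails, the innermost function only runs when every check has passed.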

You can add filters to modify or short-circuit requests and responses.

Defining Filters:

/**
  * Primitive allowed hosts filter
  */
class AllowedHostsFilter extends Filter[Http] {
  override def apply(next: PartialHandler[Http]): PartialHandler[Http] = {
    case request =>
      request.head.headers.firstValue("Host") match {
        case Some("localhost:9011") => next(request)
        case Some(host)             => Callback.successful(request.error(s"host $host not allowed"))
        case None                   => Callback.successful(request.error("no host header"))
      }
  }
}

/**
  * filter to reverse response
  */
class ReverseResponseFilter extends Filter[Http] {
  override def apply(next: PartialHandler[Http]): PartialHandler[Http] = {
    case request =>
      next(request).map { response =>
        request.ok(response.body.toString.reverse, response.head.headers)
      }
  }
}

To use them, override the filters method in your request handler.

override def filters: Seq[Filter[Http]] = Seq(
  new AllowedHostsFilter(),
  new ReverseResponseFilter(),
  new HttpCustomFilters.CompressionFilter()
)

To enable Gzip and Deflate compression, add HttpCustomFilters.CompressionFilter to the request handler filters.
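
To make the wrapping behaviour of filters concrete, here is a minimal sketch that models a filter as a function from one partial handler to another, using plain strings instead of HTTP types. It is not Colossus code: Handler, Filter, chain, and the two filters are stand-ins, and the ordering (first filter in the list is outermost) is an assumption of this sketch, not a statement about how the library orders its filters.

```scala
object FilterSketch {
  // Stand-in for PartialHandler[Http]: maps a request string to a response string.
  type Handler = PartialFunction[String, String]

  // Stand-in for Filter[Http]: takes the next handler and returns a wrapped one.
  trait Filter { def apply(next: Handler): Handler }

  // Short-circuits blank requests instead of calling the next handler.
  object RejectBlank extends Filter {
    def apply(next: Handler): Handler = {
      case req if req.trim.isEmpty      => "error: blank request"
      case req if next.isDefinedAt(req) => next(req)
    }
  }

  // Calls the next handler, then rewrites its response.
  object Reverse extends Filter {
    def apply(next: Handler): Handler = {
      case req if next.isDefinedAt(req) => next(req).reverse
    }
  }

  // Assumed ordering for this sketch: the first filter in the list is outermost.
  def chain(filters: Seq[Filter], handler: Handler): Handler =
    filters.foldRight(handler)((filter, next) => filter(next))

  val echo: Handler    = { case req => s"echo $req" }
  val service: Handler = chain(Seq(RejectBlank, Reverse), echo)
}
```

With this ordering, a blank request is rejected before Reverse or the handler ever sees it, while any other request passes through to echo and has its response reversed on the way back.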

Redis

A Redis server takes the following form:

import java.util.concurrent.ConcurrentHashMap

import akka.actor.ActorSystem
import akka.util.ByteString
import colossus.core.{IOSystem, ServerContext}
import colossus.protocols.redis._
import colossus.protocols.redis.server.{Initializer, RedisServer, RequestHandler}
import colossus.service.Callback
import colossus.service.GenRequestHandler.PartialHandler

object RedisServiceExample extends App {

  implicit val actorSystem: ActorSystem = ActorSystem()
  implicit val ioSystem: IOSystem       = IOSystem()

  val db = new ConcurrentHashMap[String, String]()

  RedisServer.start("example-server", 6379) { initContext =>
    new Initializer(initContext) {
      override def onConnect: RequestHandlerFactory = { serverContext =>
        new MyRequestHandler(serverContext, db)
      }
    }
  }
}

class MyRequestHandler(context: ServerContext, db: ConcurrentHashMap[String, String]) extends RequestHandler(context) {
  override def handle: PartialHandler[Redis] = {
    case Command("GET", args) =>
      args match {
        case head +: _ =>
          Option(db.get(head.utf8String)) match {
            case Some(value) => Callback.successful(BulkReply(ByteString(value)))
            case None        => Callback.successful(NilReply)
          }
        case Nil =>
          Callback.successful(ErrorReply("ERR wrong number of arguments for 'get' command"))
      }

    case Command("SET", args) =>
      args match {
        case key +: value +: _ =>
          db.put(key.utf8String, value.utf8String)
          Callback.successful(StatusReply("OK"))
        case _ =>
          Callback.successful(ErrorReply("ERR wrong number of arguments for 'set' command"))
      }
  }
}
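
The handler above checks argument counts by matching on the argument sequence with +: patterns. The following minimal sketch isolates that matching on plain strings so it can be tried without Colossus; ArityMatch and its set method are hypothetical names. It uses a catch-all case so that a call with too few arguments yields an error reply instead of falling through the match.

```scala
object ArityMatch {
  // Mirrors the shape of a SET handler: needs at least a key and a value.
  def set(args: Seq[String]): String = args match {
    case key +: value +: _ => s"OK $key=$value"
    case _                 => "ERR wrong number of arguments for 'set' command"
  }
}
```

The +: extractor works on any Seq, so the same match handles List and Vector arguments alike, and extra trailing arguments are ignored by the wildcard.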

Configuration

A service can be configured either programmatically or through Typesafe Config. The settings in colossus/src/resources/reference.conf are used as defaults.

To configure via a config file, create or update application.conf with the server-specific settings:

colossus {
  service {
    example-server {
      request-timeout : 1 second
      request-buffer-size : 100
      log-errors : true
      request-metrics : true
      max-request-size : "1000 MB"
    }
  }
}

To configure via code, create a ServiceConfig object and pass it to the RequestHandler.

val serviceConfig = ServiceConfig(
  requestTimeout = 1.second,
  requestBufferSize = 100,
  logErrors = true,
  requestMetrics = true,
  maxRequestSize = 10.MB
)
override def onConnect: RequestHandlerFactory =
  serverContext =>
    new RequestHandler(serverContext, serviceConfig) {
      // define handle here, as in the service examples
    }

RequestHandler also lets you configure how request errors are reported. By default, ColossusRuntimeExceptions are converted to Strings and logged with no stack trace, and other exceptions are logged with a stack trace. To change this, provide a custom implementation of RequestExceptionFormatter, as in the following example:

override def requestLogFormat: RequestExceptionFormatter[Request] = new RequestExceptionFormatter[Request] {
  override def format(request: Option[Request], error: Throwable): String = {
    s"$request failed with ${error.getClass.getSimpleName}"
  }

  override def formatterOption(error: Throwable): RequestFormatType = RequestFormatType.LogWithStackTrace
}