You might have seen on my Twitter that my current company, MFG Labs, has open-sourced the library Akka-Stream Extensions. We developed it with Alexandre Tamborrino and Damien Pignaud for our recent production projects based on Typesafe's Akka-Stream.

In this article, I won’t explain all the reasons that motivated our choice of Akka-Stream at MFG Labs and the road towards our library Akka-Stream Extensions. Here, I’ll focus on one precise aspect of our choice: types… And I’ll tell you about a specific extension I’ve created for this project: ShapelessStream.

The code is available on GitHub.



Akka-Stream loves Types

As you may know, I'm a Type lover. Types are proofs and proofs are types. Proofs are what you want in your code to ensure, in a robust & reliable way, that it does what it claims, with the support of the compiler.

First of all, let's recall that type-safety is a very interesting feature of Akka-Stream.

Akka-Stream's most basic primitive, Flow[A, B], represents a data-flow that accepts elements of type A and will return elements of type B. You can't pass a C to it and you are sure that this flow won't return any C, for example.

At MFG Labs, we inherited some legacy Scala code mostly based on Akka actors, which provide a very good way to handle failures but are not typesafe at all (until Akka Typed) and not composable. Developers using Akka tend to scatter the business logic across the code and it can become hard to maintain. It appeared that in many cases where Akka was used to transform data in a flow or call external services, Akka-Stream would be a very good replacement for those actors:

  • better type-safety,
  • fluent & composable code with great builder DSL
  • same Akka failure management
  • buffer, parallel computing, backpressure out-of-the-box

Yes, it’s quite weird to say it but Akka-Stream helped us correct most problems that had been introduced using Akka (rightly or wrongly).



Multi-type flows

Ok, Akka-Stream promotes types as first-class citizens in your data flows. That's cool! But it appears that you often need to handle multiple types in the same input channel:

When you control completely the types in input, you can represent input types by a classic ADT:

sealed trait A
case class A1(...) extends A
case class A2(...) extends A
case class A3(...) extends A

… And manage it in Flow[A, B]:

Flow[A].map {
  case A1(...) => // some B
  case A2(...) => // some B
  case A3(...) => // some B
}

Nice, but you need to wrap all input types in an ADT, and this involves some boring code that can even differ for every custom flow.

Going further, in general you don't want to do that: you want to dispatch every element to a different flow according to its type:

… and merge all results of all flows in one single channel…

… and every flow has its own behavior in terms of parallelism, bufferization, data generation and back-pressure…

In Akka-Stream, if you wanted to build a flow corresponding to the previous schema, you would have to use FlexiRoute for the dispatching part and FlexiMerge for the merging part.

Have a look at the doc and see that it requires quite a few lines to write one of those. It's really powerful but quite tedious to implement, and not so typesafe after all. Moreover, you would certainly have to write one FlexiRoute and one FlexiMerge per use-case, as the number of input types and return types depends on your context.



Miles Sabin to the rescue

In my latest project, this dispatcher/flows/merger pattern was required in multiple places and, as I'm lazy, I wanted something more elegant & typesafe, if possible, to build this kind of flow graph.

Thinking in terms of pure types and from an external point of view, we can see the previous dispatcher/flows/merger flow graph in pseudo-code as:

Flow[
  Input = A1 or A2 or A3, // in input it accepts A1 or A2 or A3
  Output = B1 or B2 or B3  // in output it generates B1 or B2 or B3
]

And to build the full flow graph, we need to provide a list of flows for all pairs of input/output types corresponding to our graph branches:

Flow[A1, B1] and Flow[A2, B2] and Flow[A3, B3]

In Shapeless, there are 2 very very very useful structures:

  • Coproduct is a generalization of the well known Either. You have A or B in Either[A, B]. With Coproduct, you can have more than 2 alternatives A or B or C or D. So, for our previous external view of flow graph, using Coproduct, it could be written as:
Flow[
  A1 :+: A2 :+: A3 :+: CNil,
  B1 :+: B2 :+: B3 :+: CNil
]
  • HList allows building heterogeneous lists of elements while keeping & tracking all types at compile time. For our previous list of flows, it fits quite well as we want to track all input/output types of all flows. It would give:
Flow[A1, B1] :: Flow[A2, B2] :: Flow[A3, B3] :: HNil

So, from an external point of view, the process of building our dispatcher/flows/merger flow graph looks like a function taking an HList of flows as input and returning the built Flow of Coproducts:

Flow[A1, B1] :: Flow[A2, B2] :: Flow[A3, B3] :: HNil =>
  Flow[A1 :+: A2 :+: A3 :+: CNil, B1 :+: B2 :+: B3 :+: CNil]

Let’s write it in terms of Shapeless Scala code:

/**
 * Builds at compile-time a fully type-controlled flow that transforms an HList of Flows
 * into a Flow from the Coproduct of their inputs to the Coproduct of their outputs.
 *
 * @param flows the HList of flows Flow[A1, B1] :: Flow[A2, B2] :: ... :: Flow[An, Bn] :: HNil
 * @return the flow Flow[A1 :+: A2 :+: ... :+: An :+: CNil, B1 :+: B2 :+: ... :+: Bn :+: CNil, Unit]
 */
def coproductFlow[HL <: HList, CIn <: Coproduct, COut <: Coproduct](
  flows: HL
): Flow[CIn, COut, Unit]

Fantastic !!!

Now the question is: how can we build this Flow[CIn, COut, Unit] at compile-time from an HList of Flows, and be sure that the compiler checks that all links are correctly typed and that all types are managed by the provided flows?



Akka-Stream Graph Mutable builders

An important concept in Akka-Stream is the separation of concerns between:

  • constructing/describing a data-flow
  • materializing with live resources (like actor system)
  • running the data-flow by plugging live sources/sinks on it (like web, file, hdfs, queues etc…).

For the curious, you'll find the same idea in scalaz-stream, but in a purer FP way, as scalaz-stream directly relies on Free concepts that formalize this idea quite directly.

Akka-Stream has taken a more custom way of responding to these requirements. To build complex data flows, it provides a very nice DSL described here. This DSL is based on the idea of a mutable structure used while building your graph, until you decide to freeze it definitively into an immutable structure.

An example from the doc:

val g = FlowGraph.closed() { implicit builder: FlowGraph.Builder[Unit] =>
  import FlowGraph.Implicits._
  val in = Source(1 to 10)
  val out = Sink.ignore

  val bcast = builder.add(Broadcast[Int](2))
  val merge = builder.add(Merge[Int](2))

  val f1, f2, f3, f4 = Flow[Int].map(_ + 10)

  in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out
              bcast ~> f4 ~> merge
}

builder is the mutable structure used to build the flow graph using the DSL inside the {...} block.

The value g is the immutable structure resulting from the builder that will later be materialized and run using live resources.

Please note that once built, the g value can be reused and materialized/run several times; it is just the description of your flow graph.
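For instance (assuming an implicit flow materializer is in scope, as the run() call requires):

// g is only a description: each run() materializes a fresh, independent instance of the graph
g.run()
g.run()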

This idea of mutable builders is really interesting in general: mutability in the small can help a lot to make your building block efficient and easy to write/read without endangering immutability in the large.



Hacking mutable builders with Shapeless

My intuition was to hack these mutable Akka-Stream builders using Shapeless type-dependent mechanics to build a Flow of Coproducts from an HList of Flows…

Let’s show the real signature of coproductFlow:

def coproductFlow[HL <: HList, CIn <: Coproduct, COut <: Coproduct, CInOutlets <: HList, COutInlets <: HList](
  flows: HL
)(
  implicit
    flowTypes: FlowTypes.Aux[HL, CIn, COut],
    obuild: OutletBuilder.Aux[CIn, CInOutlets],
    ibuild: InletBuilder.Aux[COut, COut, COutInlets],
    otrav: ToTraversable.Aux[CInOutlets, List, Outlet[_]],
    itrav: ToTraversable.Aux[COutInlets, List, Inlet[COut]],
    selOutletValue: SelectOutletValue.Aux[CIn, CInOutlets],
    flowBuilder: FlowBuilderC.Aux[CIn, COut, CInOutlets, HL, COutInlets]
): Flow[CIn, COut, Unit]

Frightening!!!!!!!

No, don’t be, it’s just the transcription in types of the requirements to build the full flow.

def coproductFlow[HL <: HList, CIn <: Coproduct, COut <: Coproduct, CInOutlets <: HList, COutInlets <: HList](
  flows: HL
)(
  implicit
    // 1 - Introspects HList of Flows to extract
    //      * all input types in CIn Coproduct
    //      * all output types in COut Coproduct
    flowTypes: FlowTypes.Aux[HL, CIn, COut],

    // 2 - Builds all Akka-Stream outlets branching the FlexiRoute
    //     outlets to the right Flow inlet in the HList
    obuild: OutletBuilder.Aux[CIn, CInOutlets],

    // 3 - Builds all Akka-Stream inlets branching the outlets of each Flow
    //     in the HList to the right inlet of FlexiMerge
    ibuild: InletBuilder.Aux[COut, COut, COutInlets],

    // 4 - Technical structures to be able to traverse
    //     and select all those previously built thingies
    otrav: ToTraversable.Aux[CInOutlets, List, Outlet[_]],
    itrav: ToTraversable.Aux[COutInlets, List, Inlet[COut]],
    selOutletValue: SelectOutletValue.Aux[CIn, CInOutlets],

    // 5 - The AkkaStream mutable Builder that:
    //      * builds the input FlexiRoute with the right output types
    //      * plugs all FlexiRoute outlets to the right flow inlet
    //      * builds the output FlexiMerge with the right input types
    //      * plugs all flow outlets to the right inlet of FlexiMerge
    flowBuilder: FlowBuilderC.Aux[CIn, COut, CInOutlets, HL, COutInlets]
): Flow[CIn, COut, Unit]

The Scala code might seem a bit ugly to a few of you. That's not false, but keep in mind what we have done: mixing shapeless-style recursive implicit typeclass inference with the versatility of Akka-Stream mutable builders… And we were able to build our complex flow graph, check all types and plug everything together at compile-time.



Sample

// 1 - Create a type alias for your coproduct
type C = Int :+: String :+: Boolean :+: CNil

// The sink to consume all output data
val sink = Sink.fold[Seq[C], C](Seq())(_ :+ _)

// 2 - a sample source wrapping incoming data in the Coproduct
val f = FlowGraph.closed(sink) { implicit builder => sink =>
  import FlowGraph.Implicits._
  val s = Source(() => Seq(
    Coproduct[C](1),
    Coproduct[C]("foo"),
    Coproduct[C](2),
    Coproduct[C](false),
    Coproduct[C]("bar"),
    Coproduct[C](3),
    Coproduct[C](true)
  ).toIterator)

// 3 - our typed flows
  val flowInt = Flow[Int].map{i => println("i:"+i); i}
  val flowString = Flow[String].map{s => println("s:"+s); s}
  val flowBool = Flow[Boolean].map{b => println("b:"+b); b}

// >>>>>> THE IMPORTANT THING
// 4 - build the coproductFlow in a 1-liner
  val fr = builder.add(ShapelessStream.coproductFlow(flowInt :: flowString :: flowBool :: HNil))
// <<<<<< THE IMPORTANT THING

// 5 - plug everything together using akkastream DSL
  s ~> fr.inlet
       fr.outlet ~> sink
}

// 6 - run it
f.run().futureValue.toSet should equal (Set(
  Coproduct[C](1),
  Coproduct[C]("foo"),
  Coproduct[C](2),
  Coproduct[C](false),
  Coproduct[C]("bar"),
  Coproduct[C](3),
  Coproduct[C](true)
))

FYI, Shapeless Coproduct provides a lot of useful operations on Coproducts such as unifying all types or merging Coproducts together.
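For instance, here is a small sketch of what you can do with such a Coproduct value (reusing the C alias from the sample above; plain Shapeless, nothing specific to Akka-Stream):

import shapeless._

type C = Int :+: String :+: Boolean :+: CNil

val c: C = Coproduct[C]("foo")

// select a precise alternative: Some("foo") here, None for the other types
val asString: Option[String] = c.select[String]
val asInt: Option[Int]       = c.select[Int]

// unify folds the Coproduct down to the least upper bound of its types (Any here)
val unified: Any = c.unify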



Some compile errors now ?

Imagine you forget to manage one type of the Coproduct in the HList of flows:

...

// 4 - build the coproductFlow in a 1-liner
  val fr = builder.add(ShapelessStream.coproductFlow(flowInt :: flowString :: HNil))

..

If you compile, it will produce this:

ShapelessExtensionsSpec.scala:97: overloaded method value ~> with alternatives:
[error]   (to: akka.stream.SinkShape[C])(implicit b: akka.stream.scaladsl.FlowGraph.Builder[_])Unit <and>
[error]   (to: akka.stream.Graph[akka.stream.SinkShape[C], _])(implicit b: akka.stream.scaladsl.FlowGraph.Builder[_])Unit <and>
[error]   [Out](flow: akka.stream.FlowShape[C,Out])(implicit b: akka.stream.scaladsl.FlowGraph.Builder[_])akka.stream.scaladsl.FlowGraph.Implicits.PortOps[Out,Unit] <and>
[error]   [Out](junction: akka.stream.UniformFanOutShape[C,Out])(implicit b: akka.stream.scaladsl.FlowGraph.Builder[_])akka.stream.scaladsl.FlowGraph.Implicits.PortOps[Out,Unit] <and>
[error]   [Out](junction: akka.stream.UniformFanInShape[C,Out])(implicit b: akka.stream.scaladsl.FlowGraph.Builder[_])akka.stream.scaladsl.FlowGraph.Implicits.PortOps[Out,Unit] <and>
[error]   [Out](via: akka.stream.Graph[akka.stream.FlowShape[C,Out],Any])(implicit b: akka.stream.scaladsl.FlowGraph.Builder[_])akka.stream.scaladsl.FlowGraph.Implicits.PortOps[Out,Unit] <and>
[error]   (to: akka.stream.Inlet[C])(implicit b: akka.stream.scaladsl.FlowGraph.Builder[_])Unit
[error]  cannot be applied to (akka.stream.Inlet[shapeless.:+:[Int,shapeless.:+:[String,com.mfglabs.stream.extensions.shapeless.FlowTypes.last.CIn]]])
[error]       s ~> fr.inlet
[error]         ^
[error] /Users/pvo/workspaces/mfg/akka-stream-extensions/extensions/shapeless/src/test/scala/ShapelessExtensionsSpec.scala:98: overloaded method value ~> with alternatives:
[error]   (to: akka.stream.SinkShape[shapeless.:+:[Int,shapeless.:+:[String,com.mfglabs.stream.extensions.shapeless.FlowTypes.last.COut]]])(implicit b: akka.stream.scaladsl.FlowGraph.Builder[_])Unit <and>
[error]   (to: akka.stream.Graph[akka.stream.SinkShape[shapeless.:+:[Int,shapeless.:+:[String,com.mfglabs.stream.extensions.shapeless.FlowTypes.last.COut]]], _])(implicit b: akka.stream.scaladsl.FlowGraph.Builder[_])Unit <and>
[error]   [Out](flow: akka.stream.FlowShape[shapeless.:+:[Int,shapeless.:+:[String,com.mfglabs.stream.extensions.shapeless.FlowTypes.last.COut]],Out])(implicit b: akka.stream.scaladsl.FlowGraph.Builder[_])akka.stream.scaladsl.FlowGraph.Implicits.PortOps[Out,Unit] <and>
[error]   [Out](junction: akka.stream.UniformFanOutShape[shapeless.:+:[Int,shapeless.:+:[String,com.mfglabs.stream.extensions.shapeless.FlowTypes.last.COut]],Out])(implicit b: akka.stream.scaladsl.FlowGraph.Builder[_])akka.stream.scaladsl.FlowGraph.Implicits.PortOps[Out,Unit] <and>
[error]   [Out](junction: akka.stream.UniformFanInShape[shapeless.:+:[Int,shapeless.:+:[String,com.mfglabs.stream.extensions.shapeless.FlowTypes.last.COut]],Out])(implicit b: akka.stream.scaladsl.FlowGraph.Builder[_])akka.stream.scaladsl.FlowGraph.Implicits.PortOps[Out,Unit] <and>
[error]   [Out](via: akka.stream.Graph[akka.stream.FlowShape[shapeless.:+:[Int,shapeless.:+:[String,com.mfglabs.stream.extensions.shapeless.FlowTypes.last.COut]],Out],Any])(implicit b: akka.stream.scaladsl.FlowGraph.Builder[_])akka.stream.scaladsl.FlowGraph.Implicits.PortOps[Out,Unit] <and>
[error]   (to: akka.stream.Inlet[shapeless.:+:[Int,shapeless.:+:[String,com.mfglabs.stream.extensions.shapeless.FlowTypes.last.COut]]])(implicit b: akka.stream.scaladsl.FlowGraph.Builder[_])Unit
[error]  cannot be applied to (sink.Shape)
[error]            fr.outlet ~> sink
[error]                      ^

OUCHHHH, this is a mix of the worst error of Akka-Stream and the kind of errors you get with Shapeless :)

Don't panic, breathe deeply and just tell yourself that, in this case, it just means that your types don't fit together.

In general, the first line and the last lines are the important ones.

For input:

ShapelessExtensionsSpec.scala:97: overloaded method value ~> with alternatives:
[error]   (to: akka.stream.SinkShape[C])(implicit b: akka.stream.scaladsl.FlowGraph.Builder[_])Unit <and>
[error]  cannot be applied to (akka.stream.Inlet[shapeless.:+:[Int,shapeless.:+:[String,com.mfglabs.stream.extensions.shapeless.FlowTypes.last.CIn]]])
[error]       s ~> fr.inlet
[error]         ^

It just means you're trying to plug a C == Int :+: String :+: Boolean :+: CNil into an Int :+: String :+: CNil and the compiler is angry at you!!

For output:

ShapelessExtensionsSpec.scala:98: overloaded method value ~> with alternatives:
[error]   (to: akka.stream.SinkShape[shapeless.:+:[Int,shapeless.:+:[String,com.mfglabs.stream.extensions.shapeless.FlowTypes.last.COut]]])(implicit b: akka.stream.scaladsl.FlowGraph.Builder[_])Unit <and>
[error]   (to: akka.stream.Inlet[shapeless.:+:[Int,shapeless.:+:[String,com.mfglabs.stream.extensions.shapeless.FlowTypes.last.COut]]])(implicit b: akka.stream.scaladsl.FlowGraph.Builder[_])Unit
[error]  cannot be applied to (sink.Shape)
[error]            fr.outlet ~> sink

It just means you're trying to plug an Int :+: String :+: CNil into a C == Int :+: String :+: Boolean :+: CNil and the compiler is 2X-angry at you!!!



Conclusion

Mixing the power of Shapeless compile-time, type-dependent structures and Akka-Stream mutable builders, we are able to build at compile-time a complex dispatcher/flows/merger flow graph that checks that all types and all flows correspond to each other, and plugs everything together, in a one-liner…

This code is the first iteration on this principle, but it appeared to be so efficient, and I trusted the mechanism so much (nothing happens at runtime, only at compile-time), that I put it in production two weeks ago. It runs like a charm.

Finally, there are a few specificities/limitations to know:

  • Wrapping input data into the Coproduct is still the boring part, potentially with some pattern matching. But this is like Json/Xml validation: you only need to validate the data you expect. I expect to reduce this work soon by providing a Scala macro that will generate this part for you, as it's purely mechanical…

  • Wrapping everything in a Coproduct could have some impact if what you expect is pure performance, but in my use-cases IO is so much more costly that this is not a problem…

  • coproductFlow is built with a custom FlexiRoute using the DemandFromAll condition & a FlexiMerge using the ReadAny condition. This implies:

    • the order is NOT guaranteed, due to the nature of the FlexiRoute & FlexiMerge used and potentially to the flows you provide in your HList (each branch flow has its own parallelism/buffer/backpressure behavior and is not necessarily a 1-to-1 flow).

    • the slowest branch will slow down all other branches (as with a broadcast). To manage this, you can add buffers in your branch flows to allow other branches to keep pulling input data.

The future?

  • A macro generating the Coproduct wrapping flow

  • Some other flows based on Shapeless

Have more backpressured and typed fun…

Draft FreeR code is on Github


Introduction

I've recently pushed some Free code & doc to the cool project cats, and I had a few more ideas in my head on optimizing Free but never took the time to make them concrete. I've just found this time during my holidays…

Free Monad is often used to represent embedded DSLs in functional programming languages like Haskell or Scala. One generally represents one's grammar with a simple Functor ADT describing the available operations. Then, from within your programming language, Free Monad provides the facilities to:

  • build in a monadic & stack-safe way a static program composed of a sequence of operations,
  • compile this program,
  • run it.

To know more about the way to use Free and some more specific theory, please refer to the recent draft doc I've pushed to cats.

The well-known classic representation in Scala is the following:

sealed abstract class Free[F[_], A]
case class Pure[F[_], A](a: A) extends Free[F, A]
case class Suspend[F[_], A](a: F[Free[F, A]]) extends Free[F, A]

Please note that F[_] should be a Functor to take advantage of Free construction.
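To make the left-association issue discussed below concrete, here is a minimal sketch of how map/flatMap can be defined for this representation (methods assumed to live inside Free[F, A], with a scalaz-like Functor type class; an illustration, not the actual scalaz/cats code):

// inside sealed abstract class Free[F[_], A]
def flatMap[B](f: A => Free[F, B])(implicit F: Functor[F]): Free[F, B] = this match {
  case Pure(a)     => f(a)
  // rebuilds the whole structure to push the continuation under F
  case Suspend(fa) => Suspend(F.map(fa)(_ flatMap f))
}

def map[B](f: A => B)(implicit F: Functor[F]): Free[F, B] =
  flatMap(a => Pure(f(a)))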

Building a program can then just be a classic sequence of monadic bind/flatMap on Free[S[_], _]:

for {
  a <- doA(...)
  b <- doB(a, ...)
  c <- doC(b, ...)
} yield (c)

This actually constructs a recursive structure looking like:

Suspend(F(Suspend(F(Suspend(F(....(Pure(a))))))))

It can be seen as a left-associated sequence of operations and, as with any left-associated structure, appending an element to it requires traversing it completely, giving a quadratic complexity overall. So the more you flatMap, the longer (in O(n²)) it will take to drill down the structure.

So if you try such code:

def gen[I](i: I): Trampoline[I] = Trampoline.suspend(Trampoline.done(i))

//(a flatMap (b flatMap (c flatMap (...))))
def lftBind[S[_]](n: Int)(gen: Int => S[Int])(implicit M: Monad[S]) = {
  (1 to n).foldLeft(gen(0)){ case (acc, i) => acc flatMap { a => gen(i) } }
}

You will see that it has a quadratic curve in terms of execution time when you increase n.

The first weakness of classic Free is its left-associativity, which induces a quadratic complexity when flatMapping.



Solving left-associated quadratic complexity

To solve it, the immediate idea is to make Free right-associative instead of left-associative (this idea was proposed by Kiselyov et al. in a paper and is called the Continuation-Passing-Style or Codensity construction).

This is already done in current scalaz/cats.Free by adding a new element to the Free ADT:

/** Call a subroutine and continue with the given function. */
sealed abstract class Gosub[S[_], B] extends Free[S, B] {
  type C
  val a: () => Free[S, C]
  val f: C => Free[S, B]
}
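With this extra constructor, flatMap becomes a constant-time allocation instead of a full traversal; roughly (a sketch of a method living inside Free[S, A], not the exact scalaz/cats code):

def flatMap[B](fn: A => Free[S, B]): Free[S, B] = {
  val self = this
  // just record the continuation; the re-association work is deferred to the interpreter
  new Gosub[S, B] {
    type C = A
    val a = () => self
    val f = fn
  }
}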

If you test the same previous code, it has a linear behavior when n increases.



Quadratic observability

In this great paper, Reflection Without Remorse, Atze van der Ploeg & Oleg Kiselyov show that classic Free is subject to another tricky quadratic behavior when, within your sequence of operations, one needs to observe the current internal state of the Free.

Observing the state requires drilling down the recursive Free structure explicitly, going up and down again and again. As explained in the paper, this case is quite tricky because it's very hard to see that it will happen. The deeper the structure, the longer it takes to observe the current state. It's important to note that right-association doesn't help in this case and that complexity is once again O(n²).

The second weakness of Free is its quadratic complexity when observing internal state.

To solve it, in Reflection Without Remorse, they propose a very interesting approach: changing the representation of Free to take advantage of its sequential nature.

A Free becomes the association of 2 elements:

  • a FreeView representing current internal state of the Free
  • a sequence of bind/flatMap functions stored in an efficient data structure that can prepend/append in O(1).

For the data structure, they propose to use a type-aligned dequeue to keep track of all types.

I have tried to implement this structure using a type-aligned FingerTree in Scala. The code is here. The result is pretty interesting but not very efficient: it has a linear behavior for left-association & observability but…

  • for lower values of n, the FingerTree costs far too much to build
  • the memory cost is so high that it triggers the JVM GC far too soon and limits a lot what you can do
  • allocated memory locality isn't good and requires a lot of memory jumps
  • type-alignment makes the code quite ugly (yes, Scala type inference isn't as powerful as Haskell's in this case, and Haskell's laziness helps a lot for FingerTree)

As a conclusion, the idea is really nice on paper but in practice, we need to find something that costs less than this type-aligned dequeue (even if my FingerTree code is really raw, too strict and not optimized at all).



FreeR hybrid structure

I wanted to improve Free behavior and decided to create a new version of it called FreeR thinking in terms of efficient Scala…

I really liked the idea of representing a Free as a pure sequence of operations with a view of current internal state.

To gain in efficiency, I decided to choose another efficient append/prepend data structure, optimized and very well known: Vector providing:

  • append/prepend in O(1) (on average),
  • random access in constant time,
  • quite good locality.

Then, I decided to relax type alignment a lot and manipulate Any values internally, casting/reifying to the right types when required.

BTW, I plagiarized some code written by Alois Cochard for his new IO model in Scalaz/8.0 branch… Alois is a great dev & had made concrete the idea I had in my head so why rewrite them differently? uh ;)

I also decided to reify the 2 kinds of operations:

  • Bind for flatMap/bind calls
  • Map for map calls

So a Free becomes:

// to mitigate the shock of Any in the code
// and provide helpers for casting/reifying
type Val = Any

// the sequence of reified operations
type Ops = Vector[Op]

case class FreeR[S[_], A](
  head: FreeView[S, Val],
  ops: Ops = Vector.empty
)

with FreeView as:

// The view of Free internal state can be of 2 types:
sealed abstract class FreeView[S[_], A]

object FreeView {
  // Pure value
  case class Pure[S[_], A](a: A) extends FreeView[S, A]

  // Impure computation determined by the Functor S
  case class Impure[S[_], A](a: S[FreeR[S, A]]) extends FreeView[S, A]
}

and Ops are:

sealed trait Op
object Op {
  case class Map(f: Val => Val) extends Op
  case class Bind[S[_]](f: Val => FreeR[S, Val]) extends Op
}

FYI This code is less than 300 lines so nothing really horrible except a few ugly casts ;)
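To give the intuition, map and flatMap essentially become appends on the ops vector; a simplified sketch against the single-case-class representation above (hypothetical methods on FreeR[S, A], not the exact library code):

def map[B](f: A => B): FreeR[S, B] =
  FreeR[S, B](head, ops :+ Op.Map(f.asInstanceOf[Val => Val]))

def flatMap[B](f: A => FreeR[S, B]): FreeR[S, B] =
  FreeR[S, B](head, ops :+ Op.Bind(f.asInstanceOf[Val => FreeR[S, Val]]))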



Left Association

The code used for testing can be found here



FreeR's behavior is linear even for millions of flatMaps (until the GC naturally triggers) whereas classic Free clearly shows a quadratic curve.



Observability

The code used for testing can be found here

FreeR's behavior is quite linear even for millions of flatMaps (until the GC naturally triggers) whereas classic Free clearly shows a quadratic curve.



Right association complexity

I finally tried to check the behavior of my new FreeR when using flatMap in a right associated way like:

// (... flatMap (_ => c flatMap (_ => b flatMap (_ => a))))
def rgtBind[S[_]](n: Int)(gen: Int => S[Int])(implicit M: Monad[S]) = {
  (1 to n).foldLeft(gen(n)){ case (acc, i) => gen(n-i) flatMap { _ => acc } }
}

This is not so frequent code but anyway, Free should be efficient for left & right associated code.

Using FreeR as described previously, I discovered that it wasn't efficient in right association when increasing n, because it recursively allocates a lot of one-element Vectors and apparently becomes slower and slower (I'm not even sure of the real cause).

I’ve refined my representation by distinguishing 3 kinds of Free in my ADT:

// No op
case class FreeR0[S[_], A](head: FreeView[S, Val]) extends FreeR[S, A]

// One single op (typically the right association case)
case class FreeR1[S[_], A](head: FreeView[S, Val], op: Op) extends FreeR[S, A]

// Multiple ops
case class FreeRs[S[_], A](head: FreeView[S, Val], ops: Ops = Vector.empty) extends FreeR[S, A]

With this optimization, here is the performance in right association:

It is quite comparable to classic Free for n under 1 million but it becomes quite bad when n gets big. Yet, it remains far more efficient than the previous representation with just Vector.

I need to work more on this issue (apparently GC is triggered too early) to see if more optimizations for right association can be found…



Cherry on the cake: map-fusion optimization

Imagine doing a lot of map operations on a Free like:

def mapalot(n: Int): Trampoline[Long] = {
  (1 to n).foldLeft(Trampoline.done(0L)){ case (acc, i) => acc map { a => a + 1 } }
}

If you think just a bit, you will clearly see that:

a.map(f).map(g) == a.map(g compose f)

This is called map-fusion and, as you may have deduced already, my decision to explicitly reify the Bind and Map operations was made for this purpose.

If I know there are several Map operations in a row, I can fuse them into one single Map by just calling mapFusion on a Free to optimize it:

val free = FreeRTools.mapalot(x)

// optimized free
val freeOpt = free.mapFusion

// run the trampoline
freeOpt.run
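For the intuition, a simplified sketch of what such a fusion pass can look like on the single-case-class representation shown earlier (my own illustration, not the actual library code):

def mapFusion: FreeR[S, A] = {
  val fused = ops.foldLeft(Vector.empty[Op]) { (acc, op) =>
    (acc.lastOption, op) match {
      // two consecutive Maps collapse into a single composed Map
      case (Some(Op.Map(f)), Op.Map(g)) => acc.init :+ Op.Map(f andThen g)
      case _                            => acc :+ op
    }
  }
  FreeR[S, A](head, fused)
}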

Here is the difference in performance between FreeR and FreeR.mapFusion:

As you can see, mapFusion can be very interesting in some cases.



Conclusion

Finally, I have created a new representation of Free using:

  • type-relaxed version of Reflection w/o Remorse
  • sequence of operations managed by Scala Vector
  • reification of Bind & Map operations
  • differentiation of the None/Single/Multiple operations cases
  • Map Fusion optimization

It gives a Free with:

  • Linear behavior for left-association & observability,
  • Stack-safety still ensured,
  • Right-association that should still be optimized, as its cost remains too high for bigger n (yet it is far more acceptable than the other alternatives),
  • Map Fusion can provide an interesting optimization when using multiple consecutive Map operations,
  • For small n, the cost is a bit higher than basic Free but quite low and acceptable.

It is really interesting as it makes Free more and more usable in real-life problems without having to rewrite the code bootstrapped with Free in a more optimized way. I personally find it quite promising!

Please remark that this code has been written for the great project cats that will soon be a viable & efficient alternative for functional structures in Scala.

The full code is there.

Don’t hesitate to test, find bugs, contribute, give remarks, ideas…

Have fun in FreeR world…





SCALEDN, EDN Scala API

Scaledn is a Scala EDN parser (runtime & compile-time), serializer & validator based on Parboiled2 (for parsing), Shapeless and the Generic Validation API.

It works only in Scala 2.11.x

The code & sample apps can be found on Github


Why EDN?…

Because Json is not so good & quite limiting

EDN is described as an extensible data notation specified (not really standardized) there. Clojure & Datalog used in Datomic are supersets of EDN.

EDN allows much more than Json while keeping the same simplicity.

Here are the main points making EDN great to represent & exchange Data.


EDN manages number types far better than Json

In Json, all numbers (floating-point or integer, exponential or not) are considered in the same way, so numbers can only be mapped to the biggest number format: BigDecimal. This is really bad in terms of semantics and performance.

In EDN, numbers can be:

  • 64bits integer aka Long in Scala: 12345
  • 64bits floating point numbers & exponentials aka Double in Scala: 123.45e-9
  • Natural Integers aka BigInt in Scala: 1234567891234N
  • Exact Floating Number aka BigDecimal in Scala: 123.4578972345M

EDN knows much more about collections

Collections in Json are just:

  • lists of heterogeneous json values
  • maps of string keys and json values.

In EDN, you can have:

  • heterogeneous lists
(1 true "toto")
  • heterogeneous vectors/arrays
[1 true "toto"]
  • heterogeneous sets
#{1 true "toto"}
  • heterogeneous maps with heterogeneous keys & values
{1 "toto", "foo" 2}

EDN accepts characters & unicode

Json doesn’t know about characters outside strings.

EDN can manage chars:

// simple char
\c

// special chars
\newline
\return
\space
\tag
\\

// unicode
\u0308

EDN accepts comments & discarded values

There are special syntaxes:

  • comments are lines starting with ;
; this is a comment
  • values preceded by #_ are parsed but discarded
"toto" 3 #_discarded 1.234

EDN knows about symbols & keywords

These are notions that don’t exist in Json.

Symbols can reference anything external or internal that you want to identify. A Symbol can have a namespace such as:

foo.foo2/bar

Keywords are unique identifiers or enumerated values that can be reused in your data structure. A Keyword is just a symbol preceded by a : such as

:foo.foo2/bar

EDN is extensible using tags

EDN is an extensible format using tags starting with # such as:

#foo/bar value

When parsing the EDN format, the parser should provide tag handlers that are applied when a tag is discovered. In this way, you can extend the default format with your own formats.

EDN specifies 2 tag handlers by default:

  • #inst "1985-04-12T23:20:50.52Z" for RFC-3339 instants
  • #uuid "f81d4fae-7dec-11d0-a765-00a0c91e6bf6" for UUID

EDN has no root node & can be streamed

Json is defined to have a root map node: { key : value } or [ ... ].

Json can't accept single values outside of this. So Json isn't really meant to be streamed, as you need to reach the closing brace or bracket to finish parsing a value.

EDN doesn't require this and can consist of multiple heterogeneous values:

1 123.45 "toto" true nil (1 2 3)

As a consequence, EDN can be used to stream your data structures.


Conclusion: EDN should be preferred to Json

All of these points make EDN a far better, stricter & more evolvable notation for representing data structures than Json. It can be used in the same way as Json but would make a far better RPC string format.

I still wonder why Json became the de-facto standard, except that the not-so-serious Javascript language parses it natively and that people were so sick of XML that they would have accepted anything changing their daily life.

But JS could also parse EDN without any problem, and more robust & typed backend languages would benefit a lot from using EDN instead of JSON for their interfaces.

EDN could be used in REST APIs & also in streaming APIs. That's exactly why I wanted to provide a complete Scala API for EDN, to test this idea a bit further.



Scaledn insight


Runtime Parsing

Scaledn can be used to parse the EDN string or arrays of chars received by your API.

All types described in EDN format are isomorphic to Scala types so I’ve decided to skip the complete AST wrapping those types and directly parse to Scala types.

  • "foobar" is parsed to String
  • 123 is parsed to Long
  • (1 2 3) is parsed to List[Long]
  • (1 "toto" 3) is parsed to List[Any]
  • {"toto" 1 "tata" 2} is parsed to Map[String, Long]
  • {1 "toto" 2 "tata"} is parsed to Map[Long, String]
  • {1 "toto" true 3} is parsed to Map[Any, Any]
  • etc…

The parser (based on Parboiled2) provides 2 main functions:

import scaledn._
import parser._

// parses only the first EDN value discovered in the String input
def parseEDN(in: ParserInput): Try[EDN] = ...

// parses all EDN values discovered in the String input
def parseEDNs(in: ParserInput): Try[Seq[EDN]] = ...

If you look in common package, you’ll see that EDN is just an alias for Any ;)

Here is how you can use it:

import scaledn._
import parser._

// Single Value
parseEDN("""{1 "foo", "bar" 1.234M, :foo/bar [1,2,3]} #_foo/bar :bar/foo""") match {
  case Success(t) => \/-(t)
  case Failure(f : org.parboiled2.ParseError) => -\/(parser.formatError(f))
}

// Multiple Value
parseEDNs("""{1 "foo", "bar" 1.234M, :foo/bar [1,2,3]} :bar/foo""").success.value should be (
  Vector(
    Map(
      1L -> "foo",
      "bar" -> BigDecimal("1.234"),
      EDNKeyword(EDNSymbol("foo/bar", Some("foo"))) -> Vector(1, 2, 3)
    ),
    EDNKeyword(EDNSymbol("bar/foo", Some("bar")))
  )
)

Some people will think Any is a bit too broad and I agree, but it's quite practical to use. Moreover, using the validation explained a bit later, you can parse your EDN and then map it to a stronger-typed Scala structure, and then Any disappears.


Compile-time parsing with Macros

When you use static EDN structures in your Scala code, you can write them in their string format and scaledn can parse them at compile-time using Scala macros and thus prevent a lot of errors you can encounter in dynamic languages.

The macro mechanism is based on quasiquotes & whitebox macro contexts, which allow the macro to infer the types of your parsed EDN structures at compile-time. For example:

> val e: Long = EDN("\"toto\"")

[error]  found   : String("toto")
[error]  required: Long
[error]     val e: Long = EDN("\"toto\"")

Whooohooo magic :)


Classic Scala types

Here is how you can use it:

import scaledn._
import macros._

// All types are just for info and can be omitted below, the macro infers them quite well
val e: String = EDN("\"toto\"")

val bt: Boolean = EDN("true")

val bf: Boolean = EDN("false")

val l: Long = EDN("123")

val d: Double = EDN("123.456")

val bi: BigInt = EDN("123N")

val bd: BigDecimal = EDN("123.456M")

val s: EDNSymbol = EDN("foo/bar")

val kw: EDNKeyword = EDN(":foo/bar")

// Homogeneous collection inferred as Vector[String]
val vector: Vector[String] = EDN("""["tata" "toto" "tutu"]""")

// multiple heterogeneous values inferred as Seq[Any]
val s = EDNs("""(1 2 3) "toto" [true false] :foo/bar""")
// note the small s at the end of EDNs to inform the macro there are several values

Shapeless heterogeneous collections

EDN allows manipulating heterogeneous collections. In Scala, when one thinks heterogeneous collection, one thinks Shapeless. Scaledn macros can parse & map your stringified EDN structures to strongly-typed Scala structures.

import scaledn._
import macros._

import shapeless.{HNil, ::}
import shapeless.record._
import shapeless.syntax.singleton._

// Heterogenous list
val s = EDNH("""(1 "toto" true)""")
s should equal (1L :: "toto" :: true :: HNil)

// Heterogenous Map/Record
val s3 = EDNH("""{1 "toto" true 1.234 "foo" (1 2 3)}""")
s3 should equal (
  1L ->> "toto" ::
  true ->> 1.234 ::
  "foo" ->> List(1L, 2L, 3L) ::
  HNil
)

please note the H in EDNH, for heterogeneous

I must say that, using these macros, it might be even simpler to write Shapeless HLists or records than using the Scala API ;)


Macro API

Scaledn provides different macros depending on the depth of introspection you require in your collection with respect to heterogeneity.

Have a look directly at Macro API


Mixing macro with Scala string interpolation

Following ideas implemented by Daniel James in Datomisca, Scaledn proposes to mix String interpolation with the parsing macros, such as:

import scaledn._
import macros._

import shapeless.{HNil, ::}

val l = 123L
val s = List("foo", "bar")

val r: Long = EDN(s"$l")

val r1: Seq[Any] = EDN(s"($l $s)")
val r2: Long :: List[String] :: HNil = EDNH(s"($l $s)")

Nothing to add, macros are cool sometimes :)



Runtime validation of EDN to Scala

When writing REST or external APIs, the received data can never be trusted before being validated. So you generally try to validate what is received and map it to strongly-typed structures. For example:

// parse the received string input
parseEDN("""{ 1 "toto" 2 "tata" 3 "tutu" }""")
// then validate it to a Scala type
  .map(validate[Map[Long, String]])
  .success.value should be (
    play.api.data.mapping.Success(Map(
      1L -> "toto",
      2L -> "tata",
      3L -> "tutu"
    ))
  )

The validation API is the following:

import scaledn._
import validate._

def validate[T](edn: EDN)(implicit r: RuleLike[EDN, T]): Validation[EDN, T] = r.validate(edn)

Scaledn validation is based on the Generic Validation API developed by my MFG Labs colleague & friend Julien Tournay. This API was developed for Play Framework & Typesafe last year to generalize the Json validation API to all data formats. But it will never be integrated into Play, as Typesafe considers it too pure-Scala & too FP-oriented. Yet we use this API in production at MFG Labs and maintain/extend it ourselves.

As explained before, Scaledn parser parses EDN values directly to Scala types as they are bijective so validation is often just a runtime cast and not very interesting in general.

What's much more interesting is validating to Shapeless HLists & records and, even more interesting, to case classes & tuples, based on Shapeless' fantastic auto-generated Generic macros.

Let’s take a few examples to show the power of this feature:

import scaledn._
import validate._

import play.api.data.mapping._
import shapeless.{HNil, ::}

case class CP(cp: Int)
case class Address(street: String, cp: CP)
case class Person(name: String, age: Int, addr: Address)
// Remark that NO implicits must be declared on our case classes

// HLISTS
parseEDN("""(1 "toto" true nil)""").map(
  validate[Long :: String :: Boolean :: EDNNil.type :: HNil]
).success.value should be (
  Success(1L :: "toto" :: true :: EDNNil :: HNil)
)

// TUPLES
parseEDN("""("toto" 34 {"street" "chboing", "cp" {"cp" 75009}})""").map(
  validate[Tuple3[String, Int, Address]]
).success.value should be (
  Success(("toto", 34, Address("chboing", CP(75009))))
)

// CASECLASSES
parseEDN("""("toto" 34 ("chboing" (75009)))""").map(
  validate[Person]
).success.value should be (
  Success(Person("toto", 34, Address("chboing", CP(75009))))
)

parseEDN("""{"name" "toto", "age" 34, "addr" {"street" "chboing", "cp" {"cp" 75009}}}""").map(
  validate[Person]
).success.value should be (
  Success(Person("toto", 34, Address("chboing", CP(75009))))
)

I think here you can see the power of this validation feature without writing any boilerplate…



Serializing Scala to EDN

Using Generic Validation API, you can also write scala structures to any other data format.

Scaledn provides serialization from scala structures to EDN Strings. For example:

import scaledn._
import write._

toEDNString("toto") should equal ("\"toto\"")
toEDNString(List(1, 2, 3)) should equal ("""(1 2 3)""")

The write API is the following:

import scaledn._
import write._

def toEDNString[I](i: I)(implicit w: WriteLike[I, String]): String = w.writes(i)

Once again, what’s more interesting is using shapeless & caseclasses & tuples.

import scaledn._
import write._

import shapeless.{HNil, ::}

// HLIST
toEDNString(1 :: true :: List(1L, 2L, 3L) :: HNil) should equal ("""(1 true (1 2 3))""")

// TUPLE
toEDNString((23, true)) should equal ("""(23 true)""")

// CASE CLASS
case class Address(street: String, cp: Int)
case class Person(name: String, age: Int, addr: Address)
// Remark that NO implicits must be declared on our case classes

toEDNString(Person("toto", 34, Address("chboing", 75009))) should equal (
  """{"name" "toto", "age" 34, "addr" {"street" "chboing", "cp" 75009}}"""
)

TODO

This project is a first draft so it requires a bit more work.

Here are a few points to work on:

  • patch remaining glitches/bugs
  • write more tests for all cases
  • study streamed parser asap
  • write sample apps

Don’t hesitate to test, find bugs, contribute, give remarks, ideas…

Have fun in EDN world…





Not an article, just some reflections on this idea…

You know what a functor is?

  • 2 categories C & D, simplifying a category as:

    • a set of objects with some arrows/morphisms/functions between objects: f: x -> y
    • morphisms are associative: h . (g . f) = (h . g) . f where . is composition ((g . f)(x) = g(f(x)))
    • for each object x there is an identity morphism id(x): x -> x
  • a functor F associates:

    • each object x of C with an object F(x) of D,
    • each morphism f: x -> y of C with a morphism F(f): F(x) -> F(y) of D such that:
      • F(id(x)) = id(F(x))
      • F(g . f) = F(g) . F(f)

A Functor is a mapping (a homomorphism) between categories that preserves the structure of the category (the morphisms, the relations between objects), whatever the kind of objects those categories contain.

In scalaz, here is the definition of a Functor:

trait Functor[F[_]] {
  ////

  /** Lift `f` into `F` and apply to `F[A]`. */
  def map[A, B](fa: F[A])(f: A => B): F[B]
  ...
}

You can see the famous map function that you can find in many structures in Scala : List[_], Option[_], Map[_, _], Future[_], etc…

Why? Because all these structures are Functors between the categories of Scala types…

Math is everywhere in programming & programming is Math so don’t try to avoid it ;)

So you can write a Functor[List] or a Functor[Option] as those structures are functors.
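For instance, a Functor[Option] instance is a one-liner (scalaz already ships it, of course; this is just to make it concrete):

import scalaz.Functor

implicit val optionFunctor: Functor[Option] = new Functor[Option] {
  def map[A, B](fa: Option[A])(f: A => B): Option[B] = fa map f
}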

Now let’s consider HList the heterogenous List provided by Miles Sabin’s fantastic Shapeless. HList looks like a nice Functor.

(1 :: true :: HNil) map (
  (i: Int)     => i.toString,
  (b: Boolean) => b.toString
) => ("1" :: "true" :: HNil)

Ok, it's a bit more complex, as this functor requires not one function but one per element type constituting the HList: a kind of polymorphic function. Fortunately, Shapeless provides exactly the structure to represent this: Poly.
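Here is what that looks like with a real Poly (a small example of my own to illustrate, using plain Shapeless):

import shapeless._
import poly._

// a polymorphic function: one case per element type of the HList
object asString extends Poly1 {
  implicit def caseInt  = at[Int](_.toString)
  implicit def caseBool = at[Boolean](_.toString)
}

// Shapeless' own map uses it to traverse the HList
(1 :: true :: HNil) map asString  // "1" :: "true" :: HNil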

What about writing a functor for HList?

Scalaz Functor isn't very helpful here (ok, I just copied the HMonoid text & tweaked it ;)) since its map takes a single function over a single type parameter.

To be able to write a Functor of HList, we need something else based on multiple different types…

I spent a few hours having fun with this idea in Shapeless and tried to implement a Functor for heterogeneous structures like HList, Sized and even non-heterogeneous structures.

Here are the working samples.

Here is the code based on pseudo-dependent types as shapeless.

The signature of the HFunctor has a map function as expected:

trait HFunctor[HA, F <: Poly] {
  type Real

  trait HMapper[P <: Poly, In] extends DepFn1[In] { type Out }

  val hmapper: HMapper[F, HA]

  def map(ha: HA)(f: F): hmapper.Out = hmapper(ha)
}

This is just a sandbox to open discussion on this idea so I won’t explain more and let the curious ones think about it…

Have F(un)!





Not an article, just some reflections on this idea…

You know what a monoid is?

  • a binary operation taking 2 elements and giving another element: e x e -> e (aka a SemiGroup)
  • an identity element id such that id . e = e . id = e (also called the zero element)

(and some associativity)

In scalaz, here is the definition:

trait Monoid[F] extends Semigroup[F] { self =>
  ////
  /** The identity element for `append`. */
  def zero: F
  def append(f1: F, f2: => F): F
  ...
}

You can see the zero & the SemiGroup.append operations, right?

So you can write a Monoid[Int] or Monoid[List[A]] as those structures are monoids.

Now let’s consider HList the heterogenous List provided by Miles Sabin’s fantastic Shapeless. HList looks like a nice monoid.

(1 :: "toto" :: HNil) ++ (true :: HNil) => (1 :: "toto" :: true :: HNil)

What about writing a monoid for HList?

Scalaz monoid isn’t very helpful because our monoid operations would look like:

def zero: HNil.type
def append(f1: H1 <: HList, f2: => H2 <: HList): H3 <: HList

So, to be able to write a Monoid of HList, we need something else based on multiple different types…

I spent a few hours having fun with this idea in Shapeless and tried to implement a Monoid for heterogeneous structures like HList, Nat, Sized and even non-heterogeneous structures.

Here are the working samples.

Here is the code based on pseudo-dependent types as shapeless.

The signature of the HMonoid shows the zero and the Semigroup as expected:

trait HMonoid[A, B] extends HZero with HSemiGroup[A, B]
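To give the flavor, here is a minimal sketch (my own illustration, not necessarily the sandbox code) of how an HSemiGroup for HLists could delegate to Shapeless' Prepend type class:

import shapeless._
import shapeless.ops.hlist.Prepend

trait HSemiGroup[A, B] {
  type Out
  def append(a: A, b: B): Out
}

// the output type is computed at compile-time by Prepend
implicit def hlistHSemiGroup[A <: HList, B <: HList](
  implicit prepend: Prepend[A, B]
): HSemiGroup[A, B] { type Out = prepend.Out } =
  new HSemiGroup[A, B] {
    type Out = prepend.Out
    def append(a: A, b: B): Out = prepend(a, b)
  }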

This is just a sandbox to open discussion on this idea so I won’t explain more and let the curious ones think about it…

Have Monoids of Fun!





The code & sample apps can be found on Github

Forget the buzz title, this project is still a very early draft but it's time to expel it from my R&D sandbox, as imperfect as it might be… before I lose my sanity wandering in Scala macro hygiene ;)

What?

Daemonad is a nasty Scala macro that aims at:

  • marking where you manipulate monads or stacks of monads
  • compile-checking monadic behavior & implicit monad instances
  • allowing to snoop monad values deep into (some) monad stacks in the same way as ScalaAsync i.e. in a pseudo-imperative way.

This project is NOT yet stable, NOT very robust, so use it at your own risk, but we can discuss it…

Here is what you can write right now.

Await.result(
  monadic[Future, Option] {
    val a = Future ( Some(9) )
    val b = Some(7)
    val c = 10
    if(snoop2(a) < 10) snoop1(b) + 10
    else c
  }, duration.Duration("1 second")
) should equal (Some(17))

Motivations

1 - Experiment writing a very ugly Scala macro

I wanted to write a huge & complex Scala macro that would move pieces of code, create more code, types etc…

I wanted to know the difficulties that it implies.

I felt reckless, courageous!

Nothing could stop me!!!!

Result: I was quite insane and I will certainly write a post-mortem article about it to show the horrible difficulties I've encountered. My advice: let people like me hit their head against the wall, and wait for the improved hygienic macros that should come progressively before writing big macros ;)

2 - Investigate ScalaAsync generalization to all monads + (some) monad stacks

I had investigated ScalaAsync code and thought it would be possible to generalize it to all kinds of monads and go further by managing monad stacks.

Result: simple monads are easy to manage (as also seen in scala-workflow, which I discovered very recently) and some monad stacks can be managed with Scalaz monad transformers.

But don’t think you can use all kinds of monad transformers: the limits of Scala compiler with type-lambdas in macros and my very own limits blocked me from going as far as I expected.

So for now, it can manage Future/Option/List stacks & also Either \/ using type aliases.


3 - Explicitly Mark monadic blocks

There are 2 ways of seeing monads:

You don’t need or you don’t want to know what is a monad…

… And yet you use it everyday/everywhere.

This is what most of us do (and it's so shameful), using those cool map/flatMap functions provided by Scala libraries that give access to the values inside Future, List, Option in a protected way, etc… That's enough for your everyday needs, right?


You want to know or you know what is a monad …

… and you want to use them on purpose.

This is what hippy developers do in advanced Scala using Scalaz or even crazier in pure FP languages like Haskell.

Guess what I prefer?

Here is the kind of code I’d like to write :

// I write my datastructure without any map/flatMap function
case class Toto[A](a: A)

// Hey I proved Toto was a monad (yes believe me)

// Let's bring this concept into my scope
implicit object TotoMonad extends Monad[Toto] {
  def bind[A, B](fa: Toto[A])(f: A => Toto[B]): Toto[B] = {
    f(fa.a)
  }

  def point[A](a: => A): Toto[A] = Toto(a)
}

...
// I create my toto
val toto = Toto("this is toto")

...

// Suddenly I decide that I must use this monadic behavior of toto
monadic[Toto] {
  val a = <snoop_value_inside_monad>(toto) // outside the monadic block, you shall not do that
  do_something_with_value(a)
  // The compiler takes care that my structure is used in a pure monadic way
  // and returns a monad Toto of the right type
}
...

Ok, I speak about pure functional programming and then about snooping values out of the monad. This might seem a bit useless or even stupid compared to directly using Monad facilities. I agree, and I still wonder about the sanity of this project, but I'm stubborn and I try to finish what I start ;)


Back to code Sample

Await.result(
  monadic[Future, Option] {
    val a = Future ( Some(9) )
    val b = Some(7)
    val c = 10
    if(snoop2(a) < 10) snoop1(b) + 10
    else c
  }, duration.Duration("1 second")
) should equal (Some(17))

What does it do?

  • monadic marks the monadic block
  • monadic[Future, Option] declares that you manipulate a stack Future[Option] (and no other)
  • snoopX means that you want to snoop the monad value at X-th level (1, 2, 3, 4 and no more for now)
  • the macro checks for implicit instances of monads (here List, Option, Future) and monad transformers (here OptionT & ListT) for this stack
  • the macro translates this code into crazy embedded Monad.bind/point/lift/run
  • snoop2 is used in first position: if you had used snoop1 first, the macro would have rejected your monadic block. It's logical: when you flatMap, you always start with the deepest monad of the stack, and I chose not to reorder your code as I find this macro is already far too intrusive :)

I'm sure you don't want to see the code you would have to write by hand for this: it is quite long and boring.

Let's just say that this code is generated by a Scala macro for you.

The current generated code isn't optimized at all and is quite redundant, but that's for the next iterations.
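To give an idea, here is roughly the kind of hand-written monad-transformer code the macro spares you for the Future/Option sample above (a sketch of my own using scalaz OptionT, not the macro's actual output):

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scalaz.OptionT
import scalaz.std.scalaFuture._

val a = Future(Option(9))
val b = Option(7)
val c = 10

// stack the Option inside the Future explicitly and unstack it at the end with run
val res: Future[Option[Int]] =
  (for {
    x <- OptionT(a)
    y <- OptionT(Future.successful(b))
  } yield (if (x < 10) y + 10 else c)).run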


What is working?

  • stacks with List/Option/Either up to depth 4
  • custom Monads
  • a few preliminary checkings that prevent you from writing stupid code but not so much
  • if/then/else & case/match in some cases

What isn't working?

  • monadic block assigned to a val not explicitly typed.
  • many things or edge-cases with mixed monad depth and if/match.
  • can’t use advanced monad transformers like StateT or WriterT in monadic block because Scala compiler doesn’t allow what I expected with type lambdas. This needs to be studied further.

A very stupid example to finish, with a 4-depth stack

it should """snoop4 on stupid stack""" in {
    type S[T] = ({ type l[X] = \/[String, X] })#l[T]
    Await.result(
      monadic[Future, S, List, Option] {
        val a: Future[S[List[Option[Int]]]] = Future(\/-(List(Some(5), Some(10))))
        val b: S[List[Option[Int]]] = \/-(List(Some(1), Some(2)))
        val c: List[Option[Int]] = List(Some(3), Some(4))
        val d: Option[Int] = Some(2)
        (snoop4(a) + snoop3(b) * 2 - snoop2(c)) / snoop1(d)
      }, duration.Duration("1 second")
    ) should equal (\/-(List(Some(2), Some(1), Some(3), Some(2), Some(4), Some(4), Some(5), Some(5))))
  }

Note that:

  • you have to use a type alias to Scalaz \/ to one parametric type.
  • you have to help the compiler a bit with the type alias S or it will infer \/[A, B], which is not what we want for the monad. This might seem tedious but I'll study whether I can work around it.
  • look at the result: you feed 2-element lists and at the end you get an 8-element list: WHAT???? No, it's normal, this is the result of flatMapping the first, second and third lists together: 2*2*2 = 8… nothing strange but it can be surprising at first glance ;)

TODO

  • refactor all code because it’s ugly, not robust and redundant!!!
  • rely on MonadTrans[F[_], _] instead of hardcoding monad transformers as now.
  • accept custom MonadTrans provided in the user code.
  • steal some inspiration from scala-workflow because I find this code cool.

Special Thanks

  • Eugene Burmako (@xeno_by) for helping me each time I was lost in macros
  • Jason Zaugg (@retronym) for Scala Async and splicer
  • Daniel James (@dwhjames) for the snoop name
  • Thibaut Duplessis (@ornicar) for the monad stack idea

Have a look at the code on Github.

Have snoop22(macrofun)!





Synopsis

The code & sample apps can be found on Github

The Zpark-Zstream I article was a PoC trying to use Scalaz-Stream instead of DStream with Spark-Streaming. I had deliberately decided not to deal with fault-tolerance & stream graph persistence to keep it simple, but without them it was quite useless for real applications…

Here is a triptych of articles trying to do something concrete with Scalaz-Stream and Spark.

So, what do I want? I wantttttttt a shrewburyyyyyy and to do the following:

  1. Plug Scalaz-Stream Process[Task, T] on Spark DStream[T] (Part 1)
  2. Build DStream using brand new Scalaz-Stream NIO API (client/server) (Part 2)
  3. Train Spark-ML recommendation-like model using NIO client/server (Part 3)
  4. Stream data from multiple NIO clients to the previous trained ML model mixing all together (Part 3)

[Part 3/3] Fancy Spark Machine Learning with NIO client/server & DStream…


Let me remind you that I'm not an expert in ML but more of a student. So if I say or do stupid ML things, be indulgent ;)

Here is what I propose:

  • Train a collaborative filtering rating model for a recommendation system (as explained in Spark doc there) using a first NIO server and a client as presented in part 2.

  • When model is trained, create a second server that will accept client connections to receive data.

  • Stream/merge all received data into one single stream, dstreamize it and perform streamed predictions using previous model.

Train collaborative filtering model

Training client

As explained in Spark doc about collaborative filtering, we first need some data to train the model. I want to send those data using a NIO client.

Here is a function doing this:

//////////////////////////////////////////////////
// TRAINING CLIENT
def trainingClient(addr: InetSocketAddress): Process[Task, Bytes] = {

  // naturally you could provide much more data
  val basicTrainingData = Seq(
    // user ID, item ID, Rating
    "1,1,5.0",
    "1,2,1.0",
    "1,3,5.0",
    "1,4,1.0",
    "2,4,1.0",
    "2,2,5.0"
  )
  val trainingProcess =
    Process.emitAll(basicTrainingData map (s => Bytes.of((s+"\n").getBytes)))

  // sendAndCheckSize is a special client sending all data emitted 
  // by the process and verifying the server received all data 
  // by acknowledging all data size
  val client = NioClient.sendAndCheckSize(addr, trainingProcess)

  client
}

Training server

Now we need the training NIO server that waits for the training client to connect and pipes the received data to the model.

Here is a useful function to help create a server as described in the previous article part:

def server(addr: InetSocketAddress): (Process[Task, Bytes], Signal[Boolean]) = {

  val stop = async.signal[Boolean]
  stop.set(false).run

  // this is a server that is controlled by a stop signal
  // and that acknowledges all received data by their size
  val server =
    ( stop.discrete wye NioServer.ackSize(addr) )(wye.interrupt)

  // returns a stream of received data & a signal to stop server
  (server, stop)
}

We can create the training server with it:

val trainingAddr = NioUtils.localAddress(11100)
//////////////////////////////////////////////////
// TRAINING SERVER
val (trainingServer, trainingStop) = server(trainingAddr)

trainingServer is a Process[Task, Bytes] streaming the training data received from the training client. We are going to train the rating model with it.


Training model

To train a model, we can use the following API:

// the rating with user ID, product ID & rating
case class Rating(val user: Int, val product: Int, val rating: Double)

// A RDD of ratings
val ratings: RDD[Rating] = ...

// train the model with it
val model: MatrixFactorizationModel = ALS.train(ratings, 1, 20, 0.01)

Building RDD[Rating] from server stream

Imagine that we have a continuous flow of training data that can be very long.

We want to train the model with just a slice of this flow. To do this, we can:

  • dstreamize the server output stream
  • run the dstream for some time
  • retrieve the RDDs received during this time
  • union all of those RDDs
  • push them to the model

Here is the whole code with previous client:

val trainingAddr = NioUtils.localAddress(11100)

//////////////////////////////////////////////////
// TRAINING SERVER
val (trainingServer, trainingStop) = server(trainingAddr)

//////////////////////////////////////////////////
// TRAINING CLIENT
val tclient = trainingClient(trainingAddr)

//////////////////////////////////////////////////
// DStreamize server received data
val (trainingServerSink, trainingDstream) = dstreamize(
  trainingServer
      // converts bytes to String (doesn't care about encoding, it shall be UTF8)
      .map  ( bytes => new String(bytes.toArray) )
      // rechunk received strings based on a separator \n 
      // to keep only triplets: "USER_ID,PROD_ID,RATING"
      .pipe (NioUtils.rechunk { s:String => (s.split("\n").toVector, s.last == '\n') } )
  , ssc
)

//////////////////////////////////////////////////
// Prepare dstream output 
// (here we print to know what has been received)
trainingDstream.print()

//////////////////////////////////////////////////
// RUN

// Note the time before
val before = new Time(System.currentTimeMillis)

// Start SSC
ssc.start()

// Launch server
trainingServerSink.run.runAsync( _ => () )

// Sleeps a bit to let server listen
Thread.sleep(300)

// Launches client and awaits until it ends
tclient.run.run

// Stop server
trainingStop.set(true).run

// Note the time after
val after = new Time(System.currentTimeMillis)

// retrieves all dstreamized RDD during this period
val rdds = trainingDstream.slice(
  before.floor(Milliseconds(1000)), after.floor(Milliseconds(1000))
)

// unions them (this operation can be expensive)
val union: RDD[String] = new UnionRDD(ssc.sparkContext, rdds)

// converts "USER_ID,PROD_ID,RATING" triplets into Ratings
val ratings = union map { e =>
  e.split(',') match {
    case Array(user, item, rate) => Rating(user.toInt, item.toInt, rate.toDouble)
  }
}

// finally train the model with it
val model = ALS.train(ratings, 1, 20, 0.01)

// Predict
println("Prediction(1,3)=" + model.predict(1, 3))

//////////////////////////////////////////////////
// Stop SSC
ssc.stop()

Run it

-------------------------------------------
Time: 1395079621000 ms
-------------------------------------------
1,1,5.0
1,2,1.0
1,3,5.0
1,4,1.0
2,4,1.0
2,2,5.0

-------------------------------------------
Time: 1395079622000 ms
-------------------------------------------

-------------------------------------------
Time: 1395079623000 ms
-------------------------------------------

-------------------------------------------
Time: 1395079624000 ms
-------------------------------------------

-------------------------------------------
Time: 1395079625000 ms
-------------------------------------------

Prediction(1,3)=4.94897842056338

Fantastic, we have trained our model in a very fancy way, haven’t we?

Personally, I find it interesting that we can take advantage of both APIs…



Predict Ratings

Now that we have a trained model, we can create a new server to receive data from clients for rating prediction.


Prediction client

Firstly, let’s generate some random data to send for prediction.

//////////////////////////////////////////////////
// PREDICTION CLIENT
def predictionClient(addr: InetSocketAddress): Process[Task, Bytes] = {

  // PREDICTION DATA
  def rndData =
    // userID
    (Math.abs(scala.util.Random.nextInt) % 4 + 1).toString +
    // comma separator expected by the prediction parser ("USER_ID,PROD_ID")
    "," +
    // productID
    (Math.abs(scala.util.Random.nextInt) % 4 + 1).toString +
    "\n"

  val rndDataProcess = Process.eval(Task.delay{ rndData }).repeat

  // a 1000 elements process emitting every 10ms
  val predictDataProcess =
    (Process.awakeEvery(10 milliseconds) zipWith rndDataProcess){ (_, s) => Bytes.of(s.getBytes) }
      .take(1000)

  val client = NioClient.sendAndCheckSize(addr, predictDataProcess)

  client
}

Prediction server

val predictAddr = NioUtils.localAddress(11101)
//////////////////////////////////////////////////
// PREDICTION SERVER
val (predictServer, predictStop) = server(predictAddr)

Prediction Stream

predictServer is the stream of data to predict. Let's stream it to the model by dstreamizing it and transforming all the built RDDs by passing them through the model:

//////////////////////////////////////////////////
// DStreamize server
val (predictServerSink, predictDstream) = dstreamize(
  predictServer
      // converts bytes to String (doesn't care about encoding, it shall be UTF8)
      .map  ( bytes => new String(bytes.toArray) )
      // rechunk received strings based on a separator \n
      .pipe (NioUtils.rechunk { s:String => (s.split("\n").toVector, s.last == '\n') } )
  , ssc
)

//////////////////////////////////////////////////
// pipe dstreamed RDD to prediction model
// and print result
predictDstream map { _.split(',') match {
  // converts to integers required by the model (USER_ID, PRODUCT_ID)
  case Array(user, item) => (user.toInt, item.toInt)
}} transform { rdd =>
  // prediction happens here
  model.predict(rdd)
} print()

Running all in same StreamingContext

I've discovered a problem here: the recommendation model is built in a StreamingContext and uses RDDs built in it, so you must use the same StreamingContext for prediction. Therefore I must build my training dstreamized client/server & my prediction dstreamized client/server in the same context, and thus I must schedule both before starting this context.

Yet the prediction model is built from training data received after starting the context, so it isn't known beforehand… It's very painful, so I decided to be nasty and treat the model as a variable that will be set later. For this, I used a horrible SyncVar to set the prediction model when it's ready… Sorry about that, but I need to study this issue more to see whether I can find a better solution, because I'm not satisfied with it at all…
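To isolate the trick before the full listing, here is a minimal sketch of that SyncVar workaround (the DStream/ALS lines are commented out since they only make sense inside the full program below):

import scala.concurrent.SyncVar
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel

// An initially empty slot for the model, filled once the training data has arrived
val model = new SyncVar[MatrixFactorizationModel]

// Scheduled before ssc.start(): model.get blocks until the model has been set
// predictDstream.transform { rdd => model.get.predict(rdd) }

// Later, once the training RDD has been gathered:
// model.set(ALS.train(ratings, 1, 20, 0.01))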

So here is the whole training/predicting painful code:

//////////////////////////////////////////////////
// TRAINING

val trainingAddr = NioUtils.localAddress(11100)

// TRAINING SERVER
val (trainingServer, trainingStop) = server(trainingAddr)

// TRAINING CLIENT
val tclient = trainingClient(trainingAddr)

// DStreamize server
val (trainingServerSink, trainingDstream) = dstreamize(
  trainingServer
      // converts bytes to String (doesn't care about encoding, it shall be UTF8)
      .map  ( bytes => new String(bytes.toArray) )
      // rechunk received strings based on a separator \n
      .pipe (NioUtils.rechunk { s:String => (s.split("\n").toVector, s.last == '\n') } )
  , ssc
)

// THE HORRIBLE SYNCVAR KLUDGE (could have used an atomic reference but it wouldn't be better IMHO)
var model = new SyncVar[org.apache.spark.mllib.recommendation.MatrixFactorizationModel]
// THE HORRIBLE SYNCVAR KLUDGE (could have used an atomic reference but it wouldn't be better IMHO)


//////////////////////////////////////////////////
// PREDICTING
val predictAddr = NioUtils.localAddress(11101)

// PREDICTION SERVER
val (predictServer, predictStop) = server(predictAddr)

// PREDICTION CLIENT
val pClient = predictionClient(predictAddr)

// DStreamize server
val (predictServerSink, predictDstream) = dstreamize(
  predictServer
      // converts bytes to String (doesn't care about encoding, it shall be UTF8)
      .map  ( bytes => new String(bytes.toArray) )
      // rechunk received strings based on a separator \n
      .pipe ( NioUtils.rechunk { s:String => (s.split("\n").toVector, s.last == '\n') } )
  , ssc
)

// Piping received data to the model
predictDstream.map {
  _.split(',') match {
    case Array(user, item) => (user.toInt, item.toInt)
  }
}.transform { rdd =>
  // USE THE HORRIBLE SYNCVAR
  model.get.predict(rdd)
}.print()

//////////////////////////////////////////////////
// RUN ALL
val before = new Time(System.currentTimeMillis)

// Start SSC
ssc.start()

// Launch training server
trainingServerSink.run.runAsync( _ => () )

// Sleeps a bit to let server listen
Thread.sleep(300)

// Launch training client
tclient.run.run

// Await SSC termination a bit
ssc.awaitTermination(1000)
// Stop training server
trainingStop.set(true).run
val after = new Time(System.currentTimeMillis)

val rdds = trainingDstream.slice(before.floor(Milliseconds(1000)), after.floor(Milliseconds(1000)))
val union: RDD[String] = new UnionRDD(ssc.sparkContext, rdds)

val ratings = union map {
  _.split(',') match {
    case Array(user, item, rate) => Rating(user.toInt, item.toInt, rate.toDouble)
  }
}

// SET THE HORRIBLE SYNCVAR
model.set(ALS.train(ratings, 1, 20, 0.01))

println("**** Model Trained -> Prediction(1,3)=" + model.get.predict(1, 3))

// Launch prediction server
predictServerSink.run.runAsync( _ => () )

// Sleeps a bit to let server listen
Thread.sleep(300)

// Launch prediction client
pClient.run.run

// Await SSC termination a bit
ssc.awaitTermination(1000)
// Stop server
predictStop.set(true).run

Run it…

-------------------------------------------
Time: 1395144379000 ms
-------------------------------------------
1,1,5.0
1,2,1.0
1,3,5.0
1,4,1.0
2,4,1.0
2,2,5.0

**** Model Trained -> Prediction(1,3)=4.919459410565401

...

-------------------------------------------
Time: 1395144384000 ms
-------------------------------------------
----------------

-------------------------------------------
Time: 1395144385000 ms
-------------------------------------------
Rating(1,1,4.919459410565401)
Rating(1,1,4.919459410565401)
Rating(1,1,4.919459410565401)
Rating(1,1,4.919459410565401)
Rating(1,2,1.631952450379809)
Rating(1,3,4.919459410565401)
Rating(1,3,4.919459410565401)

-------------------------------------------
Time: 1395144386000 ms
-------------------------------------------
Rating(1,1,4.919459410565401)
Rating(1,1,4.919459410565401)
Rating(1,3,4.919459410565401)
Rating(1,3,4.919459410565401)
Rating(1,3,4.919459410565401)
Rating(1,4,0.40813133837755494)
Rating(1,4,0.40813133837755494)

...


Final conclusion

3 long articles to end up training a poor recommendation system with 2 clients/servers… A bit bloated, isn't it? :)

Anyway, I hope I've planted a few ideas and concepts about Spark & scalaz-stream in your brain, and if I've reached that target, it's already enough!

Yet, I'm not satisfied with a few things:

  • Training a model and using it in the same StreamingContext is still clumsy, and I must say that calling model.predict from a map function on a DStream might not behave so well in a cluster environment. I haven't dug into this code enough to have a clear opinion on it.
  • I tried using multiple clients for prediction (like 100 in parallel) and it works quite well, but I have encountered problems ending both my clients/servers and the streaming context, and I often end up with zombie SBT processes that I can't kill until reboot (some threads remain RUNNING while others AWAIT and sockets aren't released… resource issues…). Cleanly closing all of these thread-creating tools after intensive work isn't good yet.

But, I’m satisfied globally:

  • Piping a scalaz-stream Process into a Spark DStream works quite well and might be interesting after all.
  • The new scalaz-stream NIO API, which considers clients & servers as pure streams of data, gave me so many ideas that my free time suddenly got frightened and ran away.


GO TO PART2 <—————————————————————————————————-

Have a look at the code on Github.

Have distributed & resilient yet continuous fun!





Synopsis

The code & sample apps can be found on Github

The Zpark-Zstream I article was a PoC trying to use Scalaz-Stream instead of DStream with Spark-Streaming. I had deliberately decided not to deal with fault-tolerance & stream graph persistence to keep it simple, but without them it was quite useless for real applications…

Here is a triptych of articles trying to do something concrete with Scalaz-Stream and Spark.

So, what do I want? I wantttttttt a shrewburyyyyyy and to do the following:

  1. Plug Scalaz-Stream Process[Task, T] on Spark DStream[T] (Part 1)
  2. Build DStream using brand new Scalaz-Stream NIO API (client/server) (Part 2)
  3. Train Spark-ML recommendation-like model using NIO client/server (Part 3)
  4. Stream data from multiple NIO clients to the previous trained ML model mixing all together (Part 3)

[Part 2/3] From Scalaz-Stream NIO client & server to Spark DStream

Scalaz-Stream NIO Client

What is a client?

  • Something sending some data W (for Write) to a server
  • Something reading some data I (for Input) from a server

Client seen as Process

A client could be represented as:

  • a Process[Task, I] for input channel (receiving from server)
  • a Process[Task, W] for output channel (sending to server)

A new structure has recently been added to scalaz-stream:

final case class Exchange[I, W](read: Process[Task, I], write: Sink[Task, W])

Precisely what we need!

Now, let’s consider that we work in NIO mode with everything non-blocking, asynchronous etc…

In this context, a client can be seen as something that sooner or later generates one (or more) Exchange[I, W], i.e.:

Client[I, W] === Process[Task, Exchange[I, W]]

In the case of a pure TCP client, I and W are often Bytes.

Creating a client

Scalaz-Stream now provides a helper to create a TCP binary NIO client:

// the address of the server to connect to
val address: InetSocketAddress = new InetSocketAddress("xxx.yyy.zzz.ttt", port)

// create a client
val client: Process[Task, Exchange[Bytes, Bytes]] = nio.connect(address)

client map { ex: Exchange[Bytes, Bytes] =>
  // read data sent by server in ex.read
  ???
  // write data to the server with ex.write
  ???
}

Plug your own data source on Exchange

To plug your own data source for writing to the server, Scalaz-Stream provides one more API:

case class Exchange[I, W](read: Process[Task, I], write: Sink[Task, W]) {
  /**
   * Runs supplied Process of `W` values by sending them to remote system.
   * Any replies from remote system are received as `I` values of the resulting process.
   */
  def run(p:Process[Task,W]): Process[Task,I]

  // the W are sent to the server and we retrieve only the received data
}

With this API, we can send data through the client and retrieve only the data it receives back.

// some data to be sent by client
val data: Process[Task, W] = ...

// send data and retrieve only responses received by client
val output: Process[Task, I] = client flatMap { ex =>
  ex.run(data)
}

val receivedData: Seq[Bytes] = output.runLog.run

Yet, in general, we need to:

  • send custom data to the server
  • expect its response
  • do some business logic
  • send more data
  • etc…

So we need to be able to gather received & emitted data in the same piece of code.


Managing client/server business logic with Wye

Scalaz-stream can help us with the following API:

case class Exchange[I, W](read: Process[Task, I], write: Sink[Task, W]) {
...
  /**
   * Transform this Exchange to another Exchange where queueing, and transformation of this `I` and `W`
   * is controlled by supplied WyeW.
   */
  def wye(w: Wye[Task, I, W2, W \/ I2]): Exchange[I2, W2]
...

// It transforms the original Exchange[I, W] into a Exchange[I2, W2]

}

Whoaaaaa complex isn’t it? Actually not so much…

Wye is a fantastic tool that can:

  • read from a left and/or right branch (in a non-deterministic way: left or right or left+right),
  • perform computation on left/right received data,
  • emit data in output.

I love ASCII art schema:

> Wye[Task, I, I2, W]

    I(left)       I2(right)
          v       v
          |       |
          ---------
              |
 ---------------------------
|    Wye[Task, I, I2, W]    |
 ---------------------------
              |
              v
              W

\/ is the Scalaz disjunction, also called Either in the Scala world.
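As a quick refresher, here is a minimal sketch of the two sides of the disjunction (assuming scalaz 7.x):

import scalaz.{\/, \/-, -\/}

val success: String \/ Int = \/-(42)      // right side
val failure: String \/ Int = -\/("boom")  // left side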

So Wye[Task, I, W2, W \/ I2] can be seen as:

> Wye[Task, I, W2, W \/ I2]

          I       W2
          v       v
          |       |
          ---------
              |
 ---------------------------
| Wye[Task, I, W2, W \/ I2] |
 ---------------------------
              |
          ---------
          |       |
          v       v
          W       I2

So what does this Exchange.wye API do?

  • It plugs the original Exchange.write: Sink[Task, W] to the W output of the Wye[Task, I, W2, W \/ I2] for sending data to the server.
  • It plugs the Exchange.read: Process[Task, I] receiving data from the server into the left input of the Wye[Task, I, W2, W \/ I2].
  • The right input W2 branch provides a plug for an external source of data in the shape of a Process[Task, W2].
  • The right output I2 can be used to pipe data from the client to an external local process (like streaming out data received from the server).
  • Finally it returns an Exchange[I2, W2].

In summary:

> (ex:Exchange[I, W]).wye( w:Wye[Task, I, W2, W \/ I2] )

        ex.read
          |
          v
          I       W2
          v       v
          |       |
          ---------
              |
 -----------------------------
| w:Wye[Task, I, W2, W \/ I2] |
 -----------------------------
              |
          ---------
          |       |
          v       v
          W       I2
          |
          v
      ex.write

======> Returns Exchange[I2, W2]

As a conclusion, Exchange.wye combines the original Exchange[I, W] with your custom Wye[Task, I, W2, W \/ I2], which represents the business logic of the data exchange between client & server, and finally returns an Exchange[I2, W2] on which you can plug your own data source and retrieve output data.


Implement the client with wye/run

// The source of data to send to server
val data2Send: Process[Task, Bytes] = ...

// The logic of your exchange between client & server
val clientWye: Wye[Task, Bytes, Bytes, Bytes \/ Bytes] = ...
// Scary, there are so many Bytes

val clientReceived: Process[Task, Bytes] = for {
  // client connects to the server & returns Exchange
  ex   <- nio.connect(address)

  // Exchange is customized with clientWye
  // Data are injected in it with run
  output <- ex.wye(clientWye).run(data2Send)
} yield (output)

Implement simple client/server business logic?

Please note, I simply reuse the basic echo example provided in scalaz-stream ;)

def clientEcho(address: InetSocketAddress, data: Bytes): Process[Task, Bytes] = {

  // the custom Wye managing business logic
  def echoLogic: WyeW[Bytes, Bytes, Bytes, Bytes] = {

    def go(collected: Int): WyeW[Bytes, Bytes, Bytes, Bytes] = {
      // Create a Wye that can receive on both sides
      receiveBoth {
        // Receive on left == receive from server
        case ReceiveL(rcvd) =>
          // `emitO` outputs on `I2` branch and then...
          emitO(rcvd) fby
            // if we have received everything sent, halt
            (if (collected + rcvd.size >= data.size) halt
            // else go on collecting
            else go(collected + rcvd.size))

        // Receive on right == receive on `W2` branch == your external data source
        case ReceiveR(data) =>
          // `emitW` outputs on `W` branch == sending to server
          // and loops
          emitW(data) fby go(collected)

        // When server closes
        case HaltL(rsn)     => Halt(rsn)
        // When client closes, we go on collecting echoes
        case HaltR(_)       => go(collected)
      }
    }

    // Init
    go(0)
  }

  // Finally wiring all...
  for {
    ex   <- nio.connect(address)
    rslt <- ex.wye(echoLogic).run(emit(data))
  } yield {
    rslt
  }
}

This might seem hard to grasp for some people because of scalaz-stream notations and wye Left/Right/Both or wye.emitO/emitW. But actually, you'll get used to it quite quickly as soon as you understand wye. Keep in mind that this code uses the low-level scalaz-stream API without anything else and it remains pretty simple and straightforward.


Run the client for its output

// create a client that sends 1024 random bytes
val dataArray = Array.fill[Byte](1024)(1)
scala.util.Random.nextBytes(dataArray)
val clientOutput = clientEcho(addr, Bytes.of(dataArray))

// consumes all received data... (it should contain dataArray)
val result = clientOutput.runLog.run

println("Client received:"+result)

It would give something like:

Client received:Vector(Bytes1: pos=0, length=1024, src: (-12,28,55,-124,3,-54,-53,66,-115,17...)

Now, you know about scalaz-stream clients, what about servers???



Scalaz-stream NIO Server

Let’s start again :D

What is a server?

  • Something listening for client(s) connection
  • When there is a client connected, the server can :
    • Receive data I (for Input) from the client
    • Send data W (for Write) to the client
  • A server can manage multiple clients in parallel

Server seen as Process

Remember that a client was defined above as:

Client === Process[Task, Exchange[I, W]]

In our NIO, non-blocking, streaming world, a server can be considered as a stream of clients right?

So finally, we can model a server as :

Server === Process[Task, Client[I, W]]
       === Process[Task, Process[Task, Exchange[I, W]]]

Whoooohoooo, a server is just a stream of streams!!!!


Writing a server

Scalaz-Stream now provides a helper to create a TCP binary NIO server:

// the address of the server
val address: InetSocketAddress = new InetSocketAddress("xxx.yyy.zzz.ttt", port)

// create a server
val server: Process[Task, Process[Task, Exchange[Bytes, Bytes]]] =
  nio.server(address)

server map { client =>
  // for each client
  client flatMap { ex: Exchange[Bytes, Bytes] =>
    // read data sent by client in ex.read
    ???
    // write data to the client with ex.write
    ???
  }
}

Don’t you find that quite elegant? ;)


Managing client/server interaction business logic

Here we simply re-use the Exchange described above, so you can use exactly the same APIs as for the client. Here is another API that can be useful:

type Writer1[W, I, I2] = Process1[I, W \/ I2]

case class Exchange[I, W](read: Process[Task, I], write: Sink[Task, W]) {
...
  /**
   * Transforms this exchange to another exchange, that for every received `I` will consult supplied Writer1
   * and eventually transforms `I` to `I2` or to `W` that is sent to remote system.
   */
  def readThrough[I2](w: Writer1[W, I, I2])(implicit S: Strategy = Strategy.DefaultStrategy) : Exchange[I2,W]
...
}

// A small schema?
            ex.read
              |
              v
              I
              |
 ---------------------------
|    Writer1[W, I, I2]      |
 ---------------------------
              |
          ---------
          |       |
          v       v
          W       I2
          |
          v
       ex.write

======> Returns Exchange[I2, W]

With this API, you can compute some business logic on the received data from client.

Let’s write the echo server corresponding to the previous client (you can find this sample in scalaz-stream too):

def serverEcho(address: InetSocketAddress): Process[Task, Process[Task, Bytes]] = {

  // This is a Writer1 that echoes everything it receives to the client and emits it locally
  def echoAll: Writer1[Bytes, Bytes, Bytes] =
    receive1[Bytes, Bytes \/ Bytes] { i =>
      // echoes on left, emits on right and then loop (fby = followed by)
      emitSeq( Seq(\/-(i), -\/(i)) ) fby echoAll
    }

  // The server that echoes everything
  val receivedData: Process[Task, Process[Task, Bytes]] =
    nio.server(address) map { client =>
      // for each connected client, pipe its exchange through the echo logic
      client flatMap { ex => ex.readThrough(echoAll).run() }
    }

  receivedData
}

receivedData is a Process[Task, Process[Task, Bytes]], which is not so practical: we would prefer to gather all data received from clients into one single Process[Task, Bytes] to stream it to another module.

Scalaz-Stream has the solution again:

package object merge {
  /**
   * Merges non-deterministically processes that are output of the `source` process.
   */
  def mergeN[A](source: Process[Task, Process[Task, A]])
    (implicit S: Strategy = Strategy.DefaultStrategy): Process[Task, A]
}

Please note the Strategy, which corresponds to the way Tasks will be executed and can be compared to Scala's ExecutionContext.
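As an illustration, here is a minimal sketch (assuming scalaz's Strategy.Executor) of providing mergeN with a custom Strategy backed by a fixed thread pool instead of the default one:

import java.util.concurrent.Executors
import scalaz.concurrent.Strategy

// A 4-thread pool for running the merged client streams
implicit val mergeStrategy: Strategy =
  Strategy.Executor(Executors.newFixedThreadPool(4))

// merge.mergeN(receivedData) now picks up mergeStrategy implicitly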

Fantastic, let’s plug it on our server:

// The server that echoes everything
def serverEcho(address: InetSocketAddress): Process[Task, Bytes] = {

  // This is a Writer1 that echoes everything it receives to the client and emits it locally
  def echoAll: Writer1[Bytes, Bytes, Bytes] =
    receive1[Bytes, Bytes \/ Bytes] { i =>
      // echoes on left, emits on right and then loop (fby = followed by)
      emitSeq( Seq(\/-(i), -\/(i)) ) fby echoAll
    }

  // The stream of per-client streams
  val receivedData: Process[Task, Process[Task, Bytes]] =
    nio.server(address) map { client =>
      client flatMap { ex => ex.readThrough(echoAll).run() }
    }

  // Merges all client streams
  merge.mergeN(receivedData)
}

Finally, we have a server and a client!!!!!

Let’s plug them all together

Run a server

First of all, we need to create a server that can be stopped when required.

Let's do it the scalaz-stream way using:

  • wye.interrupt :
/**
   * Let through the right branch as long as the left branch is `false`,
   * listening asynchronously for the left branch to become `true`.
   * This halts as soon as the right branch halts.
   */
  def interrupt[I]: Wye[Boolean, I, I]
  • async.signal which is a value that can be changed asynchronously based on 2 APIs:
/**
   * Sets the value of this `Signal`. 
   */
  def set(a: A): Task[Unit]

  /**
   * Returns the discrete version of this signal, updated only when `value`
   * is changed ...
   */
  def discrete: Process[Task, A]

Without much imagination, we can use Signal[Boolean].discrete to obtain a Process[Task, Boolean] and wye it with the previous server process using wye.interrupt. Then, to stop the server, you just have to call:

signal.set(true)

Here is the full code:

// local bind address
val addr = localAddress(12345)

// The stop signal initialized to false
val stop = async.signal[Boolean]
stop.set(false).run

// Create the server controlled by the previous signal
val stoppableServer = (stop.discrete wye serverEcho(addr))(wye.interrupt)

// Run server in async without taking care of output data
stoppableServer.runLog.runAsync( _ => ())

// DO OTHER THINGS

// stop server
stop.set(true).run

Run server & client in the same code

// local bind address
val addr = localAddress(12345)

// the stop signal initialized to false
val stop = async.signal[Boolean]
stop.set(false).run

// Create the server controlled by the previous signal
val stoppableServer = (stop.discrete wye serverEcho(addr))(wye.interrupt)

// Run server in async without taking care of output data
stoppableServer.runLog.runAsync( _ => ())

// create a client that sends 1024 random bytes
val dataArray = Array.fill[Byte](1024)(1)
scala.util.Random.nextBytes(dataArray)
val clientOutput = clientEcho(addr, Bytes.of(dataArray))

// Consume all received data in a blocking way...
val result = clientOutput.runLog.run

// stop server
stop.set(true).run

Naturally, you rarely run the client & server in the same code, but it's fun to see how easily you can do it with scalaz-stream, as you just manipulate Processes run on a provided Strategy.


Finally, we can go back to our subject: feeding a DStream using a scalaz-stream NIO client/server

Pipe server output to DStream

clientEcho/serverEcho are simple samples but not very useful.

Now we are going to use a custom client/server I’ve written for this article:

  • NioClient.sendAndCheckSize is a client streaming all emitted data of a Process[Task, Bytes] to the server and checking that the global size has been ack’ed by server.
  • NioServer.ackSize is a server acknowledging all received packets by their size (as a 4-bytes Int)

Now let’s write a client/server dstreamizing data to Spark:

// First create a streaming context
val ssc = new StreamingContext(clusterUrl, "SparkStreamStuff", Seconds(1))

// Local bind address
val addr = localAddress(12345)

// The stop signal initialized to false
val stop = async.signal[Boolean]
stop.set(false).run

// Create the server controlled by the previous signal
val stoppableServer = (stop.discrete wye NioServer.ackSize(addr))(wye.interrupt)

// Create a client that sends a natural integer every 50ms as a string (until reaching 100)
val clientData: Process[Task, Bytes] = naturalsEvery(50 milliseconds).take(100).map(i => Bytes.of(i.toString.getBytes))
val clientOutput = NioClient.sendAndCheckSize(addr, clientData)

// Dstreamize the server into the streaming context
val (consumer, dstream) = dstreamize(stoppableServer, ssc)

// Prepare dstream output
dstream.map( bytes => new String(bytes.toArray) ).print()

// Start the streaming context
ssc.start()

// Run the server just for its effects
consumer.run.runAsync( _ => () )

// Run the client in a blocking way
clientOutput.runLog.run

// Await SSC termination a bit
ssc.awaitTermination(1000)

// stop server
stop.set(true).run

// stop the streaming context
ssc.stop()

When run, it prints :

-------------------------------------------
Time: 1395049304000 ms
-------------------------------------------

-------------------------------------------
Time: 1395049305000 ms
-------------------------------------------
0
1
2
3
4
5
6
7
8
9
...

-------------------------------------------
Time: 1395049306000 ms
-------------------------------------------
20
21
22
23
24
25
26
27
28
29
...

Until 100…



Part2’s conclusion

I spent this second part of my triptych mainly explaining a few concepts of the brand new scalaz-stream NIO API. With it, a client becomes just a stream of exchanges, Process[Task, Exchange[I, W]], and a server becomes a stream of streams of exchanges, Process[Task, Process[Task, Exchange[I, W]]].

As soon as you manipulate Process, you can then use the dstreamize API exposed in Part 1 to pipe streamed data into Spark.

Let’s go to Part 3 now in which we’re going to do some fancy Machine Learning training with these new tools.



GO TO PART1 < —————————————————————————–> GO TO PART3






Synopsis

The code & sample apps can be found on Github

The Zpark-Zstream I article was a PoC trying to use Scalaz-Stream instead of DStream with Spark-Streaming. I had deliberately decided not to deal with fault-tolerance & stream graph persistence to keep it simple, but without them it was quite useless for real applications…

Here is a triptych of articles trying to do something concrete with Scalaz-Stream and Spark.

So, what do I want? I wantttttttt a shrewburyyyyyy and to do the following:

  1. Plug Scalaz-Stream Process[Task, T] on Spark DStream[T] (Part 1)
  2. Build DStream using brand new Scalaz-Stream NIO API (client/server) (Part 2)
  3. Train Spark-ML recommendation-like model using NIO client/server (Part 3)
  4. Stream data from multiple NIO clients to the previous trained ML model mixing all together (Part 3)

[Part 1/3] From Scalaz-Stream Process to Spark DStream



Reminders on Process[Task, T]

Scalaz-stream Process[Task, T] is a stream of T elements that can interleave Tasks (each representing some external, effectful computation). A Process[Task, T] is built as a state machine that you need to run to process all the Task effects and emit the stream of T. It can manage continuous or discrete, finite or infinite streams.

I restricted myself to Task for the purpose of this article but it could be any F[_].
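For instance, here is a minimal sketch of a Process[Task, String] interleaving a Task effect and only doing something when run (scalaz-stream 0.x API):

import scalaz.concurrent.Task
import scalaz.stream.Process

// A pure description: nothing is executed yet
val greetings: Process[Task, String] =
  Process("world", "spark")
    .flatMap(name => Process.eval(Task.delay(s"hello $name")))

// Running it plays the Task effects and gathers the emitted values
val result = greetings.runLog.run  // Vector(hello world, hello spark)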


Reminders on DStream[T]

Spark DStream[T] is a stream of RDD[T] built by discretizing a continuous stream of T. RDD[T] is a resilient distributed dataset which is the ground data-structure behind Spark for distributing in-memory batch/map/reduce operations to a cluster of nodes with fault-tolerance & persistence.

In summary, DStream slices a continuous stream of T by windows of time and gathers all Ts in the same window into one RDD[T]. So it discretizes the continuous stream into a stream of RDD[T]. Once built, those RDD[T]s are distributed to the Spark cluster. Spark allows performing transform/union/map/reduce/… operations on RDD[T]s. Therefore DStream[T] takes advantage of the same operations.

Spark-Streaming also persists all operations & relations between DStreams in a graph. Thus, in case of fault in a remote node while performing operations on DStreams, the whole transformation can be replayed (it also means streamed data are also persisted).

Finally, the resulting DStream obtained after map/reduce operations can be output to a file, a console, a DB etc…

Please note that DStream[T] is built with respect to a StreamingContext which manages its distribution in Spark cluster and all operations performed on it. Moreover, DStream map/reduce operations & output must be scheduled before starting the StreamingContext. It could be somewhat compared to a state machine that you build statically and run later.


From Process[Task, T] to RDD[T]

You may ask: why not simply build an RDD[T] from a Process[Task, T]?

Yes sure we can do it:

// Initialize Spark Context
implicit val sc = new SparkContext(...)

// Build a process
val p: Process[Task, T] = ...

// Run the process using `runLog` to aggregate all results
// and build a RDD using spark context parallelization
val rdd = sc.parallelize(p.runLog.run)

This works, but what if this Process[Task, T] emits a huge quantity of data or is infinite? You'll end up with an OutOfMemoryException.

So yes, you can do it, but it's not so interesting. DStream seems more natural since it can manage a stream of data as long as it can discretize it over time.



From Process[Task, T] to DStream[T]


Pull from Process[Task, T], Push to DStream[T] with LocalInputDStream

To build a DStream[T] from a Process[Task, T], the idea is to:

  • Consume/pull the T emitted by Process[Task, T],
  • Gather emitted T during a window of time & generate a RDD[T] with them,
  • Inject RDD[T] into the DStream[T],
  • Go to next window of time…

Spark-Streaming library provides different helpers to create DStream from different sources of data like files (local/HDFS), from sockets…

The helper that seemed the most appropriate is the NetworkInputDStream:

  • It provides a NetworkReceiver based on an Akka actor to which we can push streamed data.
  • NetworkReceiver gathers streamed data over windows of time and builds a BlockRDD[T] for each window.
  • Each BlockRDD[T] is registered to the global Spark BlockManager (responsible for data persistence).
  • BlockRDD[T] is injected into the DStream[T].

So basically, NetworkInputDStream builds a stream of BlockRDD[T]. It’s important to note that NetworkReceiver is also meant to be sent to remote workers so that data can be gathered on several nodes at the same time.

But in my case, the data source Process[Task, T] runs on the Spark driver node (at least for now), so instead of NetworkInputDStream, a LocalInputDStream would be better. It would provide a LocalReceiver based on an actor to which we can push the data emitted by the process in an async way.

LocalInputDStream doesn't exist in the Spark-Streaming library (or I haven't looked hard enough), so I've implemented it as I needed it. It does exactly the same as NetworkInputDStream without the remoting aspect. The current code is there


Process vs DStream ?

There is a common point between DStream and Process: both are built as state machines that are passive until run.

  • In the case of Process, it is run by playing all the Task effects while gathering emitted values or without taking care of them, in blocking or non-blocking mode etc…

  • In the case of DStream, it is built and registered in the context of a SparkStreamingContext. Then you must also declare some outputs for the DStream like a simple print, an HDFS file output, etc… Finally you start the SparkStreamingContext which manages everything for you until you stop it.

So if we want to adapt a Process[Task, T] to a DStream[T], we must perform 4 steps (on the Spark driver node):

  • build a DStream[T] using LocalInputDStream[T] providing a Receiver in which we’ll be able to push asynchronously T.
  • build a custom scalaz-stream Sink[Task, T, Unit] in charge of consuming all emitted data from Process[Task, T] and pushing them using previous Receiver.
  • pipe the Process[Task, T] to this Sink[Task, T, Unit] & when Process[Task, T] has halted, stop previous DStream[T]: the result of this pipe operation is a Process[Task, Unit] which is a pure effectful process responsible for pushing T into the dstream without emitting anything.
  • return previous DStream[T] and effectful consumer Process[Task, Unit].

dstreamize implementation

def dstreamize[T : ClassTag](
  p: Process[Task, T],
  ssc: StreamingContext,
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): (Process[Task, Unit], ZparkInputDStream[T]) = {

  // Build a custom LocalInputDStream
  val dstream = new ZparkInputDStream[T](ssc, storageLevel)

  // Build a Sink pushing into dstream receiver
  val sink = receiver2Sink[T](dstream.receiver.asInstanceOf[ZparkReceiver[T]])

  // Finally pipe the process to the sink and when finished, closes the dstream
  val consumer: Process[Task, Unit] =
    (p to sink)
    // when finished, it closes the dstream
    .append ( eval(Task.delay{ dstream.stop() }) )
    // when error, it closes the dstream
    .handle { case e: Exception =>
      println("Stopping on error "+e.getMessage)
      e.printStackTrace()
      eval(Task.delay{ dstream.stop() })
    }

  // Return the effectful consumer sink and the DStream
  (consumer, dstream)
}

Please remark that this builds a Process[Task, Unit] and a DStream[T] but nothing has happened yet in terms of data consumption & streaming. Both need to be run now.

Use it…

// First create a streaming context
val ssc = new StreamingContext(clusterUrl, "SparkStreamStuff", Seconds(1))

// Create a data source sample as a process generating a natural every 50ms 
// (take 1000 elements)
val p: Process[Task, Int] = naturalsEvery(50 milliseconds).take(1000)

// Dstreamize the process in the streaming context
val (consumer, dstream) = dstreamize(p, ssc)

// Prepare the dstream operations (count) & output (print)
dstream.count().print()

// Start the streaming context
ssc.start()

// Run the consumer for its effects (consuming p and pushing into dstream)
// Note this is blocking but it could be runAsync too
consumer.run.run

// await termination of stream with a timeout
ssc.awaitTermination(1000)

// stops the streaming context
ssc.stop()

Please note that you have to:

  • schedule your dstream operations/output before starting the streaming context.
  • start the streaming context before running the consumer.

Run it…

14/03/11 11:32:09 WARN util.Utils: Your hostname, localhost.paris.zenexity.com resolves to a loopback address: 127.0.0.1; using 10.0.24.228 instead (on interface en0)
14/03/11 11:32:09 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
-------------------------------------------
Time: 1394533933000 ms
-------------------------------------------
0

-------------------------------------------
Time: 1394533934000 ms
-------------------------------------------
14

-------------------------------------------
Time: 1394533935000 ms
-------------------------------------------
20

-------------------------------------------
Time: 1394533936000 ms
-------------------------------------------
20

...

Ok cool, we can see a warmup phase at the beginning and then windows of 1 sec counting 20 elements, which is great since one element every 50ms gives 20 elements per second.



Part 1’s conclusion

Now we can pipe a Process[Task, T] into a DStream[T].

Please note that as we run the Process[Task, T] on the Spark driver node, if this node fails, there is no real way to restore the lost data. Yet, LocalInputDStream relies on DStreamGraph & BlockRDDs, which persist all DStream relations & all received blocks. Moreover, DStream has exactly the same problem with respect to the driver node for now.

That was fun but what can we do with that?

In part2, I propose to have more fun and stream data to DStream using the brand new Scalaz-Stream NIO API to create cool NIO client/server streams…



——————————————————————————————————-> GO TO PART2





The code & sample apps can be found on Github

Today I'm going to write about a Proof of Concept I've been working on these last weeks: I wanted to use scalaz-stream as a driver of Spark distributed data processing. This is simply an idea and I don't even know whether it is viable or stupid. But the idea is interesting!

Introduction

Two of my preferred topics these last months are:

  • Realtime streaming
  • Realtime clustered data processing (in-memory & fault-tolerant)

2 tools have kept running through my head these last months:

  • Scalaz-Stream for realtime/continuous streaming using pure functional concepts: I find it very interesting conceptually speaking & very powerful, especially the deterministic & non-deterministic stream multiplexers provided out-of-the-box (Tee & Wye).

  • Spark for fast/fault-tolerant in-memory, resilient & clustered data processing.

I won’t speak much about Scalaz-Stream because I wrote a few articles about it.


Let’s focus on Spark.

Spark provides tooling for cluster processing of huge datasets in the same batch mode as Hadoop, the very well known map/reduce infrastructure. But unlike Hadoop, which relies exclusively on the HDFS cluster file system when distributing data through the cluster, Spark tries to cache data in memory as much as possible so that access latency is reduced. Hadoop can scale a lot but is known to be slow in the context of a single node.

Spark is aimed at scaling as much as Hadoop while running faster on each node thanks to in-memory caching. Fault-tolerance & data resilience are managed by Spark too, using persistence & redundancy based on any storage you can plug into Spark, like HDFS or plain files. So Spark is meant to be a super fast in-memory, fault-tolerant batch processing engine.


RDD Resilient Distributed Dataset

The basic concept of Spark is Resilient Distributed Dataset aka RDD which is a read-only, immutable data structure representing a collection of objects or dataset that can be distributed across a set of nodes in a cluster to perform map/reduce style algorithms.

The dataset represented by this RDD is partitioned i.e. cut into slices called partitions that can be distributed across the cluster of nodes.
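For instance, here is a minimal sketch (assuming a SparkContext named sc is already in scope) of creating an explicitly partitioned dataset:

// Cut the dataset into 4 partitions that Spark can distribute across nodes
val rdd = sc.parallelize(1 to 1000, numSlices = 4)
println(rdd.partitions.length) // 4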

Resilient means these data can be rebuilt in case of fault on a node or data loss. To perform this, the dataset is replicated/persisted across nodes in memory or in distributed file system such as HDFS.

So the idea of RDD is to provide a seamless structure to manage clustered datasets with a very simple, “monadic”-style API:

val sc = new SparkContext(
  "local[4]",
  "Simple App",
  "YOUR_SPARK_HOME",
  List("target/scala-2.10/simple-project_2.10-1.0.jar")
)

val logData = sc.textFile(logFile, 2).cache().filter(line => line.contains("a")).map( _ + "foo" ).count()

Depending on your SparkContext configuration, Spark takes charge of distributing your data behind the curtain to the cluster nodes to perform the required processing in a fully distributed way.

One thing to keep in mind is that Spark distributes data to remote nodes but it also distributes the code/closures remotely. So it means your code has to be serializable which is not the case of scalaz-stream in its current implementation.


Just a word on Spark code

As usual, before using Spark in any big project, I’ve been diving in its code to know whether I can trust this project. I must say I know Spark’s code better than its API ;)

I find Spark Scala implementation quite clean with explicit choices of design made clearly in the purpose of performance. The need to provide a compatible Java/Python API and to distribute code across clustered nodes involves a few restrictions in terms of implementation choices. Anyway, I won’t criticize much because I wouldn’t have written it better and those people clearly know what they do!


Spark Streaming

So Spark is very good to perform fast clustered batch data processing. Yet, what if your dataset is built progressively, continuously, in realtime?

On top of the core module, Spark provides an extension called Spark Streaming aiming at manipulating live streams of data using the power of Spark.

Spark Streaming can ingest different continuous data feeds like Kafka, Flume, Twitter, ZeroMQ or TCP socket and perform high-level operations on it such as map/reduce/groupby/window/…
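As an illustration of such high-level operations, here is the classic word-count sketch over a TCP socket feed (standard Spark Streaming API; the host/port are placeholders):

import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext("local[2]", "WordCount", Seconds(1))

ssc.socketTextStream("localhost", 9999)  // continuous TCP feed
   .flatMap(_.split(" "))                // map-style operations
   .map(word => (word, 1))
   .reduceByKey(_ + _)                   // reduce-style operation
   .print()                              // output each batch

ssc.start()
ssc.awaitTermination()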


DStream

The core data structure behind Spark Streams is DStream for Discretized Stream (and not distributed).

Discretized means it gets a continuous stream of data and makes it discrete by slicing it across time and wrapping those sliced data into the famous RDD described above.

A DStream is just a temporal data partitioner that can distribute data slices across the cluster of nodes to perform some data processing using Spark capabilities.

Here is the illustration in official Spark Stream documentation:

[image: streaming-dstream]

DStream also tries to leverage Spark automated persistence/caching/fault-tolerance to the domain of live streaming.

DStream is cool but it's completely based on temporal aspects. Imagine you want to slice the stream depending on other criteria: with DStream, it would be quite hard because the whole API is based on time. Moreover, using DStream, you can discretize a dataflow but you can't go the other way and make it continuous again (to my knowledge). That would be cool, wouldn't it?

If you want to know more about DStream discretization mechanism, have a look at the official doc.


As usual, I’m trying to investigate the edge-cases of concepts I like. In general, this is where I can test the core design of the project and determine whether it’s worth investigating in my every-day life.


Driving Spark Streams with Scalaz-Stream

I’ve been thinking about scalaz-stream concepts quite a lot and scalaz-stream is very good at manipulating continuous streams of data. Moreover, it can very easily partition a continuous stream regrouping data into chunks based on any criteria you can imagine.

Scalaz-stream represents a data processing algorithm as a static state machine that you can run when you want. This is the same idea behind the Spark map/reduce API: you build your chain of map/filter/window and finally reduce it. Reducing a Spark data processing pipeline is like running a scalaz-stream machine.

So my idea was the following:

  • build a continuous stream of data based on scalaz-stream Process[F, O]
  • discretize the stream Process[F, O] => Process[F, RDD[O]]
  • implement count/reduce/reduceBy/groupBy for Process[F, RDD[O]]
  • provide a continuize method to do Process[F, RDD[O]] => Process[F, O]

So I’ve been hacking between Scalaz-stream Process[F, O] & Spark RDD[O] and here is the resulting API that I’ve called ZPark-ZStream (ZzzzzzPark-Zzzzztream).

Let’s play a bit with my little alpha API.


Discretization by simple slicing

Let’s start with a very simple example.

Take a simple finite process containing integers:

val p: Process[Task, Long] = Process(1L, 1L, 2L, 2L, 3L, 3L, 4L, 4L, 5L, 5L, 6L, 6L)

Now I want to slice this stream of integers into slices of 4 elements.

First we have to create the classic Spark Streaming context and make it implicit (needed by my API).

Please note that I could plug an existing StreamingContext into my code without any problem.

val clusterUrl = "local[4]"
implicit val ssc = new StreamingContext(clusterUrl, "SparkSerial", Seconds(1))

Then let’s parallelize the previous process :

val prdd: Process[Task, RDD[Long]] = p.parallelize(4)
// type is just there to show what scalac will infer
// Just to remind that Task is the Future equivalent in Scalaz

Ok folks, now, we have a discretized stream of Long that can be distributed across a Spark cluster.

DStream provides a count API which counts the elements of each RDD in the stream.

Let’s do the same with my API:

val pcount: Process[Task, RDD[Int]] = prdd.countRDD()

What happens here? The count operation on each RDD in the stream is distributed across the cluster in a map/reduce style and the results are gathered.

Ok that’s cool but you still have a discretized stream Process[Task, RDD[Int]] and that’s not practical to use to see what’s inside it. So now we are going to re-continuize it and make it a Process[Task, Int] again.

val pfinal: Process[Task, Int] = pcount.continuize()

Easy isn’t it?

All together :

val p =
  Process(1L, 1L, 2L, 2L, 3L, 3L, 4L, 4L)
  .parallelize(4)
  .countRDD()
  .continuize()

Let's print the result to the console:

def stdOutLines[I]: Sink[Task, I] =
  Process.constant{ (s: I) => Task.delay { println(s" ----> [${System.nanoTime}] *** $s") }}

(p through stdOutLines).run.run
// 1 run for the process & 1 run for the Task

 ----> [1392418478569989000] *** 4
 ----> [1392418478593226000] *** 4

Oh yes that works: in each slice of 4 elements, we actually have 4 elements! Reassuring ;)

Let’s do the same with countByValue:

val p =
  Process(1L, 1L, 2L, 2L, 3L, 3L, 4L, 4L)
  .parallelize(4)
  .countRDDByValue()
  .continuize()

(p through stdOutLines).run.run
// 1 run for the process & 1 run for the Task

 ----> [1392418552751011000] *** (1,2)
 ----> [1392418552751176000] *** (2,2)
 ----> [1392418552770527000] *** (4,2)
 ----> [1392418552770640000] *** (3,2)

You can see that 4 comes before 3. This is due to the fact that the 2nd slice of 4 elements (3,3,4,4) is converted into an RDD which is then partitioned and distributed across the cluster to perform the map/reduce count operation. So the order of the results may differ at the end.

An example of map/reduce ?

val p =
  Process(1L, 1L, 2L, 2L, 3L, 3L, 4L, 4L)
  .parallelize(4)
  .mapRDD(_ + 1L)
  .reduceRDD(_ + _)
  .continuize()

(p through stdOutLines).run.run
 ----> [1392418619885745000] *** 10 (2+2+3+3)
 ----> [1392418619905817000] *** 18 (4+4+5+5)

Please note that:

p mapRDD f === p.map{ rdd => rdd map f }
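As an illustration (this is a plausible sketch, not the actual ZPark-ZStream code), such a combinator could be derived directly from the identity above as an enrichment of Process[Task, RDD[A]]:

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import scalaz.concurrent.Task
import scalaz.stream.Process

object ZparkSyntaxSketch {
  // Hypothetical enrichment exposing mapRDD on any Process of RDDs
  implicit class RDDProcessOps[A](val p: Process[Task, RDD[A]]) extends AnyVal {
    def mapRDD[B: ClassTag](f: A => B): Process[Task, RDD[B]] =
      p.map(rdd => rdd.map(f))
  }
}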

Discretization by time slicing

Now we could try to slice according to time, in the same spirit as DStream.

First of all, let’s define a continuous stream of positive integers:

def naturals: Process[Task, Int] = {
  def go(i: Int): Process[Task, Int] =
    Process.await(Task.delay(i)){ i => Process.emit(i) ++ go(i+1) }

  go(0)
}

Now, I want integers to be emitted at a given tick for example:

def naturalsEvery(duration: Duration): Process[Task, Int] =
  (naturals zipWith Process.awakeEvery(duration)){ (i, b) => i }

Then, let’s discretize the continuous stream with ZPark-Ztream API:

val p: Process[Task, RDD[Int]] =
  naturalsEvery(10 milliseconds).discretize(500 milliseconds)

The stream is sliced into slices of 500ms and all elements emitted during these 500ms are gathered in a Spark RDD.

On this stream of RDDs, we can apply countRDD as before and finally re-continuize it. All together we obtain:

val p =
  naturalsEvery(10 milliseconds)
  .take(5000)  // takes only 5000 because an infinite stream is hard to log in an article
  .discretize(500 milliseconds)
  .countRDD()
  .continuize()

(p through stdOutLines).run.run

 ----> [1392395213389954000] *** 47
 ----> [1392395213705505000] *** 28
 ----> [1392395214191637000] *** 47
 ----> [1392395214688724000] *** 48
 ----> [1392395215189453000] *** 45
 ----> [1392395215697655000] *** 48
 ----> [1392395240677357000] *** 50
 ----> [1392395241175632000] *** 49
 ----> [1392395241674446000] *** 50
 ----> [1392395242175416000] *** 50
 ----> [1392395242675183000] *** 50
 ----> [1392395243177056000] *** 50
 ----> [1392395243676848000] *** 49
 ----> [1392395244175938000] *** 49
 ----> [1392395244676315000] *** 50
 ----> [1392395245175042000] *** 50
 ----> [1392395245677394000] *** 50
 ...

Approximately, we have 50 elements per slice, which is what we expected.

Please note that there is a short warmup period where the values are less homogeneous.


Discretization by time slicing keeping track of time

DStream keeps track of all created RDD slices of data (following Spark’s philosophy of caching as much as possible) and allows windowing operations to redistribute RDDs.

With the ZPark API, you can write the same as follows:

val p =
  naturalsEvery(10 milliseconds)
  .take(500)
  .discretizeKeepTime(500 milliseconds)
  .windowRDD(1000 milliseconds)
  .map { case (time, rdd) =>
    (time, rdd.count())
  }

(p through stdOutLines).run.run

 ----> [1392397573066484000] *** (1392397571981061000,68)
 ----> [1392397574069315000] *** (1392397572981063000,85)
 ----> [1392397575058895000] *** (1392397573981072000,87)
 ----> [1392397576059640000] *** (1392397574981078000,89)
 ----> [1392397577069518000] *** (1392397575981086000,89)
 ----> [1392397577538941000] *** (1392397576981095000,82)

We can see here that the intervals don’t contain the 100 elements we could expect. This is still a mystery to me and I need to investigate a bit more to find out where this difference comes from. I have a few ideas but I need to validate them.

Anyway, the counts add up to 500 elements (68+85+87+89+89+82), meaning we haven’t lost anything.


Mixing scalaz-stream IO & Spark streaming

Playing with naturals is fun, but let’s work with a real source of data, like a file.

It could be anything pluggable into scalaz-stream, such as Kafka or Flume, just like the sources DStream provides (see the sketch after this example)…

val p =
  io.linesR("testdata/fahrenheit.txt")                    // read the file line by line
    .filter(s => !s.trim.isEmpty && !s.startsWith("//"))  // skip blank lines & comment lines
    .map(line => line.toDouble)                           // parse each temperature
    .discretize(100 milliseconds)                         // slice into one RDD per 100ms
    .mapRDD { x => (x, 1L) }                              // pair each temperature with 1
    .groupByKey()                                         // group occurrences by temperature
    .mapRDD { case (k, v) => (k, v.size) }                // count occurrences per temperature
    .continuize()

(p through stdOutLines).run.run

 ----> [1392398529009755000] *** (18.0,23)
 ----> [1392398529010064000] *** (19.0,22)
 ----> [1392398529010301000] *** (78.0,22)
 ----> [1392398529010501000] *** (55.3,22)
 ----> [1392398529010700000] *** (66.0,22)
 ----> [1392398529010892000] *** (64.0,22)
...
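
And as mentioned above, the file source is just one choice: any Process[Task, A] can feed the same pipeline. Here is a hedged sketch with a hypothetical pollSensor() call standing in for whatever external system (a Kafka consumer, a socket, a sensor…) you would really read from:

// Hypothetical external source wrapped in a Task (pollSensor is a placeholder, not a real API)
def pollSensor(): Double = ???   // e.g. read one value from Kafka, a socket, a sensor...

val sensed =
  Process.repeatEval(Task.delay(pollSensor()))   // Process[Task, Double]
    .discretize(100 milliseconds)
    .countRDD()
    .continuize()

// (sensed through stdOutLines).run.run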

Infusing tee with RDD Processes

Is it possible to combine RDD Processes using scalaz-stream?

val p0 = naturalsEvery(100 milliseconds).take(50).discretize(250 milliseconds)
val p1 = naturalsEvery(100 milliseconds).take(50).discretize(250 milliseconds)
val p =
  (p0 zipWith p1){ (a, b) =>
    // union the two RDD slices zipped together by the tee
    new org.apache.spark.rdd.UnionRDD(ssc.sparkContext, Seq(a, b))
  }.countRDDByValue()
   .continuize()

(p through stdOutLines).run.run

 ----> [1392412464151650000] *** (0,2)
 ----> [1392412464151819000] *** (1,2)
 ----> [1392412464230343000] *** (2,2)
 ----> [1392412464230528000] *** (3,1)
 ----> [1392412464477775000] *** (4,2)
 ----> [1392412464477921000] *** (5,2)
 ----> [1392412464478034000] *** (6,2)
 ----> [1392412464478143000] *** (3,1)
 ----> [1392412464726860000] *** (8,2)
 ----> [1392412464727039000] *** (7,2)
 ----> [1392412464975370000] *** (9,2)
 ----> [1392412464975511000] *** (10,2)
 ----> [1392412464975620000] *** (11,2)
 ----> [1392412465224087000] *** (12,2)
 ----> [1392412465224227000] *** (13,2)
 etc...

Please note that, even though I drive the Spark RDD stream with scalaz-stream, the scalaz-stream part always remains on the driver node and is never sent to remote nodes, unlike Spark’s map/reduce closures. So scalaz-stream is only used as a stream driver here. Moreover, a scalaz Process isn’t serializable in its current implementation, so shipping it to remote nodes wouldn’t be possible as-is anyway.



What about persistence & fault tolerance?

After discretizing a process, you can persist each RDD:

p.discretize(250 milliseconds).map { _.persist() }
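
And since these are plain Spark RDDs, you can also choose a storage level per slice with the standard Spark API (a small sketch):

import org.apache.spark.storage.StorageLevel

// Persist each 250ms slice in memory, spilling to disk if needed
// (RDD.persist returns the RDD itself, so the stream type is unchanged)
p.discretize(250 milliseconds).map { rdd => rdd.persist(StorageLevel.MEMORY_AND_DISK) }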

Ok, but DStream does much more: it tries to keep every generated RDD in memory and potentially persists it across the cluster. This makes things stateful & mutable, which is not the approach of a pure functional API like scalaz-stream. So I need to think a bit more about this persistence topic, which is huge.

Anyway, I believe I’m currently investigating a different way of manipulating distributed streams than the DStream approach.



Conclusion

Spark is quite amazing and easy to use given the complexity of the subject.

I was also surprised to be able to use it with scalaz-stream so easily.

I hope you liked the idea and I encourage you to think about it; if you find it cool, please say so! And if you find it stupid, please say so too: this is still a pure experiment ;)

Have a look at the code on GitHub.

Have distributed & resilient yet continuous fun!