Alexander Chepurnoy

The Web of Mind

Akka-based Data Extraction System Design

| Comments

Introduction

If you have an experience in data extraction systems, you know how hard it could be to develop. You need to implement workers then combine them in error-prone, scalable and flexible system. Sounds like a lot of pain, isn’t it? But with modern painkillers the job could be done much simpler. I mean Akka.

I already used Akka for some real-world systems, including realtime forex data mashup forexnotions.com, domains value estimation system, data gathering systems for SEO parameters, real estate etc. And I want to publish common approach I use in the simplest form.

The Example

Consider real estate data extraction system, where some sources have XML/RSS output, some only HTML. Workers already written, one for each site, so it’s the time to combine them into higher-level logic. Consider, for example, we have 3 sites to get data from, and we want to recrawl them every 30 seconds(too crazy, but it’s just an example).

A worker is derived from base trait BasePropertyExtractor and returns list of properties or exception. Let’s define sample workers as well as sample property bean

case class Property(name:String)
case class ExtractionResult(value: Either[Throwable, List[Property]])

trait BasePropertyExtractor {
    def extractData:ExtractionResult
    def label:String
}

class SiteAExtractor extends BasePropertyExtractor{
    override def extractData = ExtractionResult(Right(List[Property](Property("Nice beachside boongalow"))))
    override def label = "SiteAExtractor"
}

class SiteBExtractor extends BasePropertyExtractor{
    override def extractData = ExtractionResult(Right(List[Property](Property("Awesome apartments"))))
    override def label = "SiteBExtractor"
}

class SiteCExtractor extends BasePropertyExtractor{
    override def extractData = ExtractionResult(Left(new Exception("XML parsing failed")))
    override def label = "SiteCExtractor"
}

Define control signals to be sent to system’s components

case class ExtractionCommand(extractor:BasePropertyExtractor)
object StartParsing

StartExtraction is signal to start whole extraction process, while ExtractionCommand is signal to start concrete extractor

Data extraction actor:

class ExtractingActor extends Actor with akka.actor.ActorLogging {
    override def receive = {
        case ExtractionCommand(extractor:BasePropertyExtractor) =>
        println("Going to extract data by "+extractor)
        sender ! extractor.extractData
    }
}

Database writer actor(in our example it doesn’t write to a database actually, but just prints result to console:

class DbWriterActor extends Actor with akka.actor.ActorLogging {
    override def receive = {
        case p: Property  => println(p)
    }
}

And we’re going to define main actor incapsulating control logic and implementing ScatterGather design pattern(yeah, meet design patterns in actors field):

class ScatterGather extends Actor with akka.actor.ActorLogging {
    context.setReceiveTimeout(29 seconds)

    private val actorsCommands = Map(
        context.actorOf(Props[ExtractingActor]) -> ExtractionCommand(new SiteAExtractor),
        context.actorOf(Props[ExtractingActor]) -> ExtractionCommand(new SiteBExtractor),
        context.actorOf(Props[ExtractingActor]) -> ExtractionCommand(new SiteCExtractor)
    )

    private val dbWriterActor = context.actorOf(Props[DbWriterActor])

    override def receive = {
        case StartExtraction =>
            actorsCommands foreach {
                case (actor, command) => actor ! command
            }

        case result: ExtractionResult => result.value match{
            case Left(t:Throwable) => log.warning("Exception found instead of result: " + t)
            case Right(l:List[Property]) => l foreach {writer ! _}
        }

        case ReceiveTimeout =>
            context.stop(self)
            actorsCommands foreach {_ => context.stop(_)}
            context.stop(dbWriterActor)
    }
}

And launcher

object RealEstateExtractionLauncher extends App {
    import ExecutionContext.Implicits.global
    val system = ActorSystem("RealEstateExample")

    system.scheduler.schedule(0 seconds, 30 seconds){
        val listeningActor = system.actorOf(Props[ScatterGather])
        listeningActor ! StartParsing
    }
}

Complete Code & Output

Complete code is on Github : https://github.com/kushti/blog-examples/blob/master/scala/akka/DataExtractionSystem.scala

Running it you’ll get something like

Going to extract data by SiteCExtractor
Going to extract data by SiteBExtractor
Going to extract data by SiteAExtractor
Property(Awesome apartments)
Property(Nice beachside boongalow)
[WARN] [02/27/2013 13:22:40.331] [RealEstateExample-akka.actor.default-dispatcher-2] [akka://RealEstateExample/user/$a] Exception found instead of result: java.lang.Exception: XML parsing failed
Going to extract data by SiteAExtractor
Going to extract data by SiteCExtractor
Going to extract data by SiteBExtractor
Property(Nice beachside boongalow)
Property(Awesome apartments)
[WARN] [02/27/2013 13:23:10.297] [RealEstateExample-akka.actor.default-dispatcher-7] [akka://RealEstateExample/user/$b] Exception found instead of result: java.lang.Exception: XML parsing failed

Conclusion

In less than 100 lines of code we got fully scalable recurrent data extraction example with simple logging and error handling. And as Akka is a close friend of Play 2.x framework, to say more preciously, Play includes Akka, it’s easy to build Web Application on top of our system.

You can now start to play with remote actors to build distibuted system. Or implement real-world application starting with the example design provided. Or visit “Hire me” section.

Comments