You have a device monitor actor. It must poll the device every time an interval expires. When data starts flowing, it flows a lot. Sometimes, however, the device will not have readable data for long periods of time (several seconds), and continuing to read the device on short intervals will take CPU core cycles away from other parts of the actor system that are busy processing the data that has already been taken in. How can we make the device monitor actor more in tune with the way the device operates?
Have the device monitor actor create, as part of its construction, an instance of CappedBackOffScheduler. This scheduler is used to send the “try read” messages to the monitor actor on intervals. The interval starts out at a minimum of 500 milliseconds. After any probe of the device that results in a successful read, the following interval will again be 500 milliseconds. Otherwise, each successive failed read doubles the next interval, but only until a time cap is reached. Here is the CappedBackOffScheduler implementation:
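import java.util.concurrent.TimeUnit

import akka.actor.{ActorRef, ActorSystem}
import scala.concurrent.duration.Duration

class CappedBackOffScheduler(
    minimumInterval: Int,
    maximumInterval: Int,
    system: ActorSystem,
    receiver: ActorRef,
    message: Any) {

  // Assuming Akka 2.1 or later, scheduleOnce needs an implicit ExecutionContext.
  import system.dispatcher

  // Current delay, in milliseconds, before the next message is sent.
  var interval = minimumInterval

  // Double the interval, up to the cap, then schedule the next message.
  def backOff = {
    interval = interval * 2
    if (interval > maximumInterval) interval = maximumInterval
    schedule
  }

  // Return to the minimum interval, then schedule the next message.
  def reset = {
    interval = minimumInterval
    schedule
  }

  private def schedule = {
    val duration = Duration.create(interval, TimeUnit.MILLISECONDS)
    system.scheduler.scheduleOnce(duration, receiver, message)
  }
}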
Each time the scheduler is told to backOff, it doubles the interval until the interval reaches maximumInterval (15 seconds in the usage that follows). When the scheduler is told to reset following each successful read, the interval is (re)set to minimumInterval (half a second here). Either way, the interval is used to schedule a new “try read” message. The message will be sent to the device monitor once the interval expires.
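To make the interval arithmetic concrete, here is a minimal sketch that replays a sequence of read outcomes and reports the interval chosen after each one. It uses no Akka at all; the `BackOffTrace` object and its `intervals` function are hypothetical names introduced only for this illustration, and `math.min` stands in for the scheduler's doubling followed by the cap check.

```scala
object BackOffTrace {
  // Interval (in milliseconds) chosen after each read outcome.
  // true = successful read (reset), false = failed read (backOff).
  // Hypothetical helper: mirrors the doubling-with-cap rule, nothing more.
  def intervals(outcomes: Seq[Boolean], minimum: Int, maximum: Int): Seq[Int] =
    outcomes.scanLeft(minimum) { (current, hit) =>
      if (hit) minimum else math.min(current * 2, maximum)
    }.tail
}

// One hit, three misses, then a hit:
// BackOffTrace.intervals(Seq(true, false, false, false, true), 500, 15000)
// → 500, 1000, 2000, 4000, 500
```

With a minimum of 500 and a cap of 15000, six consecutive misses give 1000, 2000, 4000, 8000, 15000, 15000: the fifth doubling would be 16000, so the cap takes over from there.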
Here’s how to use the CappedBackOffScheduler:
import akka.actor.Actor

// Message type as implied by its use below; Device is defined elsewhere.
case class TryRead()

class DeviceMonitor(device: Device) extends Actor {
  val scheduler =
    new CappedBackOffScheduler(
      500, 15000, context.system, self, TryRead())

  def tryRead = {
    // Blocks for up to 500 milliseconds waiting for data.
    val data = device.readData(500)
    if (data.isDefined) {
      println(s"HIT: ${data.get}")
      scheduler.reset
    } else {
      println("MISS")
      scheduler.backOff
    }
  }

  def receive = {
    case request: TryRead =>
      tryRead
  }
}
Note that the device readData() attempt itself has a timeout; in this case we allow half a second for readData() to succeed. This timeout must be chosen carefully because the thread currently processing the TryRead message will block until the readData() attempt returns. And here’s some sample output:
HIT: ...
HIT: ...
HIT: ...
MISS
MISS
HIT: ...
HIT: ...
HIT: ...
MISS
HIT: ...
HIT: ...
HIT: ...
HIT: ...
MISS
MISS
MISS
MISS
HIT: ...
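Because readData() blocks the thread handling TryRead, it can help to estimate how much of each poll cycle that thread spends blocked. The following is a rough sketch under one stated assumption: a failed read consumes the entire read timeout before the next interval begins. `BlockingBudget` and `blockedFraction` are hypothetical names used only for this illustration.

```scala
object BlockingBudget {
  // Fraction of one poll cycle spent blocked in readData(),
  // assuming a miss uses the whole read timeout (an assumption, not measured).
  def blockedFraction(readTimeoutMs: Int, intervalMs: Int): Double =
    readTimeoutMs.toDouble / (readTimeoutMs + intervalMs)
}

// First miss (interval doubled to 1000 ms): 500 / 1500, about one third of the cycle.
// At the 15000 ms cap: 500 / 15500, a little over 3 percent of the cycle.
```

Under this assumption, the back-off reduces the share of time a thread is held by an idle device from roughly a third to a few percent, which is the effect the capped interval is meant to achieve. A longer read timeout raises both figures.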