Actors

Akka Wamp provides you with:

  • object-oriented representations of WAMP Messages,
  • an Akka I/O extension driver named akka.wamp.Wamp that takes care of low-level network communication with the router,
  • abstract base classes your client actor can extend from.

All operations are implemented with message passing instead of direct method calls. You can write your own client actor, in either Scala or Java, to connect to a router, open a session, publish or subscribe to topics, consume events, process invocations and register or call procedures.

Note

This shall be considered a lower level API when compared to other Akka Wamp Client APIs such as those providing Futures and Streams abstractions.

Client

Scala
import java.net.URI

import akka.actor._
import akka.io._
import akka.wamp._
import akka.wamp.client._
import akka.wamp.messages._

class ActorsScalaClient extends ClientActor with ActorLogging {

  // def receive: Receive = ...
}
Java
import java.net.URI;

import akka.actor.*;
import akka.wamp.*;
import akka.wamp.client.*;
import akka.wamp.messages.*;
import akka.wamp.messages.Error;
import akka.wamp.serialization.*;
import scala.PartialFunction;
import scala.runtime.BoxedUnit;

import static akka.japi.pf.ReceiveBuilder.match;

public class ActorsJavaClient extends AbstractClientLoggingActor {
  public ActorsJavaClient() {
    receive(
    );
  }
}

You client actor starts as “disconnected from router”.

Connect to a router

While in initial state, your client actor sends a Connect command to the Akka Wamp extension manager so to attempt a new connection to the router addressed by the given URI.

Scala
private var router: ActorRef = _


override def preStart(): Unit = {
  implicit val system = context.system
  val manager = IO(Wamp)
  manager ! Connect(new URI("ws://router.net:8080/wamp"), "json")
}

override def receive: Receive = {
  case sig @ CommandFailed(cmd, ex) =>
    // could reattempt connection ...
    
  case sig @ Connected(handler, uri, format) =>
    this.router = handler
    context become connected
}
Java
private ActorRef router;
private PartialFunction<Object, BoxedUnit> connected;

@Override public void preStart() throws Exception {
  ActorRef manager = Wamp.get(getContext().system()).manager();
  manager.tell(new Connect(new URI("ws://router.net:8080/wamp"), "json"), self());
}

public ActorsJavaClient() {
  receive(
    match(CommandFailed.class, sig -> {
      // could reattempt connection ...
    }).
    match(Connected.class, sig -> {
      router = sig.handler();
      context().become(connected);
    }).
    build()
  );
}

Your client receives either the CommandFailed signal from the manager if the connection attempt fails or the Connected signal if it succeeds. In the latter case the manager spawns a new worker actor representing the handler for that specific connection and delivers its reference to your client via the signal.

Upon receiving the Connected signal, your client actor stores the connection handler reference, holding it as the router representative, and switches its state to connected.

In connected state

Your client actor is “connected to a router”.

Handle disconnection

Your connected client actor might receive the Disconnected signal from the manager actor if the connection disconnects. For example, clients on mobile devices could experience accidental disconnections because of wireless networks unavailability. Or, the router could deliberately shutdown.

When disconnection happens, you client actor might keep attempting to establish a new connection, then open a new session and try to restore its subscriptions/registrations.

Open a session

Your client actor sends an Hello message to the router to ask for a new session to be attached to the given realm (for example the default realm).

Scala

private var gen: SessionScopedIdGenerator = _ private var sessionId: Long = _ override def receive: Receive = { case sig @ Connected(handler, uri, format) => this.router = handler context become connected router ! Hello("default") } private def connected: Receive = { case Disconnected => this.sessionId = 0 this.router = null // reattempt connection ... // reopen session ... // restore subscriptions/registrations ... case Abort(details, reason) => this.sessionId = 0 // open new sesson ... case Welcome(sessionId, details) => this.sessionId = sessionId this.gen = new SessionScopedIdGenerator context become open // submit subscriptions/registrations ... }
Java
private PartialFunction<Object, BoxedUnit> open;
private Long sessionId;
private SessionScopedIdGenerator gen;

public ActorsJavaClient() {
  receive(
    match(Connected.class, sig -> {
      router = sig.handler();
      context().become(connected);
      router.tell(new Hello("default", Hello.defaultDetails(), validator()), self());
    }).
    build()
  );

  connected =
    match(Disconnected.class, sig -> {
      this.sessionId = 0L;
      this.router = null;
      // reattempt connection ...
      // open new sesson ...
      // restore subscriptions/registrations ...
    }).
    match(Abort.class, msg -> {
      this.sessionId = 0L;
      // open new sesson ...
    }).
    match(Welcome.class, msg -> {
      this.sessionId = msg.sessionId();
      this.gen = new SessionScopedIdGenerator();
      context().become(open);
      // submit subscriptions/registrations
    }).
    build();
  
}

Your client actor receives either the Abort message or the Welcome message. In the former case, the router rejects the session opening, while in the latter case the router creates a new session and delivers its identifier to your client.

Upon receiving the Welcome message, your client can switch to the open state.

In open state

You client actor now “holds an open session”. In this state it can finally publish or subscribe to topics, process incoming events, register procedures, process incoming invocations, call procedures, etc.

Handle session close

In addition to unexpected disconnections, your client actor might receive the Goodbye message from router if session gets closed for some reason.

If that happens, you client actor might keep attempting to open a new session so to restore its subscriptions/registrations …

Publish to a topic

Scala
private def connected: Receive = {
  case Welcome(sessionId, details) =>
    this.sessionId = sessionId
    this.gen = new SessionScopedIdGenerator
    context become open
    // submit subscriptions/registrations ...
    router ! Publish(gen.nextId(), Publish.defaultOptions, "myapp.topic")
    router ! Publish(gen.nextId(), Publish.defaultOptions.withAcknowledge(true), "myapp.topic")
}
Java
connected =
  match(Welcome.class, msg -> {
    this.sessionId = msg.sessionId();
    this.gen = new SessionScopedIdGenerator();
    context().become(open);
    // submit subscriptions/registrations
    router.tell(new Publish(gen.nextId(), Publish.defaultOptions(), "mytopic", Payload.apply(), validator()), self());
    // TODO tell publish with acknowledgment
  }).
  build();

Your client actor sends a Publish message to the router so to publish some data to the given topic. It might set the acknowledgment flag to ask the router to reply back upon publication.

Your client actor receives either the Error message from router if the publication fails or the Published message if it succeeds (and the acknowledgment flag was set). In the latter case the router creates a new publication and delivers its identifier to your client actor via the published message.

Please refer to the Payloads section for details about reading or writing payload contents.

Subscribe to a topic

Your client actor generate the next request identifier and sends a Subscribe message to the router so to subscribe to a given topic.

Scala
private var requestId: Id = _
private var subscriptionId: Id = _

private def connected: Receive = {
  case Welcome(sessionId, details) =>
    this.sessionId = sessionId
    this.gen = new SessionScopedIdGenerator
    context become open
    // submit subscriptions/registrations ...
    this.requestId = gen.nextId()
    router ! Subscribe(this.requestId, Subscribe.defaultOptions, "myapp.topic")
}

private def open: Receive = {
  case Disconnected =>
    // ...
    
  case Goodbye(details, reason) =>
    this.sessionId = 0
    log.warning(reason)
    // open new sesson ...
    // restore subscriptions/registrations ...

  case Error(Subscribe.tpe, requestId, details, error, _) =>
    if (this.requestId == requestId) {
      log.warning(error)
      self ! PoisonPill
    }
    
  case Subscribed(requestId, subscriptionId) =>
    if (this.requestId == requestId) {
      this.subscriptionId += subscriptionId
      context become subscribed
      // OR become anyElseYouLike
    }
}
Java
private Long requestId = 0L;
private Long subscriptionId = 0L;



private PartialFunction<Object, BoxedUnit> open;
private Long sessionId;
private SessionScopedIdGenerator gen;


public ActorsJavaClient() {
  connected =
    match(Welcome.class, msg -> {
      this.sessionId = msg.sessionId();
      this.gen = new SessionScopedIdGenerator();
      context().become(open);
      // submit subscriptions/registrations
      this.requestId = gen.nextId();
      router.tell(new Subscribe(requestId, Subscribe.defaultOptions(), "mytopic", validator()), self());
    }).
    build();
  
  open =
    match(Disconnected.class, sig -> {
      // ...
    }).
    match(Goodbye.class, msg -> {
      this.sessionId = 0L;
      // open new sesson ...
      // restore subscriptions/registrations ...
    }).
    match(Error.class, msg -> {
      if (msg.requestType() == 33 && this.requestId == msg.requestId()) {
        context().stop(self());
      }
    }).
    match(Subscribed.class, msg -> {
       if (this.requestId == msg.requestId()) {
         this.subscriptionId = msg.subscriptionId();
         context().become(subscribed);
         // OR become anyElseYouLike
       }
    }).
    build();
}

Your client actor receives either the Error message from router if the subscription fails or the Subscribed message if it succeeds. In the latter case the router creates a new subscription and delivers its identifier to your client actor via the subscribed message.

Register a procedure

TBD

Call a procedure

TBD

Consume events

TBD

Process invocations

TBD