Futures

Akka Wamp provides you with a Futures-based API: all operations are provided as direct method calls returning composable futures. You can write your client applications, in either Scala or Java, to connect to routers, open sessions to attach to realms, publish or subscribe to topics, consume events, register or call remote procedures and handle invocations in a few lines of code.

Note

This should be considered a higher-level API compared to Akka Wamp Actors, as it doesn’t require you to know anything about how WAMP Messages are exchanged between peers.
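To illustrate what "composable" means here, the following is a minimal sketch that uses only the JDK's CompletableFuture. The connect, open and publishAck methods below are hypothetical stand-ins that return plain strings, not the Akka Wamp API; the point is only how thenCompose chains one asynchronous step onto the previous one.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

class ComposeSketch {
  // Hypothetical stand-ins: each returns an already completed stage.
  static CompletionStage<String> connect(String endpoint) {
    return CompletableFuture.completedFuture("conn(" + endpoint + ")");
  }

  static CompletionStage<String> open(String conn, String realm) {
    return CompletableFuture.completedFuture(conn + "/session(" + realm + ")");
  }

  static CompletionStage<String> publishAck(String session, String topic) {
    return CompletableFuture.completedFuture(session + "/published(" + topic + ")");
  }

  // Each step runs only after the previous one completed successfully;
  // a failure in any step skips the remaining steps and fails the result.
  static CompletionStage<String> pipeline() {
    return connect("myrouter")
        .thenCompose(c -> open(c, "myrealm"))
        .thenCompose(s -> publishAck(s, "mytopic"));
  }

  public static void main(String[] args) {
    System.out.println(pipeline().toCompletableFuture().join());
  }
}
```

The same shape applies to the real calls described in the rest of this page: each method returns a future, and the next call is composed onto it instead of blocking.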

Client

Clients are those peers that communicate with each other indirectly, through a router. An Akka Wamp client instance can be created as follows:

Scala
import akka.actor._
import akka.wamp.client._
import com.typesafe.config._

class FuturesScalaClient {

  val config = ConfigFactory.load("myapp.conf")
  val system = ActorSystem("myapp", config)
  val client = Client(system)
  // ...
}
Java
import akka.actor.*;
import akka.wamp.client.japi.*;
import com.typesafe.config.*;

public class FuturesJavaClient {
  public FuturesJavaClient() {
    Config config = ConfigFactory.load("myapp.conf");
    ActorSystem system = ActorSystem.create("myapp", config);
    Client client = Client.create(system);
    // ...
  }
}

Just invoke the Client factory method and pass the following arguments:

  • system: ActorSystem
    Is the Akka Actor System the client needs to spawn actors and to provide an execution context to futures.

Connections

Clients that wish to communicate with each other must connect to the same router. The Akka Wamp client can connect to a router as follows:

Scala
import scala.concurrent.Future
val conn: Future[Connection] = client.connect("myrouter")
Java
// import java.util.concurrent.CompletionStage;
CompletionStage<Connection> conn = client.connect("myrouter");

Just invoke the connect method and pass the following arguments:

  • endpoint: String
    Is the name of a configured endpoint (default is "local"). Read the configuration section further below.

or the following arguments:

  • address: String
    Is the address to connect to (e.g. "wss://hostname:8433/router")

  • format: String
    Is the format of messages as exchanged over the wire (e.g. "msgpack")
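The address argument packs the transport scheme, host, port and path into a single string. As a rough illustration of how such an address decomposes, here is a small sketch using the JDK's java.net.URI; the AddressSketch class and its describe method are hypothetical helpers, and nothing here claims to be how Akka Wamp parses addresses internally.

```java
import java.net.URI;

class AddressSketch {
  // Splits an address into "scheme host port path".
  // getPort() returns -1 when the address carries no explicit port.
  static String describe(String address) {
    URI uri = URI.create(address);
    return uri.getScheme() + " " + uri.getHost() + " " + uri.getPort() + " " + uri.getPath();
  }

  public static void main(String[] args) {
    System.out.println(describe("wss://hostname:8433/router"));
    System.out.println(describe("ws://localhost:8080/wamp"));
  }
}
```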

Configuration

// akka.wamp.client {

# Named endpoint configurations
#
endpoint {
  local {
    # Router address to connect to. Scheme can be:
    #
    #   - tcp
    #       Raw TCP
    #   - tls
    #       Transport Layer Security
    #   - ws
    #       WebSocket
    #   - wss
    #       WebSocket over TLS
    #
    address = "ws://localhost:8080/wamp"

    # Message format can be:
    #
    #   - json
    #     JavaScript Object Notation
    #
    #   - msgpack
    #     MessagePack
    #
    format = "json"
  }
  # Additional endpoint configurations here ...
  #
  # Each endpoint configuration will use the above "local"
  # configuration settings as fallback.
  #
  #  address = "wss://127.0.0.1:8443/wamp"
  #}
}

# The minimum (initial) duration until the connector actor
# will be started again, if it is terminated because of failures
#
min-backoff = 3 seconds

# The exponential back-off is capped to this maximum duration
#
max-backoff = 30 seconds

# After calculation of the exponential back-off an additional
# random delay based on this factor is added, e.g. 0.2 adds
# up to 20% delay.
#
random-factor = 0.2

# The boolean switch to validate against strict URIs
# rather than loose URIs
#
validate-strict-uris = false

# NOTE
# Clients will always disconnect on offending messages
# No configuration setting is provided to change this behaviour.
#
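To make the interplay of min-backoff, max-backoff and random-factor concrete, here is a minimal sketch of the delay calculation. It assumes the connector follows the same formula as Akka's backoff supervision (double the delay on each restart, cap it, then add the random part), which the text above suggests but does not confirm. BackoffSketch and its delay method are hypothetical names, and the random value is passed in explicitly so the result is reproducible.

```java
import java.time.Duration;

class BackoffSketch {
  // delay = min(maxBackoff, minBackoff * 2^restartCount) * (1 + rnd * randomFactor)
  // rnd is expected to be in [0, 1); it is a parameter here to keep the sketch deterministic.
  static Duration delay(int restartCount, Duration min, Duration max,
                        double randomFactor, double rnd) {
    double exponential = min.toMillis() * Math.pow(2, restartCount);
    double capped = Math.min(max.toMillis(), exponential);
    return Duration.ofMillis(Math.round(capped * (1.0 + rnd * randomFactor)));
  }

  public static void main(String[] args) {
    Duration min = Duration.ofSeconds(3);   // min-backoff
    Duration max = Duration.ofSeconds(30);  // max-backoff
    for (int n = 0; n < 6; n++) {
      // rnd = 0.0 shows the deterministic part only
      System.out.println("restart " + n + ": " + delay(n, min, max, 0.2, 0.0).toMillis() + " ms");
    }
  }
}
```

With the default values and the random part set to zero, the successive delays are 3, 6, 12, 24, 30 and 30 seconds. Under this assumed formula the random part is applied after the cap, so an individual delay may exceed max-backoff by up to 20%.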

Disconnect

The Akka Wamp client makes a distinction between deliberate and accidental disconnections. In either case, any action performed in disconnected state will make the client throw ClientException("Disconnected").

Deliberate

Disconnection is requested on purpose as follows:

Scala
val disconnected: Future[Disconnected] = conn.flatMap(c => c.disconnect())
Java
CompletionStage<Disconnected> disconnected = conn.thenCompose(c -> c.disconnect());

Just invoke the disconnect method.

Accidental

Disconnection is not requested but suddenly happens (for example on mobile devices connected via wireless networks). The Akka Wamp client does not provide any mechanism to recover from this state. The connection object becomes useless and a new connection must be established.

Warning

Please join the ongoing Session Resumption discussion as some different behaviour proposals are under review.
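Since a dropped connection cannot be recovered, application code has to discard it and connect again. The following is a minimal sketch of that pattern using only the JDK. FakeConnection, connect and openWithReconnect are hypothetical stand-ins, not Akka Wamp classes, and the failure is modelled as a failed stage carrying an IllegalStateException, which is an assumption about how the error would surface.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

class DisconnectSketch {
  // Hypothetical stand-in for a connection; NOT the Akka Wamp Connection class.
  static final class FakeConnection {
    private boolean connected = true;

    void drop() { connected = false; } // simulates an accidental disconnection

    CompletionStage<String> open(String realm) {
      if (!connected) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("Disconnected"));
        return failed;
      }
      return CompletableFuture.completedFuture("session(" + realm + ")");
    }
  }

  static CompletionStage<FakeConnection> connect() {
    return CompletableFuture.completedFuture(new FakeConnection());
  }

  // Tries the given connection first; on failure, discards it and
  // opens the session over a brand new connection.
  static CompletionStage<String> openWithReconnect(FakeConnection conn, String realm) {
    return conn.open(realm)
        .<CompletionStage<String>>handle((session, ex) -> {
          if (ex == null) return CompletableFuture.completedFuture(session);
          return connect().thenCompose(fresh -> fresh.open(realm));
        })
        .thenCompose(stage -> stage); // flatten the nested stage
  }

  public static void main(String[] args) {
    FakeConnection conn = new FakeConnection();
    conn.drop();
    System.out.println(openWithReconnect(conn, "myrealm").toCompletableFuture().join());
  }
}
```

A real application would typically also limit the number of attempts and re-create subscriptions and registrations on the new session.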

Sessions

A realm is a routing and administrative domain, optionally protected by authentication and authorization, that holds subscriptions to topics and registrations of procedures for all clients attached to it.

A session is a transient conversation between a client and a router, running over a transport connection, that starts when the client requests to be attached to a specific realm. Attaching to a realm is also referred to as “opening a session”.

Open

Once you have a (future of) connection, open a session over it to attach the client to a specific realm.

Scala
// val conn: Future[Connection] = ...
val session: Future[Session] = conn.flatMap(c => c.open("myrealm"))
Java
// CompletionStage<Connection> conn = ...
CompletionStage<Session> session = conn.thenCompose(c -> c.open("myrealm"));

Just invoke the open method passing the following arguments:

  • realm: Uri
    Is the realm name (default is "default")
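Realm, topic and procedure names are WAMP URIs, and the validate-strict-uris setting shown in the configuration decides how strictly they are checked. The sketch below uses the loose and strict patterns as given in the WAMP specification for URIs without empty components; whether Akka Wamp applies exactly these patterns is an assumption, and UriSketch is a hypothetical helper.

```java
import java.util.regex.Pattern;

class UriSketch {
  // Loose: components may contain anything except whitespace, '.' and '#'.
  static final Pattern LOOSE = Pattern.compile("^([^\\s\\.#]+\\.)*([^\\s\\.#]+)$");
  // Strict: components may contain only lowercase letters, digits and '_'.
  static final Pattern STRICT = Pattern.compile("^([0-9a-z_]+\\.)*([0-9a-z_]+)$");

  static boolean isValid(String uri, boolean strict) {
    return (strict ? STRICT : LOOSE).matcher(uri).matches();
  }

  public static void main(String[] args) {
    String[] samples = {"myrealm", "com.myapp.topic1", "com.MyApp.Topic", "my realm"};
    for (String uri : samples) {
      System.out.println(uri + " loose=" + isValid(uri, false) + " strict=" + isValid(uri, true));
    }
  }
}
```

In this sketch "com.MyApp.Topic" passes the loose check but fails the strict one because of the uppercase letters, while "my realm" fails both because of the whitespace.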

Close

Once the client doesn’t need to keep the session attached, it can close it as follows:

Scala
val closed: Future[Closed] = session.flatMap(s => s.close())
Java
CompletionStage<Closed> closed = session.thenCompose(s -> s.close());

Just invoke the close method.

Topics

The client can either publish events to topics or subscribe to them.

Publish

Once it has a (future of) session, the client can publish an event to a topic using either the “fire and forget” or the “acknowledged” pattern.

Scala
// fire and forget
session.map(s => s.publish("mytopic"))

// with acknowledge
val publication: Future[Publication] =
  session.flatMap(s => s.publishAck("mytopic"))
Java
// fire and forget
session.thenAccept(s -> s.publish("mytopic"));

// with acknowledge
CompletionStage<Publication> publication =
  session.thenCompose(s -> s.publishAck("mytopic"));

Just invoke any of the following overloaded methods:

  • publish
    It publishes using the “fire and forget” pattern and gives no indication of what happened, not even of failures.

    • topic: Uri
      Is the topic to publish to.

    • payload: Payload
      Is the outgoing event payload. Please refer to the Payloads section for further details.

  • publishAck
    It publishes using the “acknowledged” pattern and returns a (future of) publication. It accepts the same arguments as above.

When publishing with the “acknowledged” pattern, the client can provide callbacks to be invoked upon future completion to test against success or failure.

Scala
import scala.util.{Failure, Success}

// onComplete requires an implicit ExecutionContext in scope
publication.onComplete {
  case Success(pb) =>
    log.info(s"Published with ${pb.id}")
  case Failure(ex) =>
    log.error(ex.getMessage, ex)
}
Java
publication.whenComplete((pb, ex) -> {
  if (pb != null)
    log.info("Published with {}", pb.id());
  else
    log.error(ex.getMessage(), ex);
});
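In Java, whenComplete always receives both arguments, and on the stage that failed the exception is passed as is. The sketch below shows both outcomes with plain JDK futures; WhenCompleteSketch and describe are hypothetical names, and a Long stands in for the publication. Testing `ex == null` rather than the value is a small design choice: it also works for stages that legitimately complete with a null value.

```java
import java.util.concurrent.CompletableFuture;

class WhenCompleteSketch {
  // Returns the message the callback would log for the given future.
  // The future is expected to be already completed, so the callback
  // runs synchronously in the calling thread.
  static String describe(CompletableFuture<Long> publication) {
    StringBuilder out = new StringBuilder();
    publication.whenComplete((id, ex) -> {
      if (ex == null) {
        out.append("Published with ").append(id);
      } else {
        out.append("Failed: ").append(ex.getMessage());
      }
    });
    return out.toString();
  }

  public static void main(String[] args) {
    System.out.println(describe(CompletableFuture.completedFuture(42L)));

    CompletableFuture<Long> failed = new CompletableFuture<>();
    failed.completeExceptionally(new RuntimeException("boom"));
    System.out.println(describe(failed));
  }
}
```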

Subscribe

Once it has a (future of) session, the client can subscribe a consumer to a topic as follows:

Scala
// val consumer: (Event) => Unit  = ...
val subscription: Future[Subscription] =
  session.flatMap(s => s.subscribe("mytopic", consumer))
Java
// Consumer<Event> consumer = ...;
CompletionStage<Subscription> subscription =
    session.thenCompose(s -> s.subscribe("mytopic", consumer));

Just invoke the subscribe method with the following arguments:

  • topic: Uri
    Is the topic to subscribe to

  • consumer
    Is a consumer callback as explained further below.

The client can provide callbacks to be invoked upon future completion to test against success or failure.

Scala
subscription.onComplete {
  case Success(sb) =>
    log.info(s"Subscribed to ${sb.topic} with ${sb.id}")
  case Failure(ex) =>
    log.error(ex.getMessage, ex)
}
Java
subscription.whenComplete((sb, ex) -> {
  if (sb != null)
    log.info("Subscribed to {} with {}", sb.topic(), sb.id());
  else
    log.error(ex.getMessage(), ex);
});

Consumer

The client can subscribe any callback function given either as an event consumer or as a lambda consumer.

Please note that, as this API is built atop Akka Wamp Actors, your callback function will be invoked in the same thread that delivers the Event message from the underlying actor’s mailbox. Therefore, it is safe to close your callback over free variables, as there’s no risk of multiple threads executing the consumer at the same time.

Lambda Consumer

The client can subscribe a lambda consumer that accepts as many parameters as it would expect to be conveyed by incoming events.

Scala
val subscription: Future[Subscription] =
  session.flatMap { implicit s =>
    subscribe("mytopic", (name: String, age: Int) => {

      // do something with arguments ...
    })
  }

Please refer to the Macros section for further details about how to access arguments conveyed by incoming events.

Warning

Lambda consumers are supported for Scala only.

Event Consumer

The client can subscribe an event consumer as a function that accepts exactly one argument of type Event and returns nothing (Unit in Scala, void in Java).

Scala
var freeVar: Long = 0
val consumer: (Event) => Unit =
  event => {
    val publicationId = event.publicationId
    val subscriptionId = event.subscriptionId
    val details = event.details
    val args = event.args
    val kwargs = event.kwargs

    // do something with incoming args and free vars ...
  }
Java
Long freeVar = 0L;
Consumer<Event> consumer =
  event -> {
    Long publicationId = event.publicationId();
    Long subscriptionId = event.subscriptionId();
    Map<String, Object> details = event.details();
    List<Object> args = event.args();
    Map<String, Object> kwargs = event.kwargs();

    // do something with incoming args and free vars ...
  };

Please refer to the Payloads section for details about how to access arguments conveyed by incoming events.
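The single-threaded delivery described above can be illustrated with a JDK-only sketch. A single-thread executor stands in for the actor draining its mailbox, which is an approximation and not how Akka Wamp is implemented; ConsumerSketch and countEvents are hypothetical names. Note that a Java lambda can only capture effectively final local variables, so mutable state is kept in a one-element array here.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class ConsumerSketch {
  static long countEvents(int events) {
    // Mutable "free variable" the consumer closes over.
    long[] counter = {0L};
    Consumer<String> consumer = event -> counter[0]++;

    // One thread delivers all events in order, so the unsynchronized
    // increment above is never executed concurrently.
    ExecutorService mailbox = Executors.newSingleThreadExecutor();
    for (int i = 0; i < events; i++) {
      String payload = "event-" + i;
      mailbox.execute(() -> consumer.accept(payload));
    }
    mailbox.shutdown();
    try {
      if (!mailbox.awaitTermination(5, TimeUnit.SECONDS)) {
        throw new IllegalStateException("mailbox did not drain in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return counter[0];
  }

  public static void main(String[] args) {
    System.out.println(countEvents(1000));
  }
}
```

If the same consumer were invoked from several threads at once, the plain increment would no longer be safe and something like an AtomicLong would be needed.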

Unsubscribe

Once it has a (future of) subscription, the client can unsubscribe from the topic.

Scala
val unsubscribed: Future[Unsubscribed] =
  subscription.flatMap(s => s.unsubscribe())
Java
CompletionStage<Unsubscribed> unsubscribed =
  subscription.thenCompose(s -> s.unsubscribe());

Just invoke the unsubscribe method.

Procedures

The client can either call or register remote procedures.

Call

Once it has a (future of) session, the client can call a remote procedure as follows:

Scala
val result: Future[Result] =
  session.flatMap(s => s.call("myprocedure", List("paolo", 99)))
Java
// import static java.util.Arrays.asList;
CompletionStage<Result> result =
  session.thenCompose(s -> s.call("myprocedure", asList("paolo", 99)));

Just invoke the call method with the following arguments:

  • procedure: Uri
    Is the remote procedure name to call

  • args
    Are the arguments to pass to the invocation

The client can provide callbacks to be invoked upon future completion to test against success or failure.

Scala
result.onComplete {
  case Success(res) =>
    log.info("Result is {}", res)
  case Failure(ex) =>
    log.error(ex.getMessage, ex)
}
Java
result.whenComplete((res, ex) -> {
  if (res != null)
    log.info("Result: {}", res);
  else
    log.error(ex.getMessage(), ex);
});

Register

Once it has a (future of) session, the client can register a local invocation handler as the endpoint of a remote procedure as follows:

Scala
// val handler: (Invocation) => Any = ...
val registration: Future[Registration] =
  session.flatMap(s => s.register("myprocedure", handler))
Java
// Function<Invocation, Object> handler = ...
CompletionStage<Registration> registration =
  session.thenCompose(s -> s.register("myprocedure", handler));

Just invoke the register method with the following arguments:

  • procedure: Uri
    Is the procedure to register.

  • handler
    Is a handler as explained further below.

The client can provide callbacks to be invoked upon future completion to test against success or failure.

Scala
registration.onComplete {
  case Success(reg) =>
    log.info(s"Registered ${reg.procedure} with ${reg.id}")
  case Failure(ex) =>
    log.error(ex.getMessage, ex)
}
Java
registration.whenComplete((reg, ex) -> {
  if (reg != null)
    log.info("Registered with {}", reg.id());
  else
    log.error(ex.getMessage(), ex);
});

Handler

The client can register any callback function given either as an invocation handler or as a lambda handler.

Please note that, as this API is built atop Akka Wamp Actors, your callback function will be invoked in the same thread that delivers the Invocation message from the underlying actor’s mailbox. Therefore, it is safe to close your callback over free variables, as there’s no risk of multiple threads executing the handler at the same time.

Lambda Handler

The client can register a lambda handler that accepts as many parameters as it would expect to be conveyed by incoming invocations.

Scala
val registration: Future[Registration] =
  session.flatMap { implicit s =>
    register("myprocedure", (name: String, age: Int) => {

      // do something with arguments ...

    })
  }

Please refer to the Macros section for further details about how to access arguments conveyed by incoming invocations.

Warning

Lambda handlers are supported for Scala only.

Invocation Handler

The client can register an invocation handler as a function that accepts exactly one argument of type Invocation and returns a (future of) Payload.

Scala
val handler: (Invocation) => Any =
  (invocation) => {
    val registrationId = invocation.registrationId
    val details = invocation.details
    val args = invocation.args
    val kwargs = invocation.kwargs

    // do something with arguments ...

    val res = ???
    res
  }
Java
Function<Invocation, Object> handler =
  invocation -> {
    Long registrationId = invocation.registrationId();
    Map<String, Object> details = invocation.details();
    List<Object> args = invocation.args();
    Map<String, Object> kwargs = invocation.kwargs();

    // do something with arguments ...

    Object res = null;
    return res;
  };

Please refer to the Payloads section for further details about how to access arguments conveyed by incoming invocations.
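Because positional arguments arrive as a list of untyped objects, a handler usually has to check and convert them before use. The following is a minimal JDK-only sketch of that step. FakeInvocation is a hypothetical stand-in exposing only args(), not the Akka Wamp Invocation class, and how a thrown exception would be reported back to the caller is not covered here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class HandlerSketch {
  // Hypothetical stand-in carrying only positional arguments.
  static final class FakeInvocation {
    private final List<Object> args;
    FakeInvocation(List<Object> args) { this.args = args; }
    List<Object> args() { return args; }
  }

  // Expects (name: String, age: Number) and returns a greeting.
  static final Function<FakeInvocation, Object> GREET = invocation -> {
    List<Object> args = invocation.args();
    if (args.size() != 2
        || !(args.get(0) instanceof String)
        || !(args.get(1) instanceof Number)) {
      throw new IllegalArgumentException("expected (name: String, age: Number)");
    }
    String name = (String) args.get(0);
    // Numbers may be decoded as Integer, Long or Double depending on the format.
    int age = ((Number) args.get(1)).intValue();
    return name + " is " + age;
  };

  public static void main(String[] args) {
    FakeInvocation invocation = new FakeInvocation(Arrays.<Object>asList("paolo", 99));
    System.out.println(GREET.apply(invocation));
  }
}
```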

Unregister

Once it has a (future of) registration, the client can unregister the procedure.

Scala
val unregistered: Future[Unregistered] =
  registration.flatMap(r => r.unregister())
Java
CompletionStage<Unregistered> unregistered =
  registration.thenCompose(r -> r.unregister());

Just invoke the unregister method.

Putting it all together

Scala
// TBD
Java
// TBD