Answering Some Big Questions About vlingo/PLATFORM

Today Kenny Bastani and I were discussing strategy. Kenny asserted that we need to be able to answer some big questions about how the vlingo/PLATFORM will help developers in nine essential ways. Here I answer the questions that Kenny posed. If you think there are some questions missing, please pass yours along.

1. How do I tap into streams containing event data that I need to access?

Every storage type supported by vlingo/symbio (ObjectStore, StateStore, and Journal) supports persisting Source data. A Source is a base type that has three major implementations: (a) DomainEvent, (b) Command, and (c) ProcessMessage. In other words, these are potential sources of facts or intentions. The following is a sample DomainEvent.

public static final class OrganizationDefined extends DomainEvent {
  public final String organizationId;
  public final String name;
  public final String description;
  public static OrganizationDefined with(final OrganizationId organizationId, final String name, final String description) {
    return new OrganizationDefined(organizationId, name, description);
  }
  public OrganizationDefined(final OrganizationId organizationId, final String name, final String description) {
    this.organizationId = organizationId.value;
    this.name = name;
    this.description = description;
  }
}

Concrete subclasses of the DomainEvent type may be persisted along with the state of a persistent object, such as an ObjectEntity, StatefulEntity, or EventSourced entity. The following is an example of an ObjectEntity applying state along with a DomainEvent to cause its transition, which also adds to the overall stream of the service it is part of.

public class OrganizationEntity extends ObjectEntity implements Organization {
  private OrganizationState state;
  public OrganizationEntity(final OrganizationId organizationId) {
    this.state = OrganizationState.from(organizationId);
  }
  @Override
  public Completes<OrganizationState> defineWith(final String name, final String description) {
    return apply(state.defineWith(name, description), new OrganizationDefined(this.state.organizationId, name, description), () -> state);
  }
  ...
}

A Command may also be persisted as part of the stream of a CommandSourced entity or within a ProcessMessage of a ProcessManager. In one way or another all of the concrete Source types together make up a stream of totally ordered facts and intentions, and in the case of EventSourced entities, there will be sub-streams for each entity. All of the storage types provide two means to see streams of Source instances.

The earliest way to ingest Source instances is as they are written into storage. You may register a Dispatcher with each storage mechanism, and that Dispatcher will dispatch persisted state and Source instances after they are safely persisted. Each of the dispatched instances must be confirmed as received by the consumer. If any are not confirmed, they will be re-dispatched until confirmation occurs. This provides guaranteed at-least-once delivery, but redeliveries are not guaranteed to be in order of occurrence.
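
The confirm-or-redeliver cycle described above can be sketched in plain Java. This is a minimal illustration, not the vlingo/symbio API: `RedeliveringDispatcher`, `IdempotentConsumer`, and the string dispatch ids are hypothetical names chosen for the example. Because redelivery may produce duplicates, the consumer remembers the ids it has already handled.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;

// Hypothetical, simplified stand-in for a confirming dispatcher (not the vlingo/symbio type).
final class RedeliveringDispatcher {
  // Dispatched but not yet confirmed payloads, keyed by dispatch id.
  private final Map<String, String> unconfirmed = new LinkedHashMap<>();
  private final BiConsumer<String, String> consumer;

  RedeliveringDispatcher(final BiConsumer<String, String> consumer) {
    this.consumer = consumer;
  }

  // Called after the payload has been safely persisted.
  void dispatch(final String dispatchId, final String payload) {
    unconfirmed.put(dispatchId, payload);
    consumer.accept(dispatchId, payload);
  }

  // The consumer calls this once it has handled the dispatch.
  void confirm(final String dispatchId) {
    unconfirmed.remove(dispatchId);
  }

  // Invoked periodically: everything still unconfirmed is sent again.
  void redeliverUnconfirmed() {
    // Iterate over a copy so that a confirmation arriving during
    // redelivery cannot modify the map while it is being traversed.
    new LinkedHashMap<>(unconfirmed).forEach(consumer);
  }

  int unconfirmedCount() {
    return unconfirmed.size();
  }
}

// A consumer that tolerates duplicates by remembering handled dispatch ids.
final class IdempotentConsumer implements BiConsumer<String, String> {
  final Set<String> handledIds = new HashSet<>();
  final List<String> applied = new ArrayList<>();

  @Override
  public void accept(final String dispatchId, final String payload) {
    if (handledIds.add(dispatchId)) { // add() returns false for a duplicate id
      applied.add(payload);
    }
  }
}
```

With at-least-once delivery the duplicate check belongs on the consuming side, since the dispatcher cannot know whether a lost confirmation means the payload was never handled.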

This is an example service with a Bootstrap startup.

public class Bootstrap {
  private static Bootstrap instance;
  private final StatefulTypeRegistry registry;
  private final World world;
  public final Server server;

  private Bootstrap() {
    world = World.startWithDefaults("my-service");
    registry = new StatefulTypeRegistry(world);
    QueryModelStoreProvider.using(world.stage(), registry);
    CommandModelStoreProvider.using(world.stage(), registry, ProjectionDispatcherProvider.using(world.stage()).storeDispatcher);
    ...
}

The Bootstrap uses the ProjectionDispatcherProvider to start a Dispatcher that is used to project into the CQRS Query Model.

public class ProjectionDispatcherProvider {
  public static ProjectionDispatcherProvider using(final Stage stage) {
    final Protocols dispatcherProtocols =
      stage.actorFor(
              new Class<?>[] { Dispatcher.class, ProjectionDispatcher.class },
              Definition.has(TextProjectionDispatcherActor.class, Definition.parameters(descriptions)));
    ...
}

The second way to ingest Source instances is by reading them individually or in larger batches from storage. This is the best way to get historical instances. The base protocol for this is EntryReader. There are three extensions of this protocol, one for each of the storage types: ObjectStoreEntryReader, StateStoreEntryReader, and JournalReader.

objectStore = provider.objectStore(world, dispatcher, mappers);
objectStore
  .entryReader("entry-reader")
  .andThenConsume(requestedReader -> entryReader = requestedReader);
...
entryReader
  .readNext(batchSize)
  .forEach(entry -> exchange.send(entryAdapter.fromEntry(entry)));

There are examples of how these are used to read individual entries and batches of entries.
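
To make the batch-reading idea concrete, here is a minimal in-memory sketch of a reader with a cursor. `InMemoryEntryReader` and `HistoryReplay` are hypothetical classes written for this illustration; they only mimic the shape of reading the next batch and are not the vlingo/symbio EntryReader, which is asynchronous.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory reader that illustrates a batch cursor over stored entries.
final class InMemoryEntryReader {
  private final List<String> entries;
  private int position = 0; // index of the next unread entry

  InMemoryEntryReader(final List<String> entries) {
    this.entries = new ArrayList<>(entries);
  }

  // Returns up to maximumEntries entries and advances the cursor past them.
  List<String> readNext(final int maximumEntries) {
    if (maximumEntries < 1) {
      throw new IllegalArgumentException("maximumEntries must be at least 1");
    }
    final int end = Math.min(position + maximumEntries, entries.size());
    final List<String> batch = new ArrayList<>(entries.subList(position, end));
    position = end;
    return batch;
  }

  // Moves the cursor back to the first entry so history can be replayed.
  void rewind() {
    position = 0;
  }
}

final class HistoryReplay {
  // Drains the reader batch by batch; an empty batch signals the end of the stream.
  static List<String> readAll(final InMemoryEntryReader reader, final int batchSize) {
    final List<String> all = new ArrayList<>();
    List<String> batch;
    while (!(batch = reader.readNext(batchSize)).isEmpty()) {
      all.addAll(batch);
    }
    return all;
  }
}
```

A new service that needs the full history would rewind to the beginning and read batches until an empty batch comes back.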

2. If I build a new service, I may need to access historical stream data to build my query models. How does that work?

This is where you would use one of the EntryReader types listed just above in #1. The vlingo/lattice component provides an Exchange type for interfacing with various queuing and pub-sub mechanisms. One type of vlingo/lattice Exchange is the Feed, which supports building stream-based batches of messages. The vlingo/lattice Feed could be matched up with the vlingo/http FeedResource that serves feeds, including vlingo/lattice Feed streams, over HTTP as follows.

Client -> HTTP -> FeedResource <- Feed

The following is a client’s request for the next 100 elements from the stream’s feed.

final String request = "GET /feeds/events/100" + " HTTP/1.1\nHost: my-service.io\n\n";
client.requestWith(toByteBuffer(request));

This makes it possible to serve historic Source instances, or to serve new instances in near real time, immediately following their persistence.

3. How do I run queries on event streams, once I have access to them?

Currently you would use one of the streaming methods discussed above in #1 or #2 and build a CQRS Query Model. Doing so is supported by vlingo/lattice projections, which are generally handled in real time through a Dispatcher of any of the vlingo/symbio storage types.
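
As a rough picture of what projecting events into a Query Model means, the following plain-Java sketch folds events into a read-optimized view. Everything here is an assumption made for illustration: the event classes are simplified (the real ones would extend DomainEvent), `OrganizationRenamed` and `OrganizationView` are hypothetical, and the in-memory map stands in for a real query store. It does not use the vlingo/lattice projection API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Simplified, hypothetical events for illustration only.
final class OrganizationDefined {
  final String organizationId;
  final String name;
  final String description;

  OrganizationDefined(final String organizationId, final String name, final String description) {
    this.organizationId = organizationId;
    this.name = name;
    this.description = description;
  }
}

final class OrganizationRenamed {
  final String organizationId;
  final String name;

  OrganizationRenamed(final String organizationId, final String name) {
    this.organizationId = organizationId;
    this.name = name;
  }
}

// One row of the query model, shaped for reading rather than for behavior.
final class OrganizationView {
  final String name;
  final String description;

  OrganizationView(final String name, final String description) {
    this.name = name;
    this.description = description;
  }
}

final class OrganizationProjection {
  private final Map<String, OrganizationView> views = new HashMap<>();

  // Applies one event to the query model; unknown event types are ignored.
  void project(final Object event) {
    if (event instanceof OrganizationDefined) {
      final OrganizationDefined defined = (OrganizationDefined) event;
      views.put(defined.organizationId, new OrganizationView(defined.name, defined.description));
    } else if (event instanceof OrganizationRenamed) {
      final OrganizationRenamed renamed = (OrganizationRenamed) event;
      final OrganizationView current = views.get(renamed.organizationId);
      if (current != null) {
        views.put(renamed.organizationId, new OrganizationView(renamed.name, current.description));
      }
    }
  }

  // The query side reads the view and never touches the event stream.
  Optional<OrganizationView> byId(final String organizationId) {
    return Optional.ofNullable(views.get(organizationId));
  }
}
```

Queries then run against the views, so the shape of the stored answer can be tuned to the questions being asked.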

We will also introduce streaming components that directly support querying.

4. Who is responsible for exposing stream data?

It’s the service that owns the data that is to be streamed. In the Domain-Driven Design sense this would be the owning Bounded Context.

5. How do I know what an event contains, and at what point the versions of that event have introduced new fields?

This includes the use of vlingo/schemata, which is used to maintain the definitions of schemas for various types of data exchange between services (i.e. Bounded Contexts). Versions are kept forward and backward compatible by means of semantic versioning and compatibility checks when new versions are registered. Updates with the same major version are required to be compatible with previous versions of the same major version. Updates with a greater major version may deviate and cause breaking changes in dependents. There is a Draft status for schemas that enables dependents to try a new version while it may still receive revisions that would cause them to break, but only until the version is marked as Published.
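
The versioning rule can be sketched as a small check. This is an illustration under stated assumptions, not the vlingo/schemata implementation: `SchemaVersion` and `CompatibilityCheck` are hypothetical, and "compatible" is reduced here to "every field of the previous version is still present", which is only one possible compatibility rule.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical schema version: a semantic version plus the field names it defines.
final class SchemaVersion {
  final int major;
  final int minor;
  final int patch;
  final Set<String> fields;

  private SchemaVersion(final int major, final int minor, final int patch, final Set<String> fields) {
    this.major = major;
    this.minor = minor;
    this.patch = patch;
    this.fields = fields;
  }

  // Parses "major.minor.patch" and records the field names of that version.
  static SchemaVersion of(final String semanticVersion, final String... fields) {
    final String[] parts = semanticVersion.split("\\.");
    if (parts.length != 3) {
      throw new IllegalArgumentException("Expected major.minor.patch but got: " + semanticVersion);
    }
    return new SchemaVersion(
        Integer.parseInt(parts[0]),
        Integer.parseInt(parts[1]),
        Integer.parseInt(parts[2]),
        Collections.unmodifiableSet(new HashSet<>(Arrays.asList(fields))));
  }

  boolean isNewerThan(final SchemaVersion other) {
    if (major != other.major) return major > other.major;
    if (minor != other.minor) return minor > other.minor;
    return patch > other.patch;
  }
}

final class CompatibilityCheck {
  // Assumed rule: a greater major version may break dependents; within the
  // same major version the new version must keep every existing field.
  static boolean canRegister(final SchemaVersion previous, final SchemaVersion next) {
    if (!next.isNewerThan(previous)) return false;
    if (next.major > previous.major) return true;
    return next.fields.containsAll(previous.fields);
  }
}
```

For example, under this rule a 1.1.0 that adds `description` to the fields of 1.0.0 would be accepted, while a 1.2.0 that drops `name` would be rejected unless it were registered as 2.0.0.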

6. What patterns and practices should I use to publish my own streams of events?

Design the schemas for Data and Events using vlingo/schemata. Open those types to consumption by other services. This is generally done by marking the types as Published. Next make the data types and events accessible through a vlingo/lattice Exchange or Feed, which may include using a vlingo/http FeedResource as a query endpoint. These patterns include Publish-Subscribe and Request-Response.

7. How do I ensure that immutable event streams can be scrubbed, should an event contain private data that has been ordered to be deleted?

Generally, private, protected data is stored in a separate object that may be referenced by the event, but is inaccessible to consumers unless they can properly respond to a security challenge to prove their level of privilege. This is a first required step to overall data privacy and security.

There are various ways to scrub data, such as is required by GDPR and other data privacy mandates. I mention two such approaches: (a) The private data is placed in secure storage and is never exposed inside events, which reference it only by identity. If the private data is ordered deleted, the secure storage containing the private data is replaced with blank/empty data. The identity references now point to nothing, and consumers must be able to deal with this. (b) The private data is first encrypted with a private key that only the service knows. The encrypted data is then placed inside events. Only those with rightful access to the key are able to read the data. When the data is ordered deleted, the key is destroyed, leaving the data inaccessible.

Due care must be taken to ensure the laws allow for either of the above two techniques at any given time. It is your responsibility to verify any technique that your business or organization uses.
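
Approach (b) can be sketched with a per-subject key that is thrown away on deletion. Note the substitution: this sketch uses AES-GCM encryption from the JDK rather than a hash, because the protected value has to remain readable by authorized parties until its key is destroyed. `PrivateDataVault` is a hypothetical class, and the in-memory key map is an assumption made to keep the example short; a real service would hold keys in a secured key store.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;

// Hypothetical vault that protects private data with one key per data subject.
final class PrivateDataVault {
  private static final int IV_LENGTH = 12; // bytes, the usual IV size for GCM
  private static final int TAG_BITS = 128;

  private final Map<String, SecretKey> keysBySubject = new HashMap<>();
  private final SecureRandom random = new SecureRandom();

  // Encrypts the data with the subject's key. The result (IV followed by
  // ciphertext) is what would be placed inside an event.
  byte[] protect(final String subjectId, final String privateData) {
    try {
      SecretKey key = keysBySubject.get(subjectId);
      if (key == null) {
        final KeyGenerator generator = KeyGenerator.getInstance("AES");
        generator.init(128);
        key = generator.generateKey();
        keysBySubject.put(subjectId, key);
      }
      final byte[] iv = new byte[IV_LENGTH];
      random.nextBytes(iv); // a fresh IV for every encryption
      final Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
      cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
      final byte[] ciphertext = cipher.doFinal(privateData.getBytes(StandardCharsets.UTF_8));
      final byte[] result = new byte[IV_LENGTH + ciphertext.length];
      System.arraycopy(iv, 0, result, 0, IV_LENGTH);
      System.arraycopy(ciphertext, 0, result, IV_LENGTH, ciphertext.length);
      return result;
    } catch (GeneralSecurityException e) {
      throw new IllegalStateException("Encryption failed", e);
    }
  }

  // Returns the private data, or empty when the subject's key has been destroyed.
  Optional<String> reveal(final String subjectId, final byte[] protectedData) {
    final SecretKey key = keysBySubject.get(subjectId);
    if (key == null) {
      return Optional.empty();
    }
    try {
      final Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
      cipher.init(Cipher.DECRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, protectedData, 0, IV_LENGTH));
      final byte[] plain = cipher.doFinal(protectedData, IV_LENGTH, protectedData.length - IV_LENGTH);
      return Optional.of(new String(plain, StandardCharsets.UTF_8));
    } catch (GeneralSecurityException e) {
      throw new IllegalStateException("Decryption failed", e);
    }
  }

  // Destroying the key leaves every event that carries this subject's data unreadable.
  void forget(final String subjectId) {
    keysBySubject.remove(subjectId);
  }
}
```

The events themselves are never rewritten; only the key is removed, which is why consumers must be prepared for private data that can no longer be revealed.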

8. How do I protect my streams from a data breach?

As with any data that is sensitive, personal, private, or otherwise puts its owners in a vulnerable position if inappropriately disclosed, it must be protected behind security mechanisms. This would include firewalls, secure sockets, security routers, and hash-based encryption. See #7 above.

9. How do I make sure that event transport is secure and encrypted from service to service?

The vlingo/common provides the means to hash data using various means of protection, where only those with proper credentials and hash keys may access it.

io.vlingo.common.crypto.Hasher.defaultHasher()
io.vlingo.common.crypto.Argon2Hasher
io.vlingo.common.crypto.BCryptHasher
io.vlingo.common.crypto.SCryptHasher

 
