The Ideal Domain-Driven Design Aggregate Store?

At the 2014 DDD eXchange in NYC, a park bench discussion developed around storing Aggregates. The consensus among the DDD leadership was against Object-Relational Mapping (ORM) and in favor of finding a better way to store Aggregates. There were comments about ORM in general being an antiquated approach. While some developers are still new to ORMs, the technology of shoehorning objects into relational databases is more than 20 years old. In 20+ years, why haven’t we found a better way to store Aggregates?

During the park bench discussion I promoted the idea of serializing Aggregates as JSON and storing them in that object notation in a document store. A JSON-based store would enable you to query the object’s fields. Central to the discussion, there would be no need to use an ORM. This would help to keep the Domain Model pure and save days or weeks of time generally spent fiddling with mapping details. Even more, your objects could be designed in just the way your Ubiquitous Language is developed, and without any object-relational impedance mismatch whatsoever. Anyone who has used ORM with DDD knows that the limitations of mapping options regularly impede your modeling efforts.

When thinking of a JSON-based store, no doubt your mind is immediately drawn to MongoDB. That’s just how MongoDB works. While true, MongoDB still falls short of filling the needs of DDD Aggregates in one very important way. In our park bench discussion I noted how MongoDB was close to what I wanted, but that you could not use MongoDB to both update an Aggregate’s state in one collection in the store and append one or more new Domain Events to a different collection in the same operation. In short, MongoDB doesn’t support ACID transactions that span collections. This is a big problem when you want to use Domain Events along with your Aggregates, but you don’t want to use Event Sourcing. That is, your Domain Events are an adjunct to your Aggregate state, not the source from which that state is derived as a left fold. Hopefully I don’t have to explain the problems that would occur if we successfully saved an Aggregate’s state to MongoDB, but failed to append a new Domain Event to the same storage. That would simply make the state of the application completely wrong, and no doubt would lead to inconsistencies in dependent parts of our own Domain Model and/or those in one or more other Bounded Contexts.

Rumor has it that MongoDB will at some future time support ACID transactions. In fact there is now a branch of MongoDB that supports ACID transactions. It’s the TokuMX project. Although you may personally feel comfortable using this product, it didn’t excite me. Frankly, it could be a huge challenge to get a given enterprise to support MongoDB in the first place, let alone trying to convince every stakeholder to support a branch of MongoDB that is delivered by a lesser known third party. It seems to me that the best chance to use MongoDB with ACID transactions in your project is when you can finally download it from MongoDB.org.

For me this meant looking elsewhere, and boy, I am glad I did. I believe that I have found the truly ideal DDD Aggregate store in PostgreSQL 9.4. Here are the main reasons why I think this is so:

  • PostgreSQL 9.4 supports both text-based JSON (json datatype) and binary JSON (jsonb datatype). The binary JSON type is a higher performing datatype than the text-based datatype.
  • You can query directly against the JSON, and create indexes on specific JSON object fields/attributes.
  • PostgreSQL is, of course, a relational database and supports ACID transactions.
  • PostgreSQL is a very mature open source product and comes with support tools such as the Postgres Enterprise Manager and the like.
  • You can get both community and commercial support for PostgreSQL, and you have a choice among multiple support vendors.
  • PostgreSQL is fast. I mean, PostgreSQL is seriously fast. In benchmarks around version 9.4, PostgreSQL can perform database writes at or near 14,000 transactions per second. You will be hard pressed to find many projects that need to perform anywhere near that fast or faster. I don’t have the comparison benchmarks handy, but I believe that is significantly faster than MongoDB (without ACID transactions). In my experience, PostgreSQL 9.4 (and later versions) could most likely address the performance needs of something like 97% of all enterprise projects globally. Of course your mileage may vary, but I regularly poll developers for performance numbers. The majority need (far) less than 1,000 transactions per second, and only a few require anywhere near 10,000 transactions per second.
  • Using PostgreSQL’s JSON support is just plain easy.

What I will do next is step through how easy it is to use PostgreSQL to create DDD Aggregate storage.

Developing a PostgreSQL JSON Repository

If you are familiar with my book, Implementing Domain-Driven Design, you will recall the Core Domain named the Agile Project Management Context. In that Bounded Context we model a project management application for Scrum-based Products. A Product is an Entity that serves as the Root of the Aggregate:

public class Product extends Entity {

    private Set<ProductBacklogItem> backlogItems;
    private String description;
    private ProductDiscussion discussion;
    private String discussionInitiationId;
    private String name;
    private ProductId productId;
    private ProductOwnerId productOwnerId;
    private TenantId tenantId;
    ...
}

I am going to create a Repository to persist Product instances and find them again. Let’s first take a look at the basic means for persisting Product instances, and then we will look at querying for them. Here is the Repository declaration and the methods used to save and remove Product instances:

public class PostgreSQLJSONProductRepository
   extends AbstractPostgreSQLJSONRepository
   implements ProductRepository {
   ...
   @Override
   public ProductId nextIdentity() {
      return new ProductId(UUID.randomUUID().toString().toUpperCase());
   }
   ...
   @Override
   public void remove(Product aProduct) {
      this.deleteJSON(aProduct);
   }

   @Override
   public void removeAll(Collection<Product> aProductCollection) {
      this.deleteJSON(aProductCollection);
   }

   @Override
   public void save(Product aProduct) {
      this.saveAsJSON(aProduct);
   }

   @Override
   public void saveAll(Collection<Product> aProductCollection) {
      this.saveAsJSON(aProductCollection);
   }
   ...
}

That’s pretty simple. The bulk of the work is in the abstract base class, AbstractPostgreSQLJSONRepository. The only method that the concrete sub-class must override and implement is tableName(), which allows the abstract base class to know the name of the table in which the concrete type is stored:

public class PostgreSQLJSONProductRepository
      extends AbstractPostgreSQLJSONRepository
      implements ProductRepository {
   ...
   @Override
   protected String tableName() {
      return "tbl_products";
   }
   ...
}

Let’s take a look inside that base class:

public abstract class AbstractPostgreSQLJSONRepository {

   private ObjectSerializer serializer;
   ...
   protected AbstractPostgreSQLJSONRepository() {
      super();

      this.serializer = ObjectSerializer.instance();
   }

   protected void close(ResultSet aResultSet) {
      if (aResultSet != null) {
         try {
            aResultSet.close();
         } catch (Exception e) {
            // ignore
         }
      }
   }
	
   protected void close(Statement aStatement) {
      if (aStatement != null) {
         try {
            aStatement.close();
         } catch (Exception e) {
            // ignore
         }
      }
   }
	
   protected Connection connection() throws SQLException {
      Connection connection =
            PostgreSQLPooledConnectionProvider
                  .instance()
                  .connection();

      return connection;
   }

   protected void deleteJSON(Identifiable<Long> anAggregateRoot) {
      try {
         Connection connection = this.connection();

         this.deleteJSON(connection, anAggregateRoot);

      } catch (Exception e) {
         throw new RuntimeException("Cannot delete: " + anAggregateRoot + " because: " + e.getMessage());
      }
   }

   protected void deleteJSON(
         Collection<? extends Identifiable<Long>> anAggregateRoots) {
		
         try {
            Connection connection = this.connection();

            for (Identifiable<Long> root : anAggregateRoots) {
               this.deleteJSON(connection, root);
            }

         } catch (Exception e) {
            throw new RuntimeException("Cannot delete: " + anAggregateRoots + " because: " + e.getMessage());
         }
   }
	
   protected <T extends Object> T deserialize(String aSerialization, final Class<T> aType) {
      return this.serializer.deserialize(aSerialization, aType);
   }

   ...

   protected String serialize(Object anAggregate) {
      return this.serializer.serialize(anAggregate);
   }

   protected abstract String tableName();

   protected void saveAsJSON(Identifiable<Long> anAggregateRoot) {
      if (anAggregateRoot.isUnidentified()) {
         this.insertAsJSON(anAggregateRoot);
      } else {
         this.updateAsJSON(anAggregateRoot);
      }
   }
	
   protected void saveAsJSON(Collection<? extends Identifiable<Long>> anAggregateRoots) {
      try {
         Connection connection = this.connection();

         for (Identifiable<Long> aggregateRoot : anAggregateRoots) {
            if (aggregateRoot.isUnidentified()) {
               this.insertAsJSON(connection, aggregateRoot);
            } else {
               this.updateAsJSON(connection, aggregateRoot);
            }
         }
	        
      } catch (Exception e) {
         throw new RuntimeException("Cannot save: " + anAggregateRoots + " because: " + e.getMessage());
      }
   }

   private void deleteJSON(
         Connection aConnection,
         Identifiable<Long> anAggregateRoot)
   throws SQLException {
		
      PreparedStatement statement = null;
		
      try {
         statement = aConnection.prepareStatement(
               "delete from "
               + this.tableName()
               + " where id = ?");

         statement.setLong(1, anAggregateRoot.identity());
         statement.executeUpdate(); 

      } finally {
         this.close(statement);
      }
   }

   private void insertAsJSON(Identifiable<Long> anAggregateRoot) {
      try {
         Connection connection = this.connection();

         this.insertAsJSON(connection, anAggregateRoot);

      } catch (Exception e) {
         throw new RuntimeException("Cannot save: " + anAggregateRoot + " because: " + e.getMessage());
      }
   }

   private void insertAsJSON(
         Connection aConnection,
         Identifiable<Long> anAggregateRoot)
   throws Exception {

      PreparedStatement statement = null;

      try {
         String json = this.serialize(anAggregateRoot);
			
         PGobject jsonObject = new PGobject();
         jsonObject.setType("json");
         jsonObject.setValue(json);

         statement = aConnection.prepareStatement(
               "insert into "
               + this.tableName()
               + " (data) values (?)");

         statement.setObject(1, jsonObject); 
         statement.executeUpdate(); 

      } finally {
         this.close(statement);
      }
   }

   private void updateAsJSON(Identifiable<Long> anAggregateRoot) {
      try {
         Connection connection = this.connection();

         this.updateAsJSON(connection, anAggregateRoot);

      } catch (Exception e) {
         throw new RuntimeException("Cannot update: " + anAggregateRoot + " because: " + e.getMessage());
      }
   }
	
   private void updateAsJSON(
         Connection aConnection,
         Identifiable<Long> anAggregateRoot)
   throws SQLException {

      PreparedStatement statement = null;

      try {
         String json = this.serialize(anAggregateRoot);

         PGobject jsonObject = new PGobject();
         jsonObject.setType("json");
         jsonObject.setValue(json);

         statement = aConnection.prepareStatement(
               "update "
               + this.tableName()
               + " set data = ?"
               + " where id = ?");

         statement.setObject(1, jsonObject);
         statement.setLong(2, anAggregateRoot.identity());
         statement.executeUpdate();

      } finally {
         this.close(statement);
      }
   }
}

Here are the highlights from the abstract base class with regard to saving and removing Aggregates to and from the store:

  • We use an ObjectSerializer to serialize Aggregate instances to JSON, and to deserialize them from JSON back to their Aggregate instance state. This ObjectSerializer is the same one I used in my book, which is based on the Google Gson parser. The biggest reason I use this JSON parser is that it works by introspection and reflection on object fields rather than requiring objects to support the JavaBean specification (yuk!).
  • There are special methods that help close ResultSet and PreparedStatement instances.
  • Each Repository gets a JDBC Connection to the database using PostgreSQLPooledConnectionProvider. All of the operations are simple, lightweight JDBC operations. As indicated by its name, the PostgreSQLPooledConnectionProvider provides pooled Connections that are thread bound using ThreadLocal.
  • You can delete and insert one or many Aggregate instances in one operation. This supports remove(), removeAll(), save(), and saveAll() in the concrete sub-classes.
  • All communication via JDBC uses the PGobject type to carry the JSON payload to and from the database. The PGobject type in this code is “json” and the value is a JSON String object. You can easily switch the code to the more efficient “jsonb” type.

Note another detail. All Aggregate Root Entities are passed into the abstract base class as Identifiable instances. This enables the base class Repository to determine whether the instances have already been saved to the data store on prior operations, or if this is the first time. For first time persistence the Repository uses an INSERT operation. For subsequent saves after having read the Aggregate instances from the store the operation will be an UPDATE. The Entity type in the Agile Project Management code base implements the Identifiable interface:

public interface Identifiable<T> {
   public T identity();
   public void identity(T aValue);
   public boolean isIdentified();
   public boolean isUnidentified();
}

public abstract class Entity implements Identifiable<Long> {
    ...
    private Long surrogateIdentity;

    public Entity() {
        super();

        this.identity(0L);
    }
    ...
    @Override
    public Long identity() {
       return this.surrogateIdentity == null ? 0L : this.surrogateIdentity;
    }

    @Override
    public void identity(Long aValue) {
       this.surrogateIdentity = aValue;
    }

    @Override
    public boolean isIdentified() {
       return identity() > 0;
    }

    @Override
    public boolean isUnidentified() {
       return identity() <= 0;
    }
    ...
}

Supporting this interface enables the various saveAsJSON() methods to interrogate each Aggregate instance for its surrogate identity. If the surrogate identity is not yet set, it knows that the Aggregate instance is new and must be inserted. If the surrogate identity is set, the Repository knows that it is a preexisting instance that must be updated to the data store. The surrogate identity is stored as the row’s primary key in the table.
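The insert-or-update decision described above can be sketched without a database. What follows is a minimal illustration, not the code from the Agile Project Management code base: SketchEntity and InMemoryTable are hypothetical names, a HashMap stands in for the table, and an AtomicLong plays the role of the bigserial sequence. The sketch also hands the generated key back to the instance after an insert, which is an assumption made for this illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified stand-in for the article's Entity: only the
// surrogate identity handling is reproduced here.
class SketchEntity {
    private Long surrogateIdentity = 0L;
    private String name;

    SketchEntity(String aName) {
        this.name = aName;
    }

    Long identity() {
        return this.surrogateIdentity == null ? 0L : this.surrogateIdentity;
    }

    void identity(Long aValue) {
        this.surrogateIdentity = aValue;
    }

    boolean isUnidentified() {
        return this.identity() <= 0;
    }

    String name() {
        return this.name;
    }

    void rename(String aName) {
        this.name = aName;
    }
}

// Hypothetical in-memory "table": a map keyed by surrogate identity plus a
// counter that plays the role of the bigserial sequence.
class InMemoryTable {
    private final Map<Long, String> rows = new HashMap<>();
    private final AtomicLong sequence = new AtomicLong();

    // Mirrors the decision in saveAsJSON(): insert when unidentified, else update.
    String save(SketchEntity anEntity) {
        if (anEntity.isUnidentified()) {
            long id = this.sequence.incrementAndGet();
            this.rows.put(id, anEntity.name());
            anEntity.identity(id); // assumption: the sketch hands the new key back
            return "INSERT";
        }
        this.rows.put(anEntity.identity(), anEntity.name());
        return "UPDATE";
    }

    int rowCount() {
        return this.rows.size();
    }

    String row(long anId) {
        return this.rows.get(anId);
    }
}
```

Saving a fresh SketchEntity takes the insert path and assigns it an identity greater than zero. Saving the same instance again after rename() takes the update path and leaves the row count unchanged.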


Follow the Aggregate Rule of Thumb: Reference Other Aggregates By Identity Only

Following this rule is very important as it makes your Aggregate instance simple to serialize. If instead you use a graph of Aggregate instances, don’t expect fabulous things from the JSON serializer.


Speaking of the data store, here is a simple SQL script that creates the database and tables used by the solution:

drop database if exists agilepm;
create database agilepm owner postgres;

create table tbl_events
(
    id             bigserial primary key,
    data           json not null
);

create table tbl_publishednotificationtracker
(
    id             bigserial primary key,
    data           json not null
);

create table tbl_timeconstrainedprocesstrackers
(
    id             bigserial primary key,
    data           json not null
);

create table tbl_backlogitems
(
    id             bigserial primary key,
    data           json not null
);

create table tbl_productowners
(
    id             bigserial primary key,
    data           json not null
);

create table tbl_products
(
    id             bigserial primary key,
    data           json not null
);

create table tbl_releases
(
    id             bigserial primary key,
    data           json not null
);

create table tbl_sprints
(
    id             bigserial primary key,
    data           json not null
);

create table tbl_teammembers
(
    id             bigserial primary key,
    data           json not null
);

create table tbl_teams
(
    id             bigserial primary key,
    data           json not null
);

As you can see, these are all very simple tables. The JSON is stored in the column named data. The bigserial column type is a bigint (8 bytes) that has a backing sequence. As you insert new rows into one of the tables, its sequence is used to auto-increment the primary key. Note that tbl_events, which holds each Domain Event published by the Bounded Context (see Chapter 8 of my book), also has a primary key. This bigserial primary key serves as the unique notification identity for messaging notifications that are published inside and outside the Bounded Context.
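Because tbl_products and tbl_events live in the same database, an Aggregate update and a Domain Event append can share one transaction. Here is a rough sketch of that idea using plain JDBC. The class and method names are hypothetical, and passing the JSON as a String with an explicit cast (rather than through PGobject) is an assumption made to keep the sketch free of driver-specific types. The repository code in this article may manage its transactions elsewhere.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Sketch only: writes an Aggregate's JSON and one Domain Event's JSON in a
// single transaction, so either both rows change or neither does.
final class AggregateWithEventWriter {

    private AggregateWithEventWriter() {
    }

    // Table and column names come from the script above; the method name and
    // the cast(? as json) parameter style are assumptions of this sketch.
    static void updateProductAndAppendEvent(
            Connection aConnection,
            long aProductRowId,
            String aProductJson,
            String anEventJson) throws SQLException {

        boolean previousAutoCommit = aConnection.getAutoCommit();
        aConnection.setAutoCommit(false); // start an explicit transaction

        try (PreparedStatement update = aConnection.prepareStatement(
                     "update tbl_products set data = cast(? as json) where id = ?");
             PreparedStatement append = aConnection.prepareStatement(
                     "insert into tbl_events (data) values (cast(? as json))")) {

            update.setString(1, aProductJson);
            update.setLong(2, aProductRowId);
            update.executeUpdate();

            append.setString(1, anEventJson);
            append.executeUpdate();

            aConnection.commit(); // both writes become visible together
        } catch (SQLException e) {
            aConnection.rollback(); // neither write survives a failure
            throw e;
        } finally {
            aConnection.setAutoCommit(previousAutoCommit);
        }
    }
}
```

If either statement fails, the rollback leaves both tables as they were, which is exactly the guarantee that was missing in the MongoDB scenario described earlier.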

Finally let’s take a look at how Aggregate instances stored as JSON inside the database are found. Note that we will be querying inside the data column of each database table. We use simple -> and ->> notation to navigate from data down into each JSON object. For example, here are the three finder methods found in the Repository for Products, the PostgreSQLJSONProductRepository:

public class PostgreSQLJSONProductRepository
      extends AbstractPostgreSQLJSONRepository
      implements ProductRepository {
   ...
   @Override
   public Collection<Product> allProductsOfTenant(TenantId aTenantId) {
      String filter = "data->'tenantId'->>'id' = ?";

      return this.findAll(Product.class, filter, "", aTenantId.id());
   }

   @Override
   public Product productOfDiscussionInitiationId(
         TenantId aTenantId,
         String aDiscussionInitiationId) {

      String filter = "data->'tenantId'->>'id' = ? and data->>'discussionInitiationId' = ?";

      return this.findExact(Product.class, filter, aTenantId.id(), aDiscussionInitiationId);
   }

   @Override
   public Product productOfId(TenantId aTenantId, ProductId aProductId) {
      String filter = "data->'tenantId'->>'id' = ? and data->'productId'->>'id' = ?";

      return this.findExact(Product.class, filter, aTenantId.id(), aProductId.id());
   }
   ...
}

From the data column we filter using a WHERE clause. The full SELECT statement is found in the abstract base class, which we will examine in a moment. To keep the finder interfaces very simple I only require the client Repository to provide the actual matching parts, such as seen in the code snippet above. There are several tokens in each filter. The data token refers to the data column in the given row. The other tokens such as 'tenantId', 'id', and 'productId' are the JSON field names. So, to match on the tenant identity in the JSON you use data->'tenantId'->>'id' = ? as part of the WHERE clause. Note that -> navigates through the objects above the actual target field and returns JSON, while ->> points to the final target field and returns its value as text, which is what allows the comparison against a String parameter.
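To make the -> versus ->> rule concrete, here is a small sketch that assembles such a filter from a list of field names. JsonFieldPath is a hypothetical helper and not part of the Repository code shown in this article. It also shows that the same expression could be used in an expression index, in line with the earlier point about indexing specific JSON fields. The index naming scheme is an assumption.

```java
// Hypothetical helper: builds PostgreSQL JSON navigation expressions over the
// "data" column from a path of field names.
final class JsonFieldPath {
    private final String[] fields;

    JsonFieldPath(String... aFields) {
        if (aFields.length == 0) {
            throw new IllegalArgumentException("At least one field name is required.");
        }
        // Field names are concatenated into SQL, so only plain identifiers are accepted.
        for (String field : aFields) {
            if (!field.matches("[A-Za-z_][A-Za-z0-9_]*")) {
                throw new IllegalArgumentException("Unsupported field name: " + field);
            }
        }
        this.fields = aFields.clone();
    }

    // Uses -> for every step except the last one, and ->> for the last one,
    // so the whole expression yields text.
    String asTextExpression() {
        StringBuilder expression = new StringBuilder("data");
        for (int idx = 0; idx < this.fields.length; ++idx) {
            boolean last = idx == this.fields.length - 1;
            expression
                .append(last ? "->>" : "->")
                .append('\'')
                .append(this.fields[idx])
                .append('\'');
        }
        return expression.toString();
    }

    // A filter fragment with one JDBC parameter placeholder.
    String equalsParameter() {
        return this.asTextExpression() + " = ?";
    }

    // DDL for an expression index on the same path; the index name is made up here.
    String expressionIndexOn(String aTableName) {
        return "create index idx_" + aTableName + "_" + String.join("_", this.fields)
            + " on " + aTableName + " ((" + this.asTextExpression() + "))";
    }
}
```

For example, new JsonFieldPath("tenantId", "id").equalsParameter() yields data->'tenantId'->>'id' = ?, the same fragment used by allProductsOfTenant().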

You can findAll() or findExact(), which find a Collection of a specific type or find a single instance of a specific type, respectively:

public abstract class AbstractPostgreSQLJSONRepository {
   ...
   protected <T extends Identifiable<Long>> List<T> findAll(
         Class<T> aType,
         String aFilterExpression,
         String anOrderBy,
         Object ... anArguments) {

      List<T> aggregates = new ArrayList<T>();
      PreparedStatement statement = null;
      ResultSet result = null;

      String query =
            "select id, data from "
            + this.tableName()
            + " where "
            + aFilterExpression
            + " "
            + anOrderBy;

      try {
         Connection connection = this.connection();

         statement = connection.prepareStatement(query);

         this.setStatementArguments(statement, anArguments);

         result = statement.executeQuery();

         while (result.next()) {
            Long identity = result.getLong(1);

            String serialized = result.getObject(2).toString();
            	
            T aggregate = this.deserialize(serialized, aType);
            	
            aggregate.identity(identity);

            aggregates.add(aggregate);
         }

      } catch (Exception e) {
         throw new RuntimeException("Cannot find: " + query + " because: " + e.getMessage());
      } finally {
         this.close(result);
         this.close(statement);
      }

      return aggregates;
   }
	
   protected <T extends Identifiable<Long>> T findExact(
         Class<T> aType,
         String aFilterExpression,
         Object ... anArguments) {

      T aggregate = null;

      List<T> aggregates = this.findAll(aType, aFilterExpression, "", anArguments);

      if (!aggregates.isEmpty()) {
         aggregate = aggregates.get(0);
      }

      return aggregate;
   }
   ...
   private void setStatementArguments(
         PreparedStatement aStatement,
         Object[] anArguments)
   throws SQLException {

      for (int idx = 0; idx < anArguments.length; ++idx) {
         Object argument = anArguments[idx];

         // A null has no class to inspect, and "= ?" never matches NULL in SQL.
         if (argument == null) {
            throw new IllegalArgumentException("Filter argument " + (idx+1) + " must not be null.");
         }

         Class<?> argumentType = argument.getClass();

         if (argumentType == String.class) {
            aStatement.setString(idx+1, (String) argument);
         } else if (argumentType == Integer.class) {
            aStatement.setInt(idx+1, (Integer) argument);
         } else if (argumentType == Long.class) {
            aStatement.setLong(idx+1, (Long) argument);
         } else if (argumentType == Boolean.class) {
            aStatement.setBoolean(idx+1, (Boolean) argument);
         } else if (argumentType == Date.class) {
            java.sql.Date sqlDate = new java.sql.Date(((Date) argument).getTime());
            aStatement.setDate(idx+1, sqlDate);
         } else if (argumentType == Double.class) {
            aStatement.setDouble(idx+1, (Double) argument);
         } else if (argumentType == Float.class) {
            aStatement.setFloat(idx+1, (Float) argument);
         } else {
            // Fail fast rather than leave a parameter silently unbound.
            throw new IllegalArgumentException("Unsupported argument type: " + argumentType.getName());
         }
      }
   }
   ...
}

The backbone of the finders is implemented in findAll(), which findExact() reuses. Note that when the ResultSet is obtained we iterate over each entry. Using findAll() you can both filter and order the outcome by a specific column or JSON field.
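As a quick illustration of ordering by a JSON field, the sketch below composes the statement the same way findAll() does and passes an order-by fragment that sorts on a JSON field. FindAllQuerySketch is a hypothetical name. The sketch also assumes that the Product's name field is serialized under the JSON key 'name'. Unlike the original concatenation, it trims the trailing space that an empty order-by would leave behind.

```java
// Hypothetical helper that mirrors the string concatenation inside findAll().
final class FindAllQuerySketch {

    private FindAllQuerySketch() {
    }

    static String compose(String aTableName, String aFilterExpression, String anOrderBy) {
        String query =
            "select id, data from "
            + aTableName
            + " where "
            + aFilterExpression
            + " "
            + anOrderBy;

        return query.trim(); // drops the trailing space when anOrderBy is empty
    }
}
```

Calling compose("tbl_products", "data->'tenantId'->>'id' = ?", "order by data->>'name'") produces a statement that filters by tenant and sorts the Products by name as text.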

We obtain both the surrogate identity and the JSON serialization payload. Once the JSON is used to deserialize to the Aggregate instance, we set the surrogate identity as the identity of the Identifiable. This prepares the Aggregate instance for updating should the client decide to modify the instance and call save() on the Product Repository.

Well, that’s pretty much it. Every concrete Repository implemented using the AbstractPostgreSQLJSONRepository is very simple and straightforward. I intend to push the implementation to its GitHub repository as soon as possible. That should give you everything you need to implement this in your own project.
