All posts in “data modelling”

Neo4j Northwind Traders Database Schema

Graph Databases: Loading Data with Neo4j

Graph databases are becoming more popular as a way of storing and analysing large connected datasets.

Neo4j is a popular graph DBMS because of its powerful query language, Cypher, its growing community, and its excellent supporting tools.

A new paradigm comes with a new set of challenges. In this case we are focused on the challenge of creating a data pipeline to load data into Neo4j, thinking about how we might design our schema and how we might query it.

Today, we’ll take you through an ETL of the Northwind Traders sample database and some of the things we can do with Cypher that make it special and worth a look.

We’ll be using Kettle to orchestrate our ETL, making use of the Neo4j Output Plugin to interact with our Neo4j server. We’ll also load the same dataset into PostgreSQL to see how the two technologies compare.


Environment Setup

The installation of Kettle, the Neo4j Output Plugin and the Neo4j server is outside the scope of this post, but it is important to note that Neo4j connections are not stored in the transformations, as database connections usually are. They are stored in the metastore, which is found in your home folder. For that reason, you’ll have to create a new connection before you can load data into Neo4j.

Create connection is available in the Neo4j menu of Kettle
Neo4j Connection dialog allows you to add a connection to your metastore

Once your connection tests successfully you are good to go; just make sure the connection name matches the one hardcoded in the transformations provided at the end of the post, so that you don’t need to change them.

To set up your PostgreSQL connection you’ll need to edit the conf/kettle.properties file with the correct details for your connection. This changes the connection for each of the transformations and the main job, as we have parameterised the connection for your convenience. If you do not have a password you can remove everything after DATABASE_PASS from the file and it will work fine.

With that out of the way, let’s get started with Northwind.


Northwind Traders Database

The Northwind Traders database is a sample database that comes with Microsoft Access. The database contains sales data for a fictitious company called Northwind Traders.

As you can see, it represents an OLTP schema centred on orders and their parts (order details).

We will be using a version of Northwind compiled by Neo4j as csv files, available on GitHub, which is more or less the same as the original and the image above.


Loading Data

The ETL we have created is relatively simple; there is a single job, RUN.kjb, that runs, in sequence, the transformations that load nodes and relationships into Neo4j from the csv files.

Similarly, there is a single job for loading into Postgres; one important difference is that for Postgres we must create our dimension and fact schemas beforehand using SQL create statements. In Neo4j this is not necessary because graph databases are schema-less.

We’ll be using the Neo4j Output step to create nodes and relationships and the Neo4j Cypher step for lookups.

The sequence of steps below shows a common pattern to ensure that poorly delimited and enclosed csv files produce the expected columns and rows:

Handling poorly delimited files in Kettle

Address Nodes

Addresses occur often within Northwind: suppliers, customers, shipment receivers and employees all have addresses. For that reason we have tried to create a generic way to link an address to an actor, one that allows for performant geographic queries.

Rather than have addresses and their components (country, region, city, postcode, street and building name) stored as a property for each customer, supplier, etc. each address is stored as its own node. Each address node is linked to a city node and each city to a region and so on; this creates a hierarchy, using composite keys to make sure the same region isn’t linked to two countries. The result of this is easier indexing which speeds up queries because searches on nodes are faster than searches on properties.

Producing nodes and relationships from a table of addresses in Kettle

After we have collected all the addresses from all the files, we remove duplicates and replace any null regions (some countries have only one region, which is null) with the country name. We then create the region nodes and link them to their appropriate country nodes, and do the same for cities -> regions and addresses -> cities.
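
As a rough sketch of the idea, the hierarchy could be built with Cypher along the following lines, one set of parameters per address row coming from the ETL. The labels, key properties and relationship names here are illustrative assumptions rather than the exact ones used in the transformations.

// composite keys stop a region or city name being shared across two countries
MERGE (co:Country {country: $country})
MERGE (re:Region  {regionKey: $country + '|' + $region})
  ON CREATE SET re.region = $region
MERGE (ci:City    {cityKey: $country + '|' + $region + '|' + $city})
  ON CREATE SET ci.city = $city
MERGE (ad:Address {addressKey: $country + '|' + $region + '|' + $city + '|' + $address})
  ON CREATE SET ad.address = $address
MERGE (re)-[:IN_COUNTRY]->(co)
MERGE (ci)-[:IN_REGION]->(re)
MERGE (ad)-[:IN_CITY]->(ci)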

We now have a more efficient way to query locations and slice our data. 

Example address hierarchy in Neo4j Browser

Relational Addresses

In Postgres we need to do little to get addresses into our dimensions. If it wasn’t for other complications such as poorly delimited and enclosed csv files and replacing null regions it would be as simple as table input -> table output. This is because each address is stored in the same dimension as the other information for that actor, i.e. customer addresses are stored in the customer dimension.


Date Nodes and Date Dimension

Creating date nodes in Kettle

Creating date nodes is identical to creating any date dimension you have built in the past, except that you are creating nodes instead of rows. The result is a node with many properties that allow you to query in a variety of ways without having to do on-the-fly date operations:

Date node properties in Neo4j Browser
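
As a sketch of what one of these nodes holds, the Cypher below creates a single date node; apart from calendarYear, which the queries later in this post rely on, the property names are illustrative assumptions.

// one node per calendar day; the ETL generates these in bulk
CREATE (:Date {
  dateKey:      19970704,
  isoDate:      '1997-07-04',
  calendarYear: 1997,
  quarter:      3,
  monthOfYear:  7,
  monthName:    'July',
  dayOfMonth:   4,
  dayOfWeek:    5,
  dayName:      'Friday'
})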

In PostgreSQL, we populate our date dimension with the same fields as in Neo4j but each date is a row not a node.


Order Nodes and Relationships 

In our graph, order nodes are the most connected nodes; you could compare them to a fact table that contains no additive fields (also known as a factless fact table).

Joining orders.csv and order-details.csv in Kettle

We start by joining order-details to orders so that we can create all the nodes and relationships we want in one go.

Date and shipment receiver node lookups in Kettle

Next, we do some lookups on previously created nodes so that we can link the order nodes to date nodes and shipment receivers.

Creating order nodes and the relationship between products before grouping in Kettle

We calculate totals from unitPrice, discount and quantity; this reduces query time because the values are precalculated. We then create our order nodes and their relationships to products (remembering that the CONTAINS_PRODUCT relationship uses the part orders coming from order-details.csv).

Define the CONTAINS_PRODUCT relationship and its properties in Kettle

All the additive fields apart from freight are stored in our CONTAINS_PRODUCT relationship between Order and Product nodes. This is the most logical place to store these properties unless we wanted to create a Part Order node, which would only increase traversal and reduce query performance.
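
In Cypher terms, what the output step produces for each part order is roughly equivalent to the statement below. The netAmount property is the one the queries later in the post rely on; the parameter names and the exact formula are our assumptions.

MERGE (o:Order   {orderId:   $orderId})
MERGE (p:Product {productId: $productId})
MERGE (o)-[r:CONTAINS_PRODUCT]->(p)
SET r.unitPrice = $unitPrice,
    r.quantity  = $quantity,
    r.discount  = $discount,
    // precalculated so queries never have to recompute it
    r.netAmount = $unitPrice * $quantity * (1 - $discount)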

Creating relationships between orders and many other nodes in Kettle

Finally, we create all the relationships between our order nodes and the other nodes we created before using a sequence of Neo4j Output steps.

Define the SHIPPED_TO relationship and its properties in Kettle

It’s worth noting that we store our freight costs, another additive field, in the relationship between the Order and ShipmentReceiver: SHIPPED_TO. This allows us to maintain additivity without introducing complications surrounding the freight field as you will see later on.
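
A minimal sketch of that relationship, assuming a hypothetical receiverId key on the ShipmentReceiver nodes:

MATCH (o:Order {orderId: $orderId})
MATCH (s:ShipmentReceiver {receiverId: $receiverId})
MERGE (o)-[sh:SHIPPED_TO]->(s)
// freight lives on the relationship, so sum(sh.freight) stays additive
SET sh.freight = $freight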

The schema we have created looks like this:

“call db.schema()” in Neo4j Browser

Part Fact Orders

In PostgreSQL, we still join orders to order details to get part orders; however, we must isolate a single freight value for each order so that freight is additive. Some of the options here are:

  • Split the freight evenly between the products contained in an order.
    • This is misleading, as packaging and shipping costs usually depend on the size and/or weight of the package, so we want to avoid splitting evenly.
  • Split the freight proportionately between the products contained in an order.
    • This is the ideal scenario but it is not possible because we do not have weight or size information for the products, unfortunately.
  • Store freight with only a single part order.
    • This is the compromise we chose as it maintains the additivity of the freight field and is less misleading.

As you can see, all options mean we cannot do analysis of freight costs per product. 

We accomplish this using a sequence that resets with each new order (a “changing sequence”) and a JavaScript calculation.

Remove freight from all but the first row of an order in Kettle
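
For readers who prefer to think in set terms, the same effect could be sketched in SQL with a window function; the table and column names below are illustrative.

SELECT
  order_nk
, product_id
, quantity
, net_amount
-- keep the whole freight on a single part order so that sum(freight) stays correct
, CASE
    WHEN row_number() OVER (PARTITION BY order_nk ORDER BY product_id) = 1
    THEN freight
    ELSE 0
  END AS freight
FROM staging_part_orders;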

Relational data warehouses depend heavily on surrogate keys to join facts to dimensions. For each dimension we have created a sequence for this purpose. 

When we create the fact table we look up these sequences so we can add the surrogate keys to the fact table. This is a distinct difference from Neo4j, which manages its own keys to identify which relationships are connected to a given node.

Dimension lookups in Kettle

Finally, before loading to the table we create a sequence to be the primary key for the fact table.

Add a primary key sequence before table output in Kettle

Querying

Let’s look at how we can query our newly created databases.

Value of sales for each year from customers in the USA

In Cypher:

MATCH (p:Product)<-[r]-(o:Order)--(:Customer)--()--()--()--(c:Country)
WHERE toLower(c.country) = "usa"
WITH o AS order, c.country AS country, r AS rel
MATCH (order)-[:ORDERED_ON_DATE]->(d)
RETURN 
  country, d.calendarYear AS year
, count(DISTINCT order) AS number_of_orders
, apoc.number.format(sum(rel.netAmount), '$#,##0.00', 'en') AS 
  value_in_dollars
ORDER BY year ASC

In PostgreSQL:

SELECT
  customers.country AS country
, dates.calendar_year AS year
, count(DISTINCT orders.order_nk) AS number_of_orders
, cast(sum(orders.net_amount) AS money) AS value_in_dollars
FROM
  public.fact_part_orders AS orders
, public.dim_customers AS customers
, public.dim_date AS dates
WHERE orders.customer_id = customers.customer_id
AND   orders.order_date_id = dates.date_id
AND   lower(customers.country) = 'usa'
GROUP BY country, year
ORDER BY year ASC;

We can refactor our schema to include direct relationships between orders and cities, orders and regions, and orders and countries, giving us a quicker way to retrieve the same results. After doing this, the matching pattern changes from (p:Product)<-[r]-(o:Order)--(:Customer)--()--()--()--(c:Country) to (p:Product)<-[r]-(o:Order)--(c:Country), and the performance boost is significant because there are fewer hops to traverse and fewer searches to complete.
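
The refactor itself can be done entirely inside the graph. A hedged sketch, reusing the pattern above and a relationship name of our own invention:

// materialise a shortcut from each order to its customer's country
MATCH (o:Order)--(:Customer)--()--()--()--(c:Country)
MERGE (o)-[:CUSTOMER_IN_COUNTRY]->(c)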

Products most likely to be bought together

In Cypher:

MATCH 
  p=(original:Product)--(:Order)--(related:Product)
WHERE
  toLower(original.productName) = "teatime chocolate biscuits"
RETURN
  DISTINCT original.productName AS product
, related.productName AS most_likely_to_be_bought_with
, count(p) AS popularity 
ORDER BY
  popularity DESC
, most_likely_to_be_bought_with DESC
LIMIT 5

In PostgreSQL:

SELECT
  original.product_name AS product
, related.product_name AS most_likely_to_be_bought_with
, count(r_orders.order_nk) AS popularity
FROM
  public.dim_products AS original
, public.dim_products AS related
, public.fact_orders AS o_orders
, public.fact_orders AS r_orders
WHERE 
    original.product_id = o_orders.product_id
AND o_orders.order_nk =  r_orders.order_nk
AND r_orders.product_id = related.product_id
AND lower(original.product_name) = 'teatime chocolate biscuits'
AND lower(related.product_name) <> 'teatime chocolate biscuits'
GROUP BY
  original.product_name
, related.product_name
ORDER BY
  popularity DESC
, most_likely_to_be_bought_with DESC
LIMIT 5;

Isn’t that a mouthful.

This type of query has become common in online shopping; the shop will recommend products based on what you are looking at or what you have in your cart.

As you can see, the “products most likely to be bought together” query is far more compact in Cypher. Importantly, this makes querying far less error prone; accidentally running a cross join because you forgot a join condition can go unnoticed and be very costly.

In SQL, fewer joins lead to better performance, especially when your fact table has several billion rows (or when you’re joining the fact table to itself, as we are here). Neo4j does not have the concept of joins because there are no tables. Graph queries are easier to write, read and modify, which is why recommendation queries work so well in graph databases.


Improvement Strategies 

This schema is a good start and allows us to think about how to use Neo4j to analyse our data. While loading the data into Neo4j we came up with new ideas that have not yet been implemented.

Firstly, aggregation nodes could be a useful way to query old data quickly by storing pre-calculated values for later; these nodes play the same role as aggregation tables do in a relational database. In a schema-less model we can add new nodes easily, without building new tables, which makes aggregation a valuable strategy.

The simplest version of this is to calculate the total value of an order and store it on the order node itself, which should improve query time.
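
A sketch of that idea, reusing the netAmount property from the CONTAINS_PRODUCT relationships (the totalValue property name is our assumption):

MATCH (o:Order)-[r:CONTAINS_PRODUCT]->(:Product)
WITH o, sum(r.netAmount) AS orderTotal
// calculated once during the ETL, read many times at query time
SET o.totalValue = orderTotal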

We can create these nodes on several aggregation levels, e.g. Yearly Sales, Monthly Sales, Daily Sales, etc.
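
For example, yearly aggregation nodes could be sketched as follows; the YearlySales label and its properties are assumptions.

MATCH (o:Order)-[:ORDERED_ON_DATE]->(d),
      (o)-[r:CONTAINS_PRODUCT]->(:Product)
WITH d.calendarYear AS year, sum(r.netAmount) AS sales, count(DISTINCT o) AS orders
MERGE (y:YearlySales {calendarYear: year})
SET y.totalSales     = sales,
    y.numberOfOrders = orders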

There is also a possibility of creating geographic aggregation nodes, e.g. USA Sales, London Sales etc.

Separating date nodes into year, month and day nodes is another strategy; this should allow performant querying for specific years and months as a search through all properties of date nodes is not necessary.

Finally, creating a linked list between nodes of the same type may prove valuable. For example, (:Year)-[:NEXT_YEAR]->(:Year) allows you to compare one year’s sales to the previous year’s; the same can be done for the previous and next month or the previous and next day. We can then make use of reduced hop traversal to improve query performance when we are interested in sequences of dates. This is quite difficult to implement in a relational model, as each comparison to a previous or future period requires an additional column on the date dimension.
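
Assuming the date hierarchy were split into Year nodes with a year property (an assumption, as these nodes do not exist in the current schema), the linked list could be created with something like:

// link each year to the one that follows it
MATCH (y1:Year), (y2:Year)
WHERE y2.year = y1.year + 1
MERGE (y1)-[:NEXT_YEAR]->(y2)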


Conclusion

  • Cypher queries are less error-prone because it’s more difficult to miss join conditions when SQL-style FROM and JOIN are expressed through a single pattern.
  • Kettle has a nice plugin to visualise and perform your output to Neo4j.
  • We can optimise our graph for a number of different queries without impacting overall performance.
  • The flexible, schema-less nature means changes can be made without refactoring the whole ETL.
  • Graphs produce efficient recommendation queries.
  • There are many improvements yet to explore.

Next Steps

  • Compare performance on large, connected datasets between relational and graph databases.
  • Load and query databases built from the ground up for connected use cases: social media, map navigation, city planning.
  • Explore hybrid schemas (relational when needed, graph when appropriate) with a virtualisation layer.
  • Optimise Date nodes for different use cases.

Where to get the code

The Neo4j ETL can be downloaded here: loading_northwind_neo4j.zip

The PostgreSQL ETL can be downloaded here: loading_northwind_postgres.zip

Status change fact table – Part 5 (Conclusion)

(previous parts of this series are here: part 1, part 2, part 3, part 4)

The fact table described in the previous parts of this article allows us to count how many objects entered or exited a given state in a certain time period.

However, if we want to count how many objects are in a given state at a given point in time, we need to count all In and Out events since the beginning of time.

This is problematic and doesn’t scale well, as more calculations are required as more data is ingested.

However, we can add a Periodic Snapshot fact table to the mix and have those running totals computed for each state on each day. Questions such as “How many objects were Out for delivery on day X?” can then be answered by looking at a specific snapshot date and grouping all rows that have that specific attribute.

Even if we include attributes such as the age of objects or events, as we did in the implementation of the status change fact table, we can still keep track of everything.

In brief, the snapshot could be implemented by doing something like the following (a SQL sketch follows the list):

  • Take all rows for the snapshot on day D;
  • Increment snapshot date to D+1; increment all ages by 1;
  • Take all events from the status change table for day D+1
  • Add the two sets of events from steps 2 and 3
  • Insert the data into a new partition of the snapshot table.
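
Sketched in SQL, one day’s roll-forward might look roughly like the statement below; the table and column names are hypothetical and the dates are hard-coded only for illustration.

-- build the snapshot for day D+1 from the snapshot for day D
-- plus the status change events of day D+1
INSERT INTO daily_state_snapshot
SELECT
  s.snapshot_date + 1 AS snapshot_date
, s.order_id
, s.event_name
, s.event_count
, s.age + 1 AS age                -- everything carried over gets one day older
FROM daily_state_snapshot AS s
WHERE s.snapshot_date = DATE '2016-01-05'
UNION ALL
SELECT
  e.event_date::date AS snapshot_date
, e.order_id
, e.event_name
, e.event_count
, e.event_age AS age              -- new events enter with their own age
FROM status_change_fact AS e
WHERE e.event_date::date = DATE '2016-01-05' + 1;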

The snapshot table will have a very high cardinality, especially because we will most likely need to keep it atomic, without aggregation on any attribute. Keeping the snapshot partitioned lets us maintain it more easily, allowing us to delete specific partitions when the data is found to be incorrect.

Bear in mind that, as with any Periodic Snapshot algorithm, it is highly sensitive to data errors. Any incorrect number on one day will be propagated into the future, and the only practical solution is to delete all snapshot data from the first day the totals are wrong and reprocess a significant amount of data.

But, with all its flaws, it’s still our best shot at keeping track of all measurable values within our data.

In conclusion, this model implements and extends functionality that used to be achieved by an Accumulating Snapshot. It features the ability to track an arbitrary number of stages of a process and doesn’t require updates of past data, making it easy to implement in a Hadoop environment. It can be used to track data pertaining to shipments and deliveries, bug tracking software, or any other business process where objects go from one state to the next and we need to track their path.

Status change fact table – Part 4 (A PDI implementation)

(previous articles of this series can be found here, here and here)

In this series of posts we’ve been discussing a data model to handle status changes of objects and a way to track them and perform analytics.

The previous article describes the data model and in this article we show an implementation using PDI.

The idea is as follows: after reading a data slice and sorting it by order_id (our primary key) and event_date, we determine which event is the first (of the batch) and which one is the last. We then clone all events but the last one, and apply some logic:

  • The clone of any event is an “Out” event, which means its event_count should be set to -1;
  • The “Out” event will have as timestamp the timestamp of the next “In” event (the moment the event is in fact closed);
  • On the “Out” events we determine the event’s age (which is by definition 0 for the “In” events);
  • We calculate the cumulative sum of all event ages to determine the age of the order itself (this will be valid in both “In” and “Out” events; one should beware of this when using this measure in reports);
  • All “last” events (by order) shall be assigned a different output partition. The output partitions will be called “Open” for all “In” events that don’t yet have a matching “Out” event; and “Closed” for all matched “In/Out” pairs.
  • The output is appended to “Closed” partition, but it overwrites the contents of the “Open” partition.
  • On the subsequent run, all new input data plus all unmatched “In” events previously processed will be fed into the ETL. Those “In” events that eventually get matched with an “Out” event will move to the “Closed” partition.

The ETL can thus run incrementally against an input data set that is fed new data periodically. If at any time the ETL runs without new input data, nothing will be done: no new data is appended to the “Closed” partition, and the contents of the “Open” partition are read, nothing is done to them (as there are no other events to process) and re-written to the “Open” partition.

At the end of this article you’ll find a download link to a ZIP file that has all necessary files to run the ETL described above.

Inside the ZIP file you’ll find the following folders:

  • data: all the input data files;
  • inbox: folder that will be processed by the transformation
  • output: destination of the processed data
  • kettle: location of the actual PDI transformation

To run the transformation just move the first few files from data into inbox (remark: you need to respect the dates; copy older files first); run the transformation. The output folder will now have two files under output/open/data.csv and output/closed/data.csv. These are the two files that constitute our output. To run it again, remove the files from the inbox folder and move the next batch of files from data. The ETL will run incrementally.

Download PDI Implementation

Read the last part of this series of articles here

Status change fact table – Part 3 (The model)

(parts 1 and 2 of this series can be viewed here and here)

The classical approach to solving this problem is through an accumulating snapshot fact table. That table would have the order date, shipment date, delivery date and would be updated at every milestone of each order. We may add a return date, or re-delivery date to keep track of returns or failed deliveries, but we have to decide the structure beforehand.

A possible design of an accumulating snapshot to track the data would be

orders_accumulating_snapshot(
Order_id,
Customer,
Warehouse,
Carrier,
Order_date,
Shipped_date,
First_delivery_attempt_date,
First_return_to_warehouse_date,
First_return_to_warehouse_reason,
Successful_delivery_date
)

There are significant limitations to that design, namely the fact that we can’t track more than a fixed number of delivery attempts or product returns. In the example above, we chose to track the dates of the first shipment, the first delivery attempt and the first return to the warehouse (whether due to a failed delivery or a return by the customer), a field showing the reason for the first return to warehouse, and the successful delivery date.

We can expand this design, for example to also track the last return date as well as the last return reason, so that we can track both the first failed delivery and an eventual return by the customer.

We can also add a few more intermediate milestones.

But we have to decide before we start collecting and processing data which milestones are worth keeping and which ones are not. And when we add more milestones to the list of interesting events we want to track we have to revisit the table structures, which may require reprocessing large amounts of data.

The approach we propose is different.

Instead of being order-centric, as in the accumulating snapshot design, the fact table should be event-centric. We consider each event as the actual fact we want to track, not the order itself. And we add another twist: we duplicate the event rows, one to track the moment the order entered a given state and another to track when the order left that state into another.

What we’re doing is considering a state change to be made of two distinct parts: if an order moves from state A to state B, then:

  1. The order LEAVES state A
  2. Then the order ENTERS state B.

Each of those two instances is treated as if it were an actual event. But we add a new column, which we call Direction, which takes values of +1 and -1 depending on whether it refers to a new state or an old one.

So, the data above would look like this:

order_id | order_date | customer_name | event_date | event_name | warehouse | carrier | event_count
1 | 01/01/16 | Acme Industries | 02/01/16 08:00 | Preparing shipment | London | Deliveries express | 1
1 | 01/01/16 | Acme Industries | 03/01/16 10:00 | Preparing shipment | London | Deliveries express | -1
1 | 01/01/16 | Acme Industries | 03/01/16 10:00 | Shipped from warehouse | London | Deliveries express | 1
1 | 01/01/16 | Acme Industries | 03/01/16 14:30 | Shipped from warehouse | London | Deliveries express | -1
1 | 01/01/16 | Acme Industries | 03/01/16 14:30 | Delivery failed | London | Deliveries express | 1

What we did here is separate the event into two different facts: one with an event_count of +1 marking the entry into a given state and another with an event_count of -1 marking its exit from that state. We shall call these the In and Out events, respectively.

Notice that the timestamp of the Out event should always match the timestamp of the next In event. All other attributes of the Out event are kept from the corresponding In event.
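
Before moving on to the PDI implementation, here is a rough sketch of the same transformation expressed in SQL with a window function, using the order_events table described in part 2 of this series. It is an illustration of the logic rather than the implementation we actually use.

-- every source event yields an "In" row (+1)...
SELECT
  order_id, order_date, customer_name
, event_date
, event_name, warehouse, carrier
, 1 AS event_count
FROM order_events
UNION ALL
-- ...and every event that has a successor also yields an "Out" clone (-1),
-- timestamped with the next event's date
SELECT
  order_id, order_date, customer_name
, next_event_date AS event_date
, event_name, warehouse, carrier
, -1 AS event_count
FROM (
  SELECT e.*
       , lead(event_date) OVER (PARTITION BY order_id ORDER BY event_date) AS next_event_date
  FROM order_events AS e
) AS x
WHERE next_event_date IS NOT NULL
ORDER BY order_id, event_date, event_count DESC;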

A few things about this model are worth noting:

  • sum(event_count) yields the same result as count(distinct order_id), as within each order_id all rows are paired up and cancel each other out, except the most recent one (see the query sketch after this list);
  • This remains true if we limit our query to a given event_date interval;
  • It still remains true if we limit further our query to a specific attribute (e.g., customer_name);
  • It’s still true if we filter further on a specific event_name, with a caveat: it will return the difference between the number of objects in that state at the end of the interval and the number of objects in that state at the beginning, giving us a net count of objects in/out any given state;
  • It aggregates nicely and includes only additive measures;
  • It can be enriched, for example by including an event_age measure in all Out events (defined as zero for all In events), which allows us to calculate average ages of objects in any given state; this measure also allows drilling and filtering;
  • Furthermore, it allows us to calculate the cumulative age of objects, as long as we’re careful, when calculating it, to only take the cumulative ages of Out events;
  • We can view the event_count column as a measure, counting how many objects entered minus how many objects left a given state, but also as a dimension, counting In and Out events separately (with the caveat that counts of Out events are negative numbers);
  • As we only insert new rows and never update records, this can be implemented in a Hadoop environment without any changes;
  • And the algorithm can be implemented in a MapReduce type of algorithm, furthering its applicability to Hadoop data stores and providing scalability.
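
As a quick illustration of the first few points, the net count of objects per state at a point in time reduces to a single aggregation; the fact table name below is hypothetical.

-- how many objects were in each state at the end of 03/01/16
SELECT
  event_name
, sum(event_count) AS objects_in_state
FROM status_change_fact
WHERE event_date < TIMESTAMP '2016-01-04 00:00:00'
GROUP BY event_name;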

What this table achieves is in fact a generalisation of the Accumulating Snapshot table as described by Kimball, by means of a trade off between the number of columns and the number of rows. If we were to capture events in a process of fixed length, let’s say N milestones, Kimball’s approach requires 1 row and N additional columns, whereas our approach requires 1 additional column and a total of 2N-1 rows.

However, where Kimball’s approach reaches its limit is in the ability to change the length of the process being analysed; adding a new milestone requires changing the table’s structure and filling in missing data, whereas in our approach only new rows have to be added and for all those objects where new events being tracked don’t exist, they are simply not there. In processes where the length varies significantly (multiple delivery attempts, for example), Kimball’s approach will result in many empty cells, whereas in this approach those rows would simply be missing.

This model doesn’t come without its challenges, of course:

  • There’s no practical way to determine how many objects had an event of type X in a time period if there are repeated events. We can only count how many such events occurred, but if an object had multiple events within that time period, it’ll be overcounted;
  • During the ETL it’s necessary to read all “most recent events” for all objects, so we can determine their Out event correctly and then filter them out from the final output; one way to achieve this is to apply partitioning: all In events that don’t yet have a matching Out event are stored in a separate partition which is overwritten on each ETL run;
  • The implementation algorithm will need to sort the input data to properly merge most recent events coming from the target fact table with new incoming events from the source system; if the input data is very small when compared to the global population of objects already being tracked this adds a significant overhead to the ETL process;
  • The algorithm is highly sensitive to late arriving data; if the events arrive in the wrong order and we process the 1st, 3rd and 4th events of an object in one ETL run and only later we read the 2nd event of its lifecycle, there’s no way to insert it into the target table without compromising consistency. A reload of data will be necessary.

In the next article we’re going to see a practical implementation of this fact table using a set of PDI transformations that read new data in CSV files from an inbox folder and append its output data to another CSV file, which we can then open in the tool of our choice to do some analytics with it.

(part 4 of the series is here)

Status change fact table – Part 2 (The input data)

(See part 1 of this series here)

To make matters simpler we’ll assume that a table with a stream of order events already exists and that it’s denormalised, with each row containing all the relevant data. Either the source system already had that information or it was built in an earlier phase of the ETL.

Table order_events:
Order_id int
Order_date datetime
Customer_name string
Event_date datetime
Event_name string
Warehouse string
Carrier string

Here’s a sample of the data we’ll be dealing with:

order_id | order_date | customer_name | event_date | event_name | warehouse | carrier
1 | 01/01/16 | Acme Industries | 02/01/16 08:00 | Preparing shipment | London | Deliveries express
1 | 01/01/16 | Acme Industries | 03/01/16 10:00 | Shipped from warehouse | London | Deliveries express
1 | 01/01/16 | Acme Industries | 03/01/16 14:30 | Delivery failed | London | Deliveries express
1 | 01/01/16 | Acme Industries | 03/01/16 18:00 | Returned to warehouse | London | Deliveries express
1 | 01/01/16 | Acme Industries | 04/01/16 09:00 | Shipped from warehouse | London | Deliveries express
1 | 01/01/16 | Acme Industries | 04/01/16 11:30 | Delivery failed | London | Deliveries express
1 | 01/01/16 | Acme Industries | 04/01/16 18:00 | Returned to warehouse | London | Deliveries express
1 | 01/01/16 | Acme Industries | 05/01/16 10:30 | Shipped from warehouse | London | Deliveries express
1 | 01/01/16 | Acme Industries | 05/01/16 15:00 | Delivery successful | London | Deliveries express
2 | 01/01/16 | Boggs Corp | 02/01/16 09:00 | Preparing shipment | London | Deliveries express
2 | 01/01/16 | Boggs Corp | 03/01/16 10:00 | Shipped from warehouse | London | Deliveries express
2 | 01/01/16 | Boggs Corp | 03/01/16 11:00 | Delivery successful | London | Deliveries express
3 | 02/01/16 | Boggs Corp | 03/01/16 08:00 | Preparing shipment | Liverpool | WeDeliverStuff
3 | 02/01/16 | Boggs Corp | 04/01/16 09:30 | Shipped from warehouse | Liverpool | WeDeliverStuff
3 | 02/01/16 | Boggs Corp | 04/01/16 15:00 | Delivery successful | Liverpool | WeDeliverStuff
3 | 02/01/16 | Boggs Corp | 08/01/16 16:30 | Returned by customer | Liverpool | WeDeliverStuff
3 | 02/01/16 | Boggs Corp | 09/01/16 08:00 | Preparing shipment | Liverpool | WeDeliverStuff
3 | 02/01/16 | Boggs Corp | 10/01/16 10:00 | Shipped from warehouse | Liverpool | WeDeliverStuff
3 | 02/01/16 | Boggs Corp | 10/01/16 14:00 | Delivery successful | Liverpool | WeDeliverStuff

Our data sample has 3 orders, from 2 different customers, shipped from two different warehouses. One of the orders was only delivered after a couple of failed attempts, and in another the products were returned and had to be replaced.

Our goal in the next article is to build a fact table that can track all events that happened to all orders and provide meaningful analytics to the client.

In the next part of this series we’re going to explore a data model that allows analysing this dataset.

Status change fact table – Part 1 (The problem)

In this series of articles we’re going to address a business problem that is traditionally solved, although only partially, by an accumulating snapshot fact table. We will propose a new type of fact table that not only generalizes the concept of the accumulating snapshot, as described by Ralph Kimball, but also provides an implementation that doesn’t require updates, which makes it Hadoop friendly, and can be implemented by a MapReduce algorithm.

So, here’s the description of the business problem we got from our (fictional) client:

We need to analyse data pertaining to orders and order fulfilment. We have a database where order events are stored, indicating when an order is placed by a client, when it’s prepared for shipment at our warehouses, when it dispatches and when it’s delivered. In some cases shipments are sent back by the client, if we shipped the wrong products or if some item is defective; in that case the order may be shipped again, cancelled altogether, modified, etc. Also, in some cases the delivery attempt fails and it may need to be retried one or more times by our courier.

We need to create reports to answer, at least, the following questions:

  1. How many orders were placed/shipped/delivered on a given time period?
  2. What’s the number of orders returned to the warehouse for any reason other than “could not deliver” (defective or broken products; wrong products shipped; delivered past their due date; etc.)?
  3. How many orders are in our backlog waiting to be dispatched? How does that compare to the backlog last week/month/year?
  4. How many orders are delivered but haven’t been paid yet? How many payments are overdue, given our standard 30 day payment terms? How does that compare with the same indicator last week/month/year?
  5. Which warehouses are more likely to have orders returned due to defective items?
  6. Which carriers are more likely to break items during delivery?
  7. Of course, for all these questions, we may need to drill down by customer, carrier, warehouse, etc.

All the data for our analytics solution must be hosted in a Hadoop cluster and data processing should be done directly in the cluster as much as possible (e.g. in MapReduce jobs).

From the business questions we can see how an accumulating snapshot fact table would be able to address some of the questions, by inserting new facts when an order is placed and updating them for each milestone in the order’s life cycle: prepared, shipped, delivered, paid. We can also add a couple more milestones, namely for shipment returns, delivery retries, etc. But this has several limitations.

One limitation of the accumulating snapshot is that we have to define which milestones are worth recording beforehand. If an order is retried more than once we can only keep track of a fixed number of retries; likewise, an order may be returned multiple times, but we must determine beforehand how many order returns can be tracked. And there’s no way to track a variable and arbitrarily large number of milestones. As time goes by, more variety is expected in the data and it may happen that although most orders go through 5 or 6 stages in their lifecycle, some have well over 20 stages including a varying number of delivery attempts or product returns. And in a few exotic examples, there may be over 100 milestones for one particular order.

Another limitation is that the accumulating snapshot isn’t future proof. Even if we accept the limitation of being able to track only a fixed number of delivery attempts and product returns, if the client asks us to also keep track of late payments and the number of actions taken to collect an overdue invoice (emails, letters, phone calls) we need to add new columns to what is already a very wide fact table.

Finally, this is not Hadoop friendly. HDFS is a “write once, read many times” filesystem and doesn’t offer the chance to easily update records. We could try versioning records, but that places an extra burden on the query engine to perform the analytics, requiring grouping facts by order ID and retrieving only the latest version of each fact to get the most current state. And those queries will necessarily be slower than your typical OLAP query:

Select dim_table.attribute, sum(fact_table.measure)
from fact_table, dim_table
where fact_table.key = dim_table.key
group by dim_table.attribute

In the next article of this series we’ll see a sample of the data we need to process.

(part 2 of the series is here)