All posts in “data modelling”

Status change fact table – Part 5 (Conclusion)

(previous parts of this series are here: part 1, part 2, part 3, part 4)

The fact table described in the previous parts of this series allows us to count how many objects entered or exited a given state in a certain time period.

However, if we want to count how many objects are in a given state at a given point in time, we need to count all In and Out events since the beginning of time.

This is problematic and doesn’t scale well: the amount of computation grows as more data is ingested.

However, we can add a Periodic Snapshot fact table to the mix and have those running totals computed for each state on each day. Questions such as “How many objects were Out for delivery on day X” can then be answered by looking at a specific snapshot date and grouping all rows that have that specific attribute.
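For instance, assuming the snapshot is stored in a table called daily_status_snapshot, partitioned by snapshot_date (illustrative names, not something prescribed by this series), such a question could be answered with something like:

-- Objects in each state on a given day, read from that day's snapshot partition;
-- SUM(event_count) nets out any In/Out pairs that the partition may carry.
SELECT event_name,
       SUM(event_count) AS objects_in_state
FROM daily_status_snapshot
WHERE snapshot_date = '2016-01-05'
GROUP BY event_name;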

Even if we include attributes such as the age of objects or events, as we did in the implementation of the status change fact table, we can still keep track of everything.

In brief, the snapshot could be implemented by doing something like the following (a SQL sketch follows the list):

  • Take all rows from the snapshot partition for day D;
  • Increment the snapshot date to D+1 and increment all ages by 1;
  • Take all events from the status change table for day D+1;
  • Union the two sets of rows produced by steps 2 and 3;
  • Insert the result into a new partition of the snapshot table.
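As a rough illustration, and assuming Hive-style partitioning with tables named daily_status_snapshot and status_change_fact (names chosen for this sketch only), one daily load could look like this:

-- Build the snapshot partition for day D+1 (here 2016-01-06) from the previous day's partition.
INSERT OVERWRITE TABLE daily_status_snapshot PARTITION (snapshot_date = '2016-01-06')
SELECT order_id, event_name, event_count, age
FROM (
  -- steps 1 and 2: carry yesterday's rows forward, ageing them by one day
  SELECT order_id, event_name, event_count, age + 1 AS age
  FROM daily_status_snapshot
  WHERE snapshot_date = '2016-01-05'
  UNION ALL
  -- step 3: add the In/Out events recorded on day D+1 in the status change fact table
  SELECT order_id, event_name, event_count, event_age AS age
  FROM status_change_fact
  WHERE to_date(event_date) = '2016-01-06'
) unioned;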

The snapshot table will have a very high cardinality, especially because we will most likely need to keep it atomic, without aggregation on any attribute. Keeping it partitioned (typically by snapshot date) makes maintenance easier, allowing us to delete specific partitions when the data is found to be incorrect.
Bear in mind that, as with any Periodic Snapshot algorithm, it is highly sensitive to data errors. Any incorrect number in one day will be propagated into the future, and the only practical solution is to delete all snapshot data from the first day the totals are wrong and reprocess a significant amount of data.
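When that happens, and assuming the same Hive-style layout as in the sketch above, getting rid of the bad days before reprocessing amounts to dropping their partitions:

-- Drop the first incorrect partition; in practice every later partition is dropped and rebuilt as well.
ALTER TABLE daily_status_snapshot DROP IF EXISTS PARTITION (snapshot_date = '2016-01-05');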

But, with all its flaws, it’s still our best shot at keeping track of all measurable values within our data.

In conclusion, this model implements and extends functionality that used to be achieved by an Accumulating Snapshot. It can track an arbitrary number of stages of a process and doesn’t require updates of past data, making it easy to implement in a Hadoop environment. It can be used to track data pertaining to shipments and deliveries, bug tracking software, or any other business process where objects move from one state to the next and we need to track their paths.

Status change fact table – Part 4 (A PDI implementation)

(previous articles of this series can be found here, here and here)

In this series of posts we’ve been discussing a data model to handle status changes of objects and a way to track them and perform analytics.

The previous article described the data model; in this article we show an implementation using PDI (Pentaho Data Integration).

The idea is as follows: after reading a data slice and sorting it by order_id (our primary key) and event_date, we determine which event is the first (of the batch) and which one is the last. We then clone all events but the last one, and apply some logic (a SQL sketch of the equivalent logic follows the list):

  • The clone of any event is an “Out” event, which means its event_count is set to -1;
  • The “Out” event takes as its timestamp the timestamp of the next “In” event (the moment the event is in fact closed);
  • On the “Out” events we determine the event’s age (which is by definition 0 on the “In” events);
  • We calculate the cumulative sum of all event ages to determine the age of the order itself (this value is present on both “In” and “Out” events; beware of this when using the measure in reports);
  • All “last” events (by order) are assigned a different output partition. The output partitions are called “Open” for all “In” events that don’t yet have a matching “Out” event, and “Closed” for all matched “In”/“Out” pairs;
  • The output is appended to the “Closed” partition, but it overwrites the contents of the “Open” partition;
  • On the subsequent run, all new input data plus all unmatched “In” events previously processed will be fed into the ETL. Those “In” events that eventually get matched with an “Out” event move to the “Closed” partition.
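The downloadable transformation (see below) implements this with PDI steps, but the core pairing logic can also be sketched in SQL with window functions. The following is only an illustration of the idea, with assumed table and column names and Hive-flavoured date functions:

-- For each order, emit the original "In" row (+1) and, whenever a next event exists,
-- an "Out" clone (-1) stamped with the next event's timestamp and carrying the age spent
-- in the state (in seconds here).
WITH ordered AS (
  SELECT order_id, order_date, customer_name, warehouse, carrier,
         event_name, event_date,
         LEAD(event_date) OVER (PARTITION BY order_id ORDER BY event_date) AS next_event_date
  FROM order_events
)
SELECT order_id, order_date, customer_name, warehouse, carrier,
       event_name, event_date, 1 AS event_count, 0 AS event_age
FROM ordered
UNION ALL
SELECT order_id, order_date, customer_name, warehouse, carrier,
       event_name, next_event_date AS event_date, -1 AS event_count,
       unix_timestamp(next_event_date) - unix_timestamp(event_date) AS event_age
FROM ordered
WHERE next_event_date IS NOT NULL;  -- the last event of each order has no matching "Out" yet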

The ETL can thus run incrementally against an input data set that is fed new data periodically. If at any time the ETL runs without new input data, it is effectively a no-op: nothing is appended to the “Closed” partition, while the contents of the “Open” partition are read, left unchanged (as there are no new events to process) and rewritten to the “Open” partition.

At the end of this article you’ll find a download link to a ZIP file that has all necessary files to run the ETL described above.

Inside the ZIP file you’ll find the following folders:

  • data: all the input data files;
  • inbox: the folder the transformation reads and processes;
  • output: the destination of the processed data;
  • kettle: the location of the actual PDI transformation.

To run the transformation, move the first few files from data into inbox (note: you need to respect the dates; copy older files first) and then run the transformation found in the kettle folder. The output folder will then contain two files, output/open/data.csv and output/closed/data.csv, which together constitute our output. To run it again, remove the files from the inbox folder and move the next batch of files from data. The ETL runs incrementally.

Download PDI Implementation

Read the last part of this series of articles here

Status change fact table – Part 3 (The model)

(parts 1 and 2 of this series can be viewed here and here)

The classical approach to solving this problem is an accumulating snapshot fact table. That table would have the order date, shipment date and delivery date, and would be updated at every milestone of each order. We may add a return date or a re-delivery date to keep track of returns or failed deliveries, but we have to decide the structure beforehand.

A possible design of an accumulating snapshot to track the data would be:

orders_accumulating_snapshot(
Order_id,
Customer,
Warehouse,
Carrier,
Order_date,
Shipped_date,
First_delivery_attempt_date,
First_return_to_warehouse_date,
First_return_to_warehouse_reason,
Successful_delivery_date
)

There are significant limitations to that design, namely the fact that we can’t track more than a fixed number of delivery attempts or product returns. In the example above, we chose to track the dates of the first shipment, the first delivery attempt and the first return to the warehouse (whether due to a failed delivery or a return by the customer), a field showing the reason for the first return to the warehouse, and the successful delivery date.

We can expand this design, for example to also track the last return date as well as the last return reason, so that we can track both the first failed delivery and an eventual return by the customer.

We can also add a few more intermediate milestones.

But we have to decide before we start collecting and processing data which milestones are worth keeping and which ones are not. And when we add more milestones to the list of interesting events we want to track we have to revisit the table structures, which may require reprocessing large amounts of data.

The approach we propose is different.

Instead of being order-centric, as in the accumulating snapshot design, the fact table should be event-centric. We consider each event, not the order itself, to be the fact we want to track. And we add another twist: we duplicate the event rows, one row to track the moment the order entered a given state and another to track the moment the order left that state for the next.

What we’re doing is considering a state change to be made of two distinct parts: if an order moves from state A to state B, then:

  1. The order LEAVES state A
  2. Then the order ENTERS state B.

Each of those two instances is treated as if it were an actual event. But we add a new column, event_count, which acts as a direction flag: it takes the value +1 when the row marks the entry into a state and -1 when it marks the exit from one.

So, the first few events of order 1 from the previous article would look like this:

order_id | order_date | customer_name | event_date | event_name | warehouse | carrier | event_count
1 | 01/01/16 | Acme Industries | 02/01/16 08:00 | Preparing shipment | London | Deliveries express | 1
1 | 01/01/16 | Acme Industries | 03/01/16 10:00 | Preparing shipment | London | Deliveries express | -1
1 | 01/01/16 | Acme Industries | 03/01/16 10:00 | Shipped from warehouse | London | Deliveries express | 1
1 | 01/01/16 | Acme Industries | 03/01/16 14:30 | Shipped from warehouse | London | Deliveries express | -1
1 | 01/01/16 | Acme Industries | 03/01/16 14:30 | Delivery failed | London | Deliveries express | 1

What we did here is separate the event into two different facts: one with an event_count of +1 marking the entry into a given state and another with an event_count of -1 marking its exit from that state. We shall call these the In and Out events, respectively.

Notice that the timestamp of the Out event should always match the timestamp of the next In event. All other attributes of the Out event are kept from the corresponding In event.

A few things about this model are worth noting:

  • sum(event_count) yields the same result as count(distinct order_id), as within each order_id all rows are paired up and cancel each other out, except the most recent one (see the query sketch after this list);
  • This remains true if we limit our query to a given event_date interval;
  • It still remains true if we limit our query further to a specific attribute (e.g., customer_name);
  • It’s still true if we filter further on a specific event_name, with a caveat: it will return the difference between the number of objects in that state at the end of the interval and the number at the beginning, giving us a net count of objects moving in/out of any given state;
  • It aggregates nicely and includes only additive measures;
  • It can be enriched, for example with an event_age measure on all Out events (defined as zero for all In events), which allows us to calculate average ages of objects in any given state; this measure also allows drilling and filtering;
  • Furthermore, it allows us to calculate the cumulative age of objects, as long as we’re careful, when calculating it, to only take the cumulative ages of Out events;
  • We can view the event_count column as a measure, counting how many objects entered minus how many objects left a given state, but also as a dimension, counting In and Out events separately (with the caveat that counts of Out events are negative numbers);
  • As we only insert new rows and never update records, this can be implemented in a Hadoop environment without any changes;
  • And the algorithm can be implemented as a MapReduce-style algorithm, furthering its applicability to Hadoop data stores and providing scalability.
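As an illustration of the first few bullet points, and assuming the fact table is exposed as status_change_fact (a name chosen for this sketch), the net count per state over a period is a plain aggregation:

-- Net number of objects that entered minus left each state during January 2016;
-- run over the whole history, it equals the number of objects currently in each state.
SELECT event_name,
       SUM(event_count) AS net_objects
FROM status_change_fact
WHERE event_date >= '2016-01-01' AND event_date < '2016-02-01'
GROUP BY event_name;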

What this table achieves is in fact a generalisation of the Accumulating Snapshot table as described by Kimball, by means of a trade-off between the number of columns and the number of rows. If we capture events in a process of fixed length, say N milestones, Kimball’s approach requires 1 row and N additional columns, whereas our approach requires 1 additional column and a total of 2N-1 rows.

However, Kimball’s approach reaches its limit when the length of the process being analysed changes: adding a new milestone requires changing the table’s structure and filling in missing data, whereas in our approach only new rows have to be added, and objects that never experience the newly tracked events simply have no rows for them. In processes whose length varies significantly (multiple delivery attempts, for example), Kimball’s approach results in many empty cells, whereas in this approach those rows are simply missing.

This model doesn’t come without its challenges, of course:

  • There’s no practical way to determine how many objects had an event of type X in a time period if there are repeated events. We can only count how many such events occurred, but if an object had multiple events within that time period, it’ll be overcounted;
  • During the ETL it’s necessary to read all “most recent events” for all objects, so we can determine their Out event correctly and then filter them out from the final output; one way to achieve this is to apply partitioning: all In events that don’t yet have a matching Out event are stored in a separate partition which is overwritten on each ETL run;
  • The implementation algorithm will need to sort the input data to properly merge the most recent events coming from the target fact table with new incoming events from the source system; if the input data is very small compared to the global population of objects already being tracked, this adds a significant overhead to the ETL process;
  • The algorithm is highly sensitive to late arriving data; if the events arrive in the wrong order and we process the 1st, 3rd and 4th events of an object in one ETL run and only later we read the 2nd event of its lifecycle, there’s no way to insert it into the target table without compromising consistency. A reload of data will be necessary.

In the next article we’re going to see a practical implementation of this fact table using a set of PDI transformations that read new data in CSV files from an inbox folder and append its output data to another CSV file, which we can then open in the tool of our choice to do some analytics with it.

(part 4 of the series is here)

Status change fact table – Part 2 (The input data)

(See part 1 of this series here)

To make matters simpler we’ll assume that a table with a stream of order events already exists and that it’s denormalised, with each row containing all the relevant data. Either the source system already had that information or it was built in an earlier phase of the ETL.

Table order_events:
Order_id int
Order_date datetime
Customer_name string
Event_date datetime
Event_name string
Warehouse string
Carrier string
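In a Hadoop environment this source table might be declared along the following lines (a Hive-flavoured sketch; the storage format is an assumption, not a requirement):

CREATE TABLE IF NOT EXISTS order_events (
  order_id      INT,
  order_date    TIMESTAMP,
  customer_name STRING,
  event_date    TIMESTAMP,
  event_name    STRING,
  warehouse     STRING,
  carrier       STRING
)
STORED AS ORC;  -- any columnar format, or even external CSV tables, would work just as well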

Here’s a sample of the data we’ll be dealing with:

order_id | order_date | customer_name | event_date | event_name | warehouse | carrier
1 | 01/01/16 | Acme Industries | 02/01/16 08:00 | Preparing shipment | London | Deliveries express
1 | 01/01/16 | Acme Industries | 03/01/16 10:00 | Shipped from warehouse | London | Deliveries express
1 | 01/01/16 | Acme Industries | 03/01/16 14:30 | Delivery failed | London | Deliveries express
1 | 01/01/16 | Acme Industries | 03/01/16 18:00 | Returned to warehouse | London | Deliveries express
1 | 01/01/16 | Acme Industries | 04/01/16 09:00 | Shipped from warehouse | London | Deliveries express
1 | 01/01/16 | Acme Industries | 04/01/16 11:30 | Delivery failed | London | Deliveries express
1 | 01/01/16 | Acme Industries | 04/01/16 18:00 | Returned to warehouse | London | Deliveries express
1 | 01/01/16 | Acme Industries | 05/01/16 10:30 | Shipped from warehouse | London | Deliveries express
1 | 01/01/16 | Acme Industries | 05/01/16 15:00 | Delivery successful | London | Deliveries express
2 | 01/01/16 | Boggs Corp | 02/01/16 09:00 | Preparing shipment | London | Deliveries express
2 | 01/01/16 | Boggs Corp | 03/01/16 10:00 | Shipped from warehouse | London | Deliveries express
2 | 01/01/16 | Boggs Corp | 03/01/16 11:00 | Delivery successful | London | Deliveries express
3 | 02/01/16 | Boggs Corp | 03/01/16 08:00 | Preparing shipment | Liverpool | WeDeliverStuff
3 | 02/01/16 | Boggs Corp | 04/01/16 09:30 | Shipped from warehouse | Liverpool | WeDeliverStuff
3 | 02/01/16 | Boggs Corp | 04/01/16 15:00 | Delivery successful | Liverpool | WeDeliverStuff
3 | 02/01/16 | Boggs Corp | 08/01/16 16:30 | Returned by customer | Liverpool | WeDeliverStuff
3 | 02/01/16 | Boggs Corp | 09/01/16 08:00 | Preparing shipment | Liverpool | WeDeliverStuff
3 | 02/01/16 | Boggs Corp | 10/01/16 10:00 | Shipped from warehouse | Liverpool | WeDeliverStuff
3 | 02/01/16 | Boggs Corp | 10/01/16 14:00 | Delivery successful | Liverpool | WeDeliverStuff

Our data sample has three orders, from two different customers, shipped from two different warehouses. One of the orders was only delivered after a couple of failed attempts, and in another the products were returned by the customer and had to be replaced.

Our goal in the next article is to build a fact table that can track all events that happened to all orders and provide meaningful analytics to the client.

In the next part of this series we’re going to explore a data model that allows analysing this dataset.

Status change fact table – Part 1 (The problem)

In this series of articles we’re going to address a business problem that is traditionally solved, although only partially, by an accumulating snapshot fact table. We will propose a new type of fact table that not only generalizes the concept of the accumulating snapshot, as described by Ralph Kimball, but also provides an implementation that doesn’t require updates, which makes it Hadoop-friendly, and that can be implemented by a MapReduce algorithm.

So, here’s the description of the business problem we got from our (fictional) client:

We need to analyse data pertaining to orders and order fulfilment. We have a database where order events are stored, indicating when an order is placed by a client, when it’s prepared for shipment at our warehouses, when it’s dispatched and when it’s delivered. In some cases shipments are sent back by the client, if we shipped the wrong products or if some item is defective; in that case the order may be shipped again, cancelled altogether, modified, etc. Also, in some cases the delivery attempt fails and may need to be retried one or more times by our courier.

We need to create reports to answer, at least, the following questions:

  1. How many orders were placed/shipped/delivered in a given time period?
  2. What’s the number of orders returned to the warehouse for any reason other than “could not deliver” (defective or broken products; wrong products shipped; delivered past its due date; etc.)?
  3. How many orders are in our backlog waiting to be dispatched? How does that compare to the backlog last week/month/year?
  4. How many orders are delivered but haven’t been paid yet? How many payments are overdue, given our standard 30 day payment terms? How does that compare with the same indicator last week/month/year?
  5. Which warehouses are more likely to have orders returned due to defective items?
  6. Which carriers are more likely to break items during delivery?
  7. Of course, for all these questions, we may need to drill down by customer, carrier, warehouse, etc.

All the data for our analytics solution must be hosted in a Hadoop cluster and data processing should be done directly in the cluster as much as possible (e.g. in MapReduce jobs).

From the business questions we can see how an accumulating snapshot fact table would be able to address some of the questions, by inserting new facts when an order is placed and updating them for each milestone in the order’s life cycle: prepared, shipped, delivered, paid. We can also add a couple more milestones, namely for shipment returns, delivery retries, etc. But this has several limitations.

One limitation of the accumulating snapshot is that we have to define beforehand which milestones are worth recording. If a delivery is retried more than once we can only keep track of a fixed number of retries; likewise, an order may be returned multiple times, but we must decide beforehand how many returns can be tracked. There’s no way to track a variable and arbitrarily large number of milestones. As time goes by, more variety is expected in the data, and it may happen that although most orders go through 5 or 6 stages in their lifecycle, some have well over 20 stages, including a varying number of delivery attempts or product returns. In a few exotic cases, there may be over 100 milestones for one particular order.

Another limitation is that the accumulating snapshot isn’t future-proof. Even if we accept being able to track only a fixed number of delivery attempts and product returns, if the client asks us to also keep track of late payments and the number of actions taken to collect an overdue invoice (emails, letters, phone calls), we need to add new columns to what is already a very wide fact table.

Finally, this is not Hadoop-friendly. HDFS is a “write once, read many times” filesystem and doesn’t allow records to be updated easily. We could try versioning records, but that places an extra burden on the query engine: to get the most current state it has to group facts by order ID and retrieve only the latest version of each fact. Those queries will necessarily be slower than your typical OLAP query, such as the one below (a sketch of the versioned alternative follows it):

SELECT dim_table.attribute, SUM(fact_table.measure)
FROM fact_table, dim_table
WHERE fact_table.key = dim_table.key
GROUP BY dim_table.attribute
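With versioned records, by contrast, the engine first has to isolate the latest version of every order before it can aggregate anything. A hedged sketch of that extra work (table and column names such as fact_table.version are assumptions):

-- Keep only the most recent version of each order before aggregating;
-- the extra window/sort pass is what makes these queries slower than a plain star join.
SELECT dim_table.attribute, SUM(latest.measure)
FROM (
  SELECT f.*,
         ROW_NUMBER() OVER (PARTITION BY f.order_id ORDER BY f.version DESC) AS rn
  FROM fact_table f
) latest
JOIN dim_table ON latest.key = dim_table.key
WHERE latest.rn = 1
GROUP BY dim_table.attribute;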

In the next article of this series we’ll see a sample of the data we need to process.

(part 2 of the series is here)