RFC - Future Design - Time Series & Stateful Stream Processing

Future Time-series Architecture

Summary

Over the past six years of experimenting with different designs for time-series data, and more recently with designs for integrating time-series data into GraphQL and the Rhize Data Hub, our ability to reason about the different types of data and their requirements has evolved.

This post tries to capture that understanding as the basis for discussion on what the future of time-series should look like.

Retrospective

Motivation

What use-cases are time-series use-cases? What makes time-series different?

  • We are mostly talking about machine data when we talk about time-series data. The Rhize Data Hub allows you to connect to external streaming data sources like OPC-UA servers or MQTT Brokers, and allows you to 'bind' topics from those external data sources to equipment properties in your equipment model.
  • This is traditionally the role of the 'Process Historian' and is primarily concerned with Observed data, i.e. the value of a sensor was observed at a point in time. That observation has a timestamp, and a set of observations across time creates a time-series.

Time Series use-cases often include the need to Infer context for the observation. For example:

  • What type of equipment is the sensor attached to?
  • What product is the equipment producing at the time of the observation?
  • Which shift is operating the equipment at the time of the observation?

Time Series use-cases also include the need to derive information from the observations. Examples include:

  • Calculating the volume of liquid in a vessel based on the observation of a level sensor
  • Aggregating the total amount of time that an observed value stayed within a specified range (a minimal sketch of this follows the list)
  • Calculating the % of waste over a particular time interval by considering the total production and the total rejects
  • Determining the time lost due to running at less than the desired speed by calculating the area between two observations over time
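
As a hedged illustration of the second derivation above (time within a specified range), here is a minimal Go sketch; the Observation type and the assumption that a value holds until the next observation arrives are illustrative choices, not part of any existing Rhize API.

```go
package main

import (
	"fmt"
	"time"
)

// Observation is a hypothetical timestamped sensor reading.
type Observation struct {
	At    time.Time
	Value float64
}

// timeInRange sums the duration for which the observed value stayed inside
// [low, high], assuming each value holds until the next observation arrives.
func timeInRange(obs []Observation, low, high float64) time.Duration {
	var total time.Duration
	for i := 0; i < len(obs)-1; i++ {
		if obs[i].Value >= low && obs[i].Value <= high {
			total += obs[i+1].At.Sub(obs[i].At)
		}
	}
	return total
}

func main() {
	t0 := time.Now()
	obs := []Observation{
		{t0, 71.2},
		{t0.Add(1 * time.Minute), 69.8}, // out of range
		{t0.Add(2 * time.Minute), 70.5},
		{t0.Add(3 * time.Minute), 70.9},
	}
	fmt.Println("time in range:", timeInRange(obs, 70, 72)) // 2m0s
}
```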

Some of these calculations only need to be determined at the time of running a query, but others may need to be used in the generation of events (see RFC - Design Doc - Event Publishing).

Where there is a need to persist Derived or Inferred information, complications can arise when the underlying information changes. This can and does happen in scenarios like:

  • A calculation changes, which invalidates the previously persisted results
  • Context information changes historically: an example would be where the Production Order details for a machine were updated for a previously run order.
  • Data arrives late: this often occurs with lab test results, which can arrive days or weeks after production has occurred. It can also occur if source systems are not capable of publishing real-time events and instead publish information in batches. Late-arriving data can cause derived or inferred information to be invalidated.

Stateful calculations can also be required for some use-cases, particularly those relating to "Bottom-Up" MES applications that infer the state of the process by monitoring the state of the equipment against defined rules.

A stateful calculation refers to a computational process or algorithm that maintains and updates internal state information across multiple invocations or iterations. Unlike stateless calculations, where each computation is independent and produces an output based solely on the input parameters, stateful calculations rely on the accumulation and retention of intermediate state between computations.
Examples of stateful calculations include:

  1. Running Totals:
  • Calculating the cumulative sum of a sequence of values over time. The stateful calculation maintains the current total as it processes each input value, updating the total with each new value. (A minimal sketch of this and the next example follows the list.)
  2. Moving Averages:
  • Computing the average of a sliding window of values from a data stream. The stateful calculation maintains a buffer of recent data points and updates the average as new values are added to the window while removing older values.
  3. Iterative Algorithms:
  • Implementing algorithms that require multiple iterations to converge on a solution, such as optimization algorithms or machine learning algorithms like gradient descent. The stateful calculation maintains the current state of the computation between iterations, updating parameters or variables based on the results of each iteration.
  4. Session Management:
  • Tracking the state of user sessions in web applications or distributed systems. The stateful calculation maintains session information, such as user authentication status, session duration, and activity logs, across multiple HTTP requests or interactions.
  5. State Machines:
  • Modeling stateful systems or processes using state machines, where the system transitions between different states based on input events or conditions. The stateful calculation maintains the current state of the system and transitions between states as events occur.

Historical changes can invalidate the intermediate state that a stateful calculation has accumulated, rendering its results invalid.
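
To make the first two examples concrete, the following is a minimal Go sketch of a running total and a trailing moving average as stateful operators; the types, window semantics, and in-memory state are illustrative assumptions rather than an existing Rhize component.

```go
package main

import "fmt"

// RunningTotal keeps the cumulative sum across invocations (example 1).
type RunningTotal struct{ total float64 }

func (r *RunningTotal) Update(v float64) float64 {
	r.total += v
	return r.total
}

// MovingAverage keeps a fixed-size buffer of recent values (example 2).
type MovingAverage struct {
	window []float64
	size   int
}

func (m *MovingAverage) Update(v float64) float64 {
	m.window = append(m.window, v)
	if len(m.window) > m.size {
		m.window = m.window[1:] // drop the oldest value
	}
	sum := 0.0
	for _, w := range m.window {
		sum += w
	}
	return sum / float64(len(m.window))
}

func main() {
	rt := &RunningTotal{}
	ma := &MovingAverage{size: 3}
	for _, v := range []float64{10, 20, 30, 40} {
		fmt.Printf("value=%v total=%v avg3=%.1f\n", v, rt.Update(v), ma.Update(v))
	}
	// The internal state (total, window) is exactly what a historical
	// change can invalidate, forcing a replay from an earlier point.
}
```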

In Transactional source applications, data integrity structures and data validation techniques are used to ensure that errors are generated when invalid scenarios occur. Transactions only move forward in time, and even when historical adjustments are required, like a journal entry in a financial ledger, that adjustment is applied at the current date and marked with an effective date in the past. History is never altered.
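
To make the ledger analogy concrete, here is a small, hypothetical Go sketch of an effective-dated adjustment record that separates when a correction was recorded from when it takes effect; the field names are illustrative only.

```go
package main

import (
	"fmt"
	"time"
)

// Adjustment is a hypothetical bitemporal record: it is appended at RecordedAt
// (now) but applies to EffectiveAt (in the past). Existing rows are never edited.
type Adjustment struct {
	RecordedAt  time.Time // when the correction entered the system
	EffectiveAt time.Time // the point in history it applies to
	Amount      float64
	Reason      string
}

func main() {
	adj := Adjustment{
		RecordedAt:  time.Now(),
		EffectiveAt: time.Now().AddDate(0, -1, 0), // corrects last month
		Amount:      -125.00,
		Reason:      "journal correction",
	}
	fmt.Printf("recorded %s, effective %s\n",
		adj.RecordedAt.Format("2006-01-02"), adj.EffectiveAt.Format("2006-01-02"))
}
```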

In a DataHub that must handle scenarios of late-arriving data and historical updates, the need for solutions that handle Back-Filling and Re-Statement arises. These solutions are inherently complex.

So what should be considered when historical changes happen?

  • How can we determine the impact of a historical change to data?
    Not all changes will invalidate the current state. Consider a moving average: if the change falls before the moving-average window, the current Moving Average value will not be affected. Only the moving-average values whose windows contain the changed historical value will be impacted. (A sketch of this impact analysis follows the list.)
  • Should the historical change cause new historical "Events" to be published?
    In a real-time, event-driven architecture, it is very important to separate historical events from current events. Imagine the confusion if events occurring as the result of a back-filling operation were not handled separately from currently occurring events.
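
As a rough sketch of the first question, the helper below computes which moving-average outputs a changed observation can affect, assuming a trailing window of fixed duration; the function name and window semantics are assumptions for discussion, not a proposed interface.

```go
package main

import (
	"fmt"
	"time"
)

// affectedRange returns the interval of output timestamps whose trailing
// window of length `window` contains the changed observation at `changed`.
// Outputs before `changed` are untouched; outputs after changed+window no
// longer include the changed value in their window.
func affectedRange(changed time.Time, window time.Duration) (from, to time.Time) {
	return changed, changed.Add(window)
}

func main() {
	changed := time.Date(2024, 3, 1, 8, 0, 0, 0, time.UTC)
	from, to := affectedRange(changed, 15*time.Minute)
	fmt.Println("recompute moving averages from", from, "to", to)
}
```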

Guide-level explanation

Reference-level explanation

Drawbacks

Rationale and alternatives

Prior art

Unresolved questions

Future possibilities

Great questions. I've been focused on the runtime calculations/queries, and it's good to consider the event side. Unfortunately, no real answers, just ramblings and more questions.

If we assume the calculations are pure stream processors and that we want to republish historical events, I can imagine this sequence (a code sketch of the publishing step follows the list):

  • Determine the scope of change based on the algorithm and timestamp [ingester]
  • Collect relevant data from time-series DB [querier]
  • Send data to calculator (stream processor) [querier]
  • Generate events based on data received [calculator]
  • Publish to subject reserved for republications [calculator]
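
A minimal sketch of the final publishing step, assuming NATS as the broker; the subject naming convention (a reserved "republish." prefix) and the event envelope are assumptions, not an agreed design.

```go
package main

import (
	"encoding/json"
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

// HistoricalEvent is a hypothetical envelope for events generated by a replay.
type HistoricalEvent struct {
	EquipmentID string    `json:"equipmentId"`
	OccurredAt  time.Time `json:"occurredAt"`  // historical timestamp of the event
	Republished time.Time `json:"republished"` // when the replay generated it
	Payload     any       `json:"payload"`
}

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Drain()

	ev := HistoricalEvent{
		EquipmentID: "mixer-01",
		OccurredAt:  time.Date(2024, 3, 1, 8, 0, 0, 0, time.UTC),
		Republished: time.Now(),
		Payload:     map[string]float64{"oee": 0.82},
	}
	data, _ := json.Marshal(ev)

	// Publish on a subject reserved for republications so that real-time
	// consumers are never confused by back-filled history.
	if err := nc.Publish("republish.events.mixer-01", data); err != nil {
		log.Fatal(err)
	}
}
```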

An alternative would be to incorporate data retrieval capabilities into the service handling the calculation; a sketch of this flow from the calculator's side follows the list.

  • Send a message to the calculator that a data observation has changed [ingester]
  • Determine scope of change based on observation [calculator]
  • Retrieve relevant data from time-series database [calculator]
  • Generate events based on data received [calculator]
  • Publish to subject reserved for republications [calculator]
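
For comparison, a hedged sketch of this alternative from the calculator's point of view, again assuming NATS; the subject names, message shape, and the commented steps are placeholders rather than agreed interfaces.

```go
package main

import (
	"encoding/json"
	"log"

	"github.com/nats-io/nats.go"
)

// ObservationChanged is a hypothetical notification from the ingester.
type ObservationChanged struct {
	Tag       string `json:"tag"`
	Timestamp string `json:"timestamp"` // RFC3339 time of the changed observation
}

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Drain()

	// The calculator listens for change notifications, determines the scope
	// itself, pulls the relevant history, and republishes derived events.
	_, err = nc.Subscribe("observations.changed", func(m *nats.Msg) {
		var chg ObservationChanged
		if err := json.Unmarshal(m.Data, &chg); err != nil {
			log.Println("bad message:", err)
			return
		}
		// 1. Determine scope of change (e.g. the affectedRange helper sketched earlier).
		// 2. Retrieve relevant data from the time-series database (placeholder step).
		// 3. Regenerate events and publish them on the reserved republish subject.
		log.Printf("recomputing derived events around %s for %s", chg.Timestamp, chg.Tag)
	})
	if err != nil {
		log.Fatal(err)
	}
	select {} // block forever; a real service would handle shutdown
}
```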

Should We Publish Historical Events?

Do consumers have the capability of handling historical events? That would require some work on their end. I agree that injecting historical events into the real-time event stream would cause many problems. We could create additional subjects just for republications and allow consumers the choice to subscribe.

How to Determine the Impact of a Change

Determining the scope seems to be the biggest challenge. For the running total, I'd assume we republished all the events after the change. On the moving average, would we have to republish anything after update-window_size and before update+window_size? Would session management be looking for distinct session_ids that have a minimum/maximum timestamp range that contains the update's timestamp? These are pretty rough guesses. I'd need to know more about what iterative algorithms and state machine models we use to understand boundary conditions, but I see why this is difficult.

I think we should be considering a generic answer for impact analysis and re-publishing, rather than trying to do something that is function specific.

Apache Flink uses a snapshot mechanism to store state periodically, which creates points from which you can replay.

If we were to persist state of the calculation over time, we could theoretically replay from any previous record.
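
A rough sketch of that idea, assuming calculation state is snapshotted periodically and that, on a historical change, we replay inputs from the last snapshot taken at or before the change; all names here are illustrative.

```go
package main

import (
	"fmt"
	"time"
)

// Snapshot is a hypothetical persisted copy of a calculation's state at a point in time.
type Snapshot struct {
	At    time.Time
	State float64 // e.g. the running total so far
}

// latestBefore returns the most recent snapshot taken at or before t,
// i.e. the point from which a replay must start. Snapshots are assumed
// to be ordered oldest-first.
func latestBefore(snaps []Snapshot, t time.Time) (Snapshot, bool) {
	for i := len(snaps) - 1; i >= 0; i-- {
		if !snaps[i].At.After(t) {
			return snaps[i], true
		}
	}
	return Snapshot{}, false
}

func main() {
	t0 := time.Date(2024, 3, 1, 0, 0, 0, 0, time.UTC)
	snaps := []Snapshot{
		{t0, 0},
		{t0.Add(1 * time.Hour), 42},
		{t0.Add(2 * time.Hour), 97},
	}
	changed := t0.Add(90 * time.Minute) // a historical observation changed here
	if s, ok := latestBefore(snaps, changed); ok {
		fmt.Printf("replay inputs from %s starting with state %v\n", s.At, s.State)
	}
}
```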

There is then the question of what to replay. This is where we may need to store the dependency tree. Example: if you have a calculated property C = A + B, and we get a new historical value for A, we should replay C.
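
A small sketch of that dependency tree, assuming we record which calculated properties consume which inputs and walk the relationships transitively; the in-memory map is purely illustrative.

```go
package main

import "fmt"

// dependents maps an input property to the calculated properties that
// consume it, e.g. C = A + B, and D = C * 2.
var dependents = map[string][]string{
	"A": {"C"},
	"B": {"C"},
	"C": {"D"},
}

// impacted returns every calculation that must be replayed, directly or
// transitively, when `changed` receives a new historical value.
func impacted(changed string) []string {
	seen := map[string]bool{}
	var out []string
	var walk func(p string)
	walk = func(p string) {
		for _, d := range dependents[p] {
			if !seen[d] {
				seen[d] = true
				out = append(out, d)
				walk(d)
			}
		}
	}
	walk(changed)
	return out
}

func main() {
	fmt.Println(impacted("A")) // [C D]
}
```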


Some other things to consider:
Data Historians are more than just a time-series DB.
Many of the calcs skip the time-series DB to present real-time values, and then store their output in the historian after initial consumption.
The event queues and snapshot tables are key to low latency.
Typically the snapshot table is a key-value store with an entry for every process variable.
The majority of the data stored in history uses compression, and when needed an interpolation calculation provides the output.
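
As a toy illustration of such a snapshot table, assuming it holds the most recent value per process variable for low-latency reads; this is not how any particular historian implements it.

```go
package main

import (
	"fmt"
	"time"
)

// Observation is a hypothetical timestamped value for a process variable.
type Observation struct {
	At    time.Time
	Value float64
}

// snapshot holds the most recent observation per process variable, giving
// low-latency reads without touching the time-series store.
var snapshot = map[string]Observation{}

func ingest(tag string, obs Observation) {
	if cur, ok := snapshot[tag]; !ok || obs.At.After(cur.At) {
		snapshot[tag] = obs // only keep the newest value per tag
	}
}

func main() {
	ingest("FT-101.flow", Observation{time.Now(), 12.4})
	ingest("TT-202.temp", Observation{time.Now(), 71.0})
	fmt.Printf("%+v\n", snapshot["FT-101.flow"])
}
```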

Yes!

That's why stateful functions are such an important piece of the puzzle.

Interesting, when you think about it.

Dgraph is a distributed, ACID-compliant, low-latency KV store.

Found this great overview of reliable data pipelines from the guy who wrote Benthos

Makes sense. Maintain a durable log storing event publications and their associated inputs, then a graph of the transitive dependencies of calculations? When an update comes in, seek over the inputs, determine which events are impacted, traverse the dependency graph, and then you have your scope?