Pathway: unlocking data stream processing [Part 1] - real-time linear regression
By Olivier Ruas
We have now entered the era of data. Data is everywhere, and people who know how to use the data have become the new stars of the industry, with the proliferation of new data titles: data engineer, data scientist, data analyst, data ops engineer...
The reasons for this data boom are twofold: we can now process the data, both in terms of hardware and software, and the data is large enough to train models. To increase the value of data analytics, more and more data is generated: new sensors are put everywhere, broadcasting all the time. This trend, sometimes dubbed the Internet of Things, has had some unexpected consequences: your fridge and lamps can be connected to the internet, and you can even have a bitcoin miner heater!
All these connected devices created a new kind of data: streaming data. The data is now organized in data streams: a never-ending stream of new data points. The temporal nature of this data makes it extremely useful for data analytics such as prediction. However, it raises a new issue: how can we train a model on dynamic data?
Usually, we train our models on a batch of data, assuming that the distribution of the data will remain the same: we assume that the data will behave in the future the same way it has in the past. This assumption does not hold for streaming data.
We need a new way to keep our models up-to-date and relevant according to the latest data points.
How can we keep up with changing data? In this article, we will use Pathway. Pathway is a data processing framework that takes care of streaming data updates for you. You can build your processing pipeline (data preprocessing, data modeling, etc.), and then Pathway will ingest the new streaming data points and keep your models and tables up to date in real time!
Let’s see how to create a simple linear regression model on data streams!
A not so simple linear regression
A simple linear regression is a linear approach to modeling the relationship between data points using a single explanatory variable. In practice, we have a data stream of data points (x_i, y_i), and we want to compute the two parameters (a, b) so that, for each point (x_i, y_i), y_i can be approximated by y_i ≈ a + b*x_i.
In a nutshell, we have a set of points, and we want to draw the line that best approximates all those points.
While being one of the simplest models, linear regression is also one of the most used: it is frequently used to estimate trends such as sales forecasting or house pricing.
We are not going to explain the mathematical details here; if you are interested, you can find all the details in the Wikipedia article.
Why computing a linear regression in realtime is hard
So, what’s wrong with the standard approach? Why can’t we just apply the formula and iterate every time a new point is received?
Because the standard formulas for the slope and the intercept rely on the difference between each x_i and the mean of all the x values (and likewise for the y values): every new point changes the means, so you need to recompute the sums, which is the same as starting from scratch at every update. This would be time-consuming and prevent the model from being updated in real time.
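To make the cost concrete, here is a minimal sketch of the from-scratch approach in plain Python (no Pathway involved). The function name `fit_from_scratch` and the sample points are illustrative assumptions, not part of the original example. Each call walks over every point seen so far, so handling the n-th update costs O(n).

```python
def fit_from_scratch(points):
    """Return (a, b) such that y ≈ a + b*x, recomputed over all points."""
    n = len(points)
    mean_x = sum(x for x, _ in points) / n
    mean_y = sum(y for _, y in points) / n
    # Both sums depend on the means, which change with every new point.
    s_xy = sum((x - mean_x) * (y - mean_y) for x, y in points)
    s_xx = sum((x - mean_x) ** 2 for x, _ in points)
    if s_xx == 0:  # all x identical (e.g. a single point): slope undefined
        return 0.0, 0.0
    b = s_xy / s_xx
    a = mean_y - b * mean_x
    return a, b


# Illustrative points lying exactly on y = 1 + 2x.
stream = [(0, 1), (1, 3), (2, 5), (3, 7)]
seen = []
for point in stream:
    seen.append(point)
    a, b = fit_from_scratch(seen)  # full pass over `seen` at every update
```

With a long-running stream, the list `seen` and the time spent per update both keep growing, which is exactly what a real-time pipeline needs to avoid.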
Furthermore, we cannot use only a subset of the data points as the data may be changing. As an example, consider these 10 data points:
We could simply do the regression on these points and obtain the red line. But let's take a look at what happens further in time after 30 new points have been added:
In red, we have the previous regression, and in green, the one computed on all 40 points: they are clearly diverging!
The first model was clearly overfitted to the first few points and did not really represent the reality of the data. To avoid such overfitting, we need to incorporate all the data we have, and not only a subset!
Real time updates of your model
Pathway is a Python framework for streaming data processing: you create your pipeline as if you were handling static and finite data, and Pathway will manage the updates for you!
Let’s compute the regression using Pathway. We assume that we have a connector that inputs the data points (x_i, y_i) into a table t. We need to compute the sum of the x_i, of the x_i^2, of the y_i, and of the x_i*y_i, as well as the total number of data points received so far. This is done as follows:
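import pathway as pw

# pw.connector and input_data_stream are placeholders for an actual input connector.
t = pw.connector(input_data_stream)

# Add the per-row terms x^2 and x*y needed by the regression formulas.
t = t.select(
    *pw.this,
    x_square=t.x * t.x,
    x_y=t.x * t.y,
)

# Aggregate the whole table into a single row of statistics.
statistics_table = t.reduce(
    count=pw.reducers.count(),
    sum_x=pw.reducers.sum(t.x),
    sum_y=pw.reducers.sum(t.y),
    sum_x_y=pw.reducers.sum(t.x_y),
    sum_x_square=pw.reducers.sum(t.x_square),
)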
Then we can compute the estimation of a and b:
def compute_a(sum_x, sum_y, sum_x_square, sum_x_y, count):
    # Intercept of the regression line.
    d = count * sum_x_square - sum_x * sum_x
    if d == 0:
        # All x values are identical (e.g. a single point): default to 0.
        return 0
    else:
        return (sum_y * sum_x_square - sum_x * sum_x_y) / d

def compute_b(sum_x, sum_y, sum_x_square, sum_x_y, count):
    # Slope of the regression line.
    d = count * sum_x_square - sum_x * sum_x
    if d == 0:
        return 0
    else:
        return (count * sum_x_y - sum_x * sum_y) / d

results_table = statistics_table.select(
    a=pw.apply(compute_a, **statistics_table),
    b=pw.apply(compute_b, **statistics_table),
)
And that’s it! Whenever a new data point is inserted inside the table t, Pathway will automatically update all the tables, updating the values a and b of the table results_table.
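For reference, compute_a and compute_b correspond to the usual closed-form least-squares solution, rewritten so that it only depends on the five aggregates stored in statistics_table. Writing n for count, and with all sums running over the points received so far:

```latex
\[
a = \frac{\sum_i y_i \sum_i x_i^2 - \sum_i x_i \sum_i x_i y_i}
         {n \sum_i x_i^2 - \left(\sum_i x_i\right)^2},
\qquad
b = \frac{n \sum_i x_i y_i - \sum_i x_i \sum_i y_i}
         {n \sum_i x_i^2 - \left(\sum_i x_i\right)^2}
\]
```

Because each of these sums can be updated by simply adding the contribution of the new point, no pass over the earlier points is needed.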
We will insert a list of data points around the line y=x (i.e. a=0 and b=1), with a small error on the y values:
(0,0.06888437030500963)
(1,1.0515908805880605)
(2,1.984114316166169)
(3,2.9517833500585926)
(4,4.002254944273722)
(5,4.980986827490083)
(6,6.056759717806955)
(7,6.9606625452157855)
(8,7.995319390830471)
(9,9.016676407891007)
By inserting those values into the table t, we obtain the following values in the table results_table:
a,b,time,diff
0,0,0,1
0,0,1,-1
0.06888437030500971,0.9827065102830508,1,1
0.06888437030500971,0.9827065102830508,2,-1
0.07724821608916699,0.9576149729305795,2,1
0.0769101730536299,0.9581220374838857,3,1
0.07724821608916699,0.9576149729305795,3,-1
0.05833884879671927,0.9766933617407955,4,1
0.0769101730536299,0.9581220374838857,4,-1
0.05087576879874134,0.9822906717392795,5,1
0.05833884879671927,0.9766933617407955,5,-1
0.03085078333935821,0.9943056630149089,6,1
0.05087576879874134,0.9822906717392795,6,-1
0.03085078333935821,0.9943056630149089,7,-1
0.03590542987734715,0.9917783397459139,7,1
0.03198741430177742,0.9934574892783012,8,1
0.03590542987734715,0.9917783397459139,8,-1
0.025649728471303895,0.9958341214647295,9,1
0.03198741430177742,0.9934574892783012,9,-1
Each row contains the current values of a and b, the logical time of the update, and a flag diff representing whether the row represents an addition (1) or a deletion (-1). An update is represented by two rows: one to remove the old values and one to add the new values. Those two rows have the same logical time to ensure the atomicity of the operation.
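As a rough illustration of how such an update stream can be consumed, the sketch below replays the first few rows of the output above and keeps only the rows that are still "alive". The `replay` helper is a hypothetical name written for this article in plain Python; it is not part of Pathway.

```python
from collections import Counter


def replay(updates):
    """Apply (a, b, time, diff) rows in time order and return the live rows."""
    state = Counter()
    for a, b, time, diff in sorted(updates, key=lambda row: row[2]):
        state[(a, b)] += diff
        if state[(a, b)] == 0:
            del state[(a, b)]  # an addition cancelled by a later deletion
    return state


# First rows of the output shown above.
updates = [
    (0, 0, 0, 1),
    (0, 0, 1, -1),
    (0.06888437030500971, 0.9827065102830508, 1, 1),
    (0.06888437030500971, 0.9827065102830508, 2, -1),
    (0.07724821608916699, 0.9576149729305795, 2, 1),
]
live = replay(updates)  # only the values added at time 2 remain
```

After replaying any prefix of the stream that ends on a complete logical time, exactly one (a, b) pair remains: the current estimate.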
As we can see, the obtained values converge towards what we expected (a=0 and b=1). These values were updated in real time whenever new points were added!
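These numbers can be cross-checked without Pathway. The following is a plain-Python sketch that keeps the same five running sums and applies the same formulas as compute_a and compute_b; the class name `RunningRegression` is an assumption made for this illustration, and the code says nothing about how Pathway works internally. It is fed the ten sample points listed above.

```python
class RunningRegression:
    """Keeps five running sums and derives (a, b) in O(1) per new point."""

    def __init__(self):
        self.count = 0
        self.sum_x = self.sum_y = self.sum_x_y = self.sum_x_square = 0.0

    def add(self, x, y):
        # Only the contribution of the new point is added to each sum.
        self.count += 1
        self.sum_x += x
        self.sum_y += y
        self.sum_x_y += x * y
        self.sum_x_square += x * x
        return self.estimate()

    def estimate(self):
        d = self.count * self.sum_x_square - self.sum_x * self.sum_x
        if d == 0:  # same convention as compute_a / compute_b
            return 0.0, 0.0
        a = (self.sum_y * self.sum_x_square - self.sum_x * self.sum_x_y) / d
        b = (self.count * self.sum_x_y - self.sum_x * self.sum_y) / d
        return a, b


# y values of the ten sample points; x is the index 0..9.
ys = [
    0.06888437030500963, 1.0515908805880605, 1.984114316166169,
    2.9517833500585926, 4.002254944273722, 4.980986827490083,
    6.056759717806955, 6.9606625452157855, 7.995319390830471,
    9.016676407891007,
]
model = RunningRegression()
history = [model.add(x, y) for x, y in enumerate(ys)]
a, b = history[-1]  # close to the last row of the output above
```

Each entry of `history` should agree, up to floating-point rounding, with the values added at the corresponding logical time in results_table.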
To learn more about how to connect data streams into Pathway, you can take a look at the full example on Pathway’s website.
Conclusion
I hope you are convinced that data streams are as much a challenge as an opportunity. This huge amount of data is extremely valuable, but it cannot be processed the same way we process static data.
Pathway simplifies the processing of data streams and the building of ML models over them: you can build your processing pipeline as if you were handling static data, and then the Pathway engine will manage all the updates for you.
I strongly recommend that you not take everything I said for granted but test Pathway by yourself 🙂