yun's attic

Lateral Join with SQLAlchemy

When handling time series data, you often want to resample it at a different frequency before using it.

One way to achieve this is to load all data with Python, and resample or reindex it with Pandas.

An alternative is to query directly in SQL using a pattern like the one below. It returns only the most recent row at or before each sample point you’re interested in. This particular example samples the exchange rate between JPY and USD every two hours (in the actual database, I have some more recent data at 1-minute intervals, and the rest at either 5-minute or 30-minute intervals):

select *
from generate_series('2020-01-01', '2020-01-15', '2 hour'::interval) sampled_at
join lateral (
    select *
    from alphavantage.fx f
    where f.ts <= sampled_at
    and currency_from = 'JPY'
    and currency_to = 'USD'
    order by f.ts desc
    limit 1
) p on true
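
To spell out what the lateral join computes, here is a minimal pure-Python sketch (standard library only, not Pandas, and not the code used for the real database). The price updates are made-up values, and latest_at_or_before and sample are hypothetical helper names chosen for illustration.

```python
import bisect
import datetime as dt

# Hypothetical price updates as (ts, price), sorted by ts.
updates = [
    (dt.datetime(2020, 1, 1, 0, 30), 0.0092),
    (dt.datetime(2020, 1, 1, 1, 45), 0.0093),
    (dt.datetime(2020, 1, 1, 4, 0), 0.0091),
]
timestamps = [ts for ts, _ in updates]


def latest_at_or_before(sampled_at):
    """Return the last update with ts <= sampled_at, or None if there is none."""
    i = bisect.bisect_right(timestamps, sampled_at)
    return updates[i - 1] if i else None


def sample(start, end, step):
    """Pair each sample point with its latest update.

    Like the inner join above, sample points with no earlier update are dropped.
    """
    rows = []
    t = start
    while t <= end:
        match = latest_at_or_before(t)
        if match is not None:
            rows.append((t, *match))
        t += step
    return rows


rows = sample(
    dt.datetime(2020, 1, 1),
    dt.datetime(2020, 1, 1, 6),
    dt.timedelta(hours=2),
)
for sampled_at, ts, price in rows:
    print(sampled_at, ts, price)
```

The sample point at 00:00 has no update at or before it, so it produces no row, which mirrors how a plain join lateral ... on true behaves. A left join lateral would keep that sample point with nulls instead.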

Since I’m already going down the SQLAlchemy ORM road, why not try writing the query there as well?

I found the SQLAlchemy documentation quite difficult to grapple with at times, and I get the impression that there are many, many different ways to accomplish the same thing. At some point I got quite overwhelmed and frustrated after reading page after page of docs that didn’t directly answer my question.

Then I came across this blog post by Adam Gligor, which really helped me sort out my thoughts and finally decide how I wanted to approach it.

First take care of all the imports:

import datetime as dt

from sqlalchemy import and_, desc, true
from sqlalchemy.orm import aliased, sessionmaker
from sqlalchemy.sql import func

from .constants import DB_ENGINE
from .models import FxPrice

_Session = sessionmaker(bind=DB_ENGINE)
session = _Session()

I wrote this subquery to generate the series of timestamps that I want to resample the data on, and aliased it:

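start_ts, end_ts = dt.datetime(2020, 1, 1), dt.datetime(2020, 2, 1)
interval = dt.timedelta(hours=2)
# One row per sample point: every two hours from start_ts to end_ts.
subquery_ts = (
    session.query(
        func.generate_series(start_ts, end_ts, interval).label(
            "sampled_at"
        )
    )
    .subquery()
    .lateral()
)
# Alias the subquery so the price subquery can refer to a.c.sampled_at.
a = aliased(subquery_ts)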

The aliased() function does something like this in SQL:

select * from some_table as some_alias
...

It essentially allows you to give a name to some (intermediate or temporary) table, so that you can reference it later, either when joining it with another table, or joining it with itself.
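
As a small illustration of aliasing in plain SQL, here is a sketch that joins a table with itself using Python's built-in sqlite3 module. The fx table, its integer ts column, and the alias names newer and older are all made up for this example; it is not the schema used in this post and does not involve SQLAlchemy.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("create table fx (ts integer, price real)")
conn.executemany(
    "insert into fx values (?, ?)",
    [(1, 0.0092), (2, 0.0093), (3, 0.0091)],
)

# The same table appears twice; the aliases keep the two roles apart,
# so each row can be paired with the row one tick before it.
pairs = conn.execute(
    """
    select newer.ts, older.price
    from fx as newer
    join fx as older on older.ts = newer.ts - 1
    order by newer.ts
    """
).fetchall()

for ts, previous_price in pairs:
    print(ts, previous_price)

conn.close()
```

Without the two aliases there would be no way to say which copy of fx a column such as ts belongs to.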

Then I wrote another subquery to get the most recent price update at each time point I want to sample. Note that here I compare the timestamp in the FxPrice table with a.c.sampled_at, where a is the aforementioned alias of the table created using generate_series. (The .c in a.c is for columns, so a.c.sampled_at gets you the column named sampled_at.)

Without the aliasing I wouldn’t be able to achieve this effect; instead, SQLAlchemy would create a nested subquery, and that’s not what I want.

subquery_price = (
    session.query(FxPrice)
    .filter(
        and_(
            FxPrice.ts <= a.c.sampled_at,
            FxPrice.currency_from == "JPY",
            FxPrice.currency_to == "USD",
        )
    )
    .order_by(desc(FxPrice.ts))
    .limit(1)
    .subquery()
    .lateral()
)

Finally, putting everything together:

results = (
    session.query(
        subquery_price.c.ts,
        a,
        subquery_price.c.currency_from,
        subquery_price.c.currency_to,
        subquery_price.c.price,
    )
    .select_from(a)
    .join(subquery_price, true())
    .all()
)

This produces the following query:

SELECT anon_1.ts, anon_2.sampled_at, anon_1.currency_from, anon_1.currency_to, anon_1.price
FROM (
    SELECT generate_series(:generate_series_1, :generate_series_2, :generate_series_3) AS sampled_at
    ) AS anon_2
JOIN LATERAL (
    SELECT alphavantage.fx.created_at AS created_at, alphavantage.fx.ts AS ts, alphavantage.fx.currency_from AS currency_from, alphavantage.fx.currency_to AS currency_to, alphavantage.fx.price AS price
    FROM alphavantage.fx
    WHERE alphavantage.fx.ts <= anon_2.sampled_at
    AND alphavantage.fx.currency_from = :currency_from_1
    AND alphavantage.fx.currency_to = :currency_to_1
    ORDER BY alphavantage.fx.ts DESC
    LIMIT :param_1
    ) AS anon_1
ON true

where the parameters are:

{
    'generate_series_1': datetime.datetime(2020, 1, 1, 0, 0),
    'generate_series_2': datetime.datetime(2020, 2, 1, 0, 0),
    'generate_series_3': datetime.timedelta(seconds=7200),
    'currency_from_1': 'JPY',
    'currency_to_1': 'USD',
    'param_1': 1
}
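
To get a feel for what those three generate_series parameters produce, here is a rough pure-Python imitation. It assumes a positive step and an inclusive end point, which is how PostgreSQL's generate_series treats timestamps; the Python generator below is only a stand-in written for this sketch, not something SQLAlchemy provides.

```python
import datetime as dt


def generate_series(start, stop, step):
    """Yield start, start + step, ... up to and including stop (positive step)."""
    current = start
    while current <= stop:
        yield current
        current += step


start_ts, end_ts = dt.datetime(2020, 1, 1), dt.datetime(2020, 2, 1)
interval = dt.timedelta(hours=2)

sample_points = list(generate_series(start_ts, end_ts, interval))
print(len(sample_points), sample_points[0], sample_points[-1])
print(repr(interval))
```

The two-hour timedelta is normalized to seconds=7200, which is why it shows up that way in the parameters. January has 31 days with 12 two-hour steps each, plus the inclusive end point, giving 373 sample points, so the join can return at most 373 rows.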