Since July 2022, I have been fetching the inventory of roughly 1,500 Paris Vélib stations every minute from the Vélib open data API and streaming it into a BigQuery dataset. That's 2.1 million rows per day, one row per station per minute. Then, to reduce the size of the raw tables and to prepare the data for visualizations, I have scheduled a number of transformations in a GCP-native ETL pipeline, built entirely with BigQuery scheduled queries and Cloud Scheduler + Pub/Sub + Cloud Functions.
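Here is a minimal Python sketch of the per-minute ingestion step. It assumes the Cloud Function is written in Python, which the post does not say. The sketch flattens one GBFS station_status payload into rows shaped like velib_inventory_full. Several details are assumptions: the payload layout (with `num_bikes_available_types` as a list of one-key objects such as `{"mechanical": 2}`), the mapping of `last_reported` to `updated_at`, and the helper name `to_rows`. The sample data is made up. The resulting rows could then be streamed with the google-cloud-bigquery client's `insert_rows_json` method, which is not shown here.

```python
from datetime import datetime, timezone
from typing import Any


def to_rows(payload: dict[str, Any]) -> list[dict[str, Any]]:
    """Flatten a GBFS station_status payload into one row per station.

    Hypothetical helper: assumes data.stations[*] entries where
    num_bikes_available_types is a list of one-key dicts.
    """
    rows = []
    for station in payload["data"]["stations"]:
        # Merge [{"mechanical": 2}, {"ebike": 1}] into {"mechanical": 2, "ebike": 1}
        bike_types: dict[str, int] = {}
        for entry in station.get("num_bikes_available_types", []):
            bike_types.update(entry)
        rows.append({
            "station_id": station["station_id"],
            "num_docks_available": station["num_docks_available"],
            "num_bikes_available": station["num_bikes_available"],
            "num_bikes_available_mechanical": bike_types.get("mechanical", 0),
            "num_bikes_available_ebike": bike_types.get("ebike", 0),
            "is_installed": station["is_installed"],
            "is_returning": station["is_returning"],
            "is_renting": station["is_renting"],
            # Assumption: updated_at comes from last_reported (Unix seconds, UTC)
            "updated_at": datetime.fromtimestamp(
                station["last_reported"], tz=timezone.utc
            ).isoformat(),
        })
    return rows


# Made-up payload for illustration only
sample = {
    "data": {
        "stations": [
            {
                "station_id": 213688169,
                "num_bikes_available": 3,
                "num_bikes_available_types": [{"mechanical": 2}, {"ebike": 1}],
                "num_docks_available": 32,
                "is_installed": 1,
                "is_returning": 1,
                "is_renting": 1,
                "last_reported": 1690000000,
            }
        ]
    }
}
rows = to_rows(sample)
print(rows[0]["num_bikes_available_ebike"])  # 1
```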
Everything runs on serverless, no-ops GCP services, and no maintenance has been required for more than a year. The cost of the whole process, storage included, is modest: less than 5€ per month.
The final data is exposed in real time as maps and tables in Looker Studio, and can be seen on the Vélib tracker:
ETL pipeline
Schematically, this is the data architecture of the ETL pipeline:
BigQuery tables
Seven BigQuery tables are generated, accounting for about 4 GB of storage per year:
velib_inventory_latest
Gets the latest inventory update for every station. The table is truncated at each API call, every minute.
- Update frequency: minute
- Usage: map of live Vélib inventory
velib_inventory_full
Keeps the full inventory of each Vélib station, at every minute, for the last 2 days. A row is appended at each API call for every station.
- Update frequency: minute
- Usage: inventory of the last 24 hours for each Vélib station
velib_inventory_compact
A “compacted” version of the full inventory table velib_inventory_full. All consecutive duplicate rows are removed, which reduces the number of rows by about 90%.
- Update frequency: daily
velib_stations
Lists each Vélib station with its attributes, such as name, coordinates and capacity. Attributes rarely change, so the Vélib API is only checked once a day; even that is overkill, but the data is cheap to store.
- Update frequency: daily
velib_inventory_hourly
Computes the average inventory by hour and weekday over the last 30 days for each station.
- Update frequency: weekly
- Usage: hourly availability graph
velib_stations_distances
Computes the distance between each station and all the others, which is used to find nearby stations. With about 1,500 stations, the cross join covers roughly 2.25 million station pairs, so the query is computationally quite expensive.
- Update frequency: monthly
velib_stations_features
Provides additional attributes for each Vélib station, taken from a manually maintained Google Sheet.
- Update frequency: manual
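The two minute-level tables can be illustrated in plain Python. velib_inventory_latest keeps one row per station, namely the most recent reading, while velib_inventory_full keeps every reading from the last 2 days. This is only a sketch. The in-memory list of dicts and the function names `latest_per_station` and `keep_last_days` are hypothetical stand-ins for the BigQuery tables. It does not describe how the actual truncate and append jobs are configured.

```python
from datetime import datetime, timedelta, timezone


def latest_per_station(rows):
    """Most recent row for each station_id (what velib_inventory_latest holds)."""
    latest = {}
    for row in rows:
        current = latest.get(row["station_id"])
        if current is None or row["updated_at"] > current["updated_at"]:
            latest[row["station_id"]] = row
    return latest


def keep_last_days(rows, now, days=2):
    """Drop rows older than `days` days (the retention window of velib_inventory_full)."""
    cutoff = now - timedelta(days=days)
    return [row for row in rows if row["updated_at"] >= cutoff]


# Hypothetical readings for two stations
now = datetime(2023, 7, 22, 12, 0, tzinfo=timezone.utc)
rows = [
    {"station_id": 1, "num_bikes_available": 4, "updated_at": now - timedelta(days=3)},
    {"station_id": 1, "num_bikes_available": 5, "updated_at": now - timedelta(minutes=1)},
    {"station_id": 2, "num_bikes_available": 0, "updated_at": now},
]
latest = latest_per_station(rows)
recent = keep_last_days(rows, now)
```

Here the 3-day-old reading of station 1 is superseded in `latest` and dropped from `recent`.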
Query for velib_inventory_compact, run daily on the previous day's rows of velib_inventory_full:

```sql
/* For each reading, fetch the previous reading of the same station.
   The WHERE filter applies before the window functions, so LAG only sees yesterday's rows. */
WITH sub1 AS (
  SELECT
    *,
    LAG(num_docks_available) OVER (PARTITION BY station_id ORDER BY updated_at) AS lag_num_docks_available,
    LAG(num_bikes_available) OVER (PARTITION BY station_id ORDER BY updated_at) AS lag_num_bikes_available,
    LAG(num_bikes_available_mechanical) OVER (PARTITION BY station_id ORDER BY updated_at) AS lag_num_bikes_available_mechanical,
    LAG(num_bikes_available_ebike) OVER (PARTITION BY station_id ORDER BY updated_at) AS lag_num_bikes_available_ebike
  FROM `sandbox-246007.parigovelo.velib_inventory_full`
  WHERE
    DATE(updated_at) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
),
/* Keep the timestamp only when the inventory differs from the previous reading;
   the first reading of the day has no previous value and is always kept */
sub2 AS (
  SELECT
    sub1.*,
    IF(
      lag_num_docks_available IS NULL
      OR lag_num_docks_available != num_docks_available
      OR lag_num_bikes_available != num_bikes_available
      OR lag_num_bikes_available_mechanical != num_bikes_available_mechanical
      OR lag_num_bikes_available_ebike != num_bikes_available_ebike,
      updated_at, NULL) AS last_updated_at
  FROM sub1
)
/* Unchanged readings have a NULL last_updated_at and are dropped */
SELECT DISTINCT
  station_id,
  num_docks_available,
  num_bikes_available,
  num_bikes_available_mechanical,
  num_bikes_available_ebike,
  is_installed,
  is_returning,
  is_renting,
  last_updated_at
FROM sub2
WHERE last_updated_at IS NOT NULL
```
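The compaction logic of the velib_inventory_compact query can be reproduced in a few lines of Python. The query compares each reading with the previous reading of the same station through LAG and keeps it only if something changed. This makes the roughly 90% reduction easy to reason about: a station whose counts stay the same for an hour yields one row instead of 60. The function name `compact` and the minute-index timestamps are illustrative.

One difference from the SQL concerns NULLs. In SQL, a comparison involving NULL is not TRUE, so a NULL count alone never keeps a row. Python's `!=` treats `None` as an ordinary value.

```python
from itertools import groupby
from operator import itemgetter

# Columns compared with the previous reading, as in the LAG() calls of the query
COMPARED = (
    "num_docks_available",
    "num_bikes_available",
    "num_bikes_available_mechanical",
    "num_bikes_available_ebike",
)


def compact(rows):
    """Keep a row only when its inventory differs from the station's previous row."""
    kept = []
    ordered = sorted(rows, key=itemgetter("station_id", "updated_at"))
    for _, station_rows in groupby(ordered, key=itemgetter("station_id")):
        previous = None
        for row in station_rows:
            values = tuple(row[col] for col in COMPARED)
            # The first row of a station has no predecessor and is always kept
            if previous is None or values != previous:
                kept.append({**row, "last_updated_at": row["updated_at"]})
            previous = values
    return kept


# Six one-minute readings of a hypothetical station; updated_at is a minute index
readings = [
    {"station_id": 7, "updated_at": minute,
     "num_docks_available": docks, "num_bikes_available": 30 - docks,
     "num_bikes_available_mechanical": 30 - docks, "num_bikes_available_ebike": 0}
    for minute, docks in enumerate([10, 10, 10, 11, 11, 10])
]
compacted = compact(readings)  # keeps minutes 0, 3 and 5
```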
Query for velib_inventory_hourly, which averages velib_inventory_compact over an hourly grid of time slots:

```sql
/* Create an hourly grid of time slots with GENERATE_TIMESTAMP_ARRAY.
   Both bounds are inclusive: the grid starts at the latest slot already in
   velib_inventory_hourly and ends at 00:00 UTC on the previous day. */
WITH slots AS (
  SELECT
    slot AS time_slot
  FROM UNNEST(GENERATE_TIMESTAMP_ARRAY(
    (SELECT TIMESTAMP(MAX(time_slot)) FROM `parigovelo.velib_inventory_hourly`),
    TIMESTAMP(DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)),
    INTERVAL 1 HOUR
  )) AS slot
),
/* Get the latest station information */
stations AS (
  SELECT DISTINCT
    station_id,
    station_code
  FROM `parigovelo.velib_stations`
  WHERE updated_at = (SELECT MAX(updated_at) FROM `parigovelo.velib_stations`)
),
/* Join the grid of hourly slots with the stations */
slots_stations AS (
  SELECT
    slots.time_slot,
    stations.station_id,
    stations.station_code
  FROM slots
  CROSS JOIN stations
),
/* Find the average and last updated inventory for each time slot and each station */
inventory AS (
  SELECT
    slots_stations.time_slot,
    slots_stations.station_id,
    slots_stations.station_code,
    MIN(inv.last_updated_at) AS slot_first_update_at,
    MAX(inv.last_updated_at) AS slot_last_update_at,
    AVG(inv.num_docks_available) AS avg_num_docks,
    AVG(inv.num_bikes_available_mechanical) AS avg_num_bikes_mechanical,
    AVG(inv.num_bikes_available_ebike) AS avg_num_bikes_ebike
  FROM slots_stations
  LEFT JOIN `parigovelo.velib_inventory_compact` AS inv
    ON slots_stations.station_id = inv.station_id
    AND inv.last_updated_at < TIMESTAMP_ADD(slots_stations.time_slot, INTERVAL 1 HOUR)
    AND inv.last_updated_at >= slots_stations.time_slot
  GROUP BY 1, 2, 3
),
/* Fill in the blank update times with the latest non-null update so far */
inventory_full AS (
  SELECT
    inventory.*,
    LAST_VALUE(inventory.slot_last_update_at IGNORE NULLS) OVER (
      PARTITION BY inventory.station_id
      ORDER BY inventory.time_slot
      ROWS UNBOUNDED PRECEDING
    ) AS latest_update_at
  FROM inventory
)
/* For empty time slots, get the latest previous inventory */
SELECT
  inv_full.time_slot,
  inv_full.station_id,
  inv_full.station_code,
  inv_full.latest_update_at,
  COALESCE(inv_full.avg_num_docks, fill.num_docks_available) AS avg_num_docks,
  COALESCE(inv_full.avg_num_bikes_mechanical, fill.num_bikes_available_mechanical) AS avg_num_bikes_mechanical,
  COALESCE(inv_full.avg_num_bikes_ebike, fill.num_bikes_available_ebike) AS avg_num_bikes_ebike
FROM inventory_full AS inv_full
LEFT JOIN `parigovelo.velib_inventory_compact` AS fill
  ON inv_full.station_id = fill.station_id
  AND inv_full.latest_update_at = fill.last_updated_at
ORDER BY station_code, time_slot
```
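The velib_inventory_hourly query combines three ideas:

- an hourly grid of slots crossed with the stations,
- an average per slot,
- a forward fill (LAST_VALUE ... IGNORE NULLS) for slots without any update.

The Python sketch below reproduces these ideas for a single station and the dock count only. It then averages the grid by weekday and hour, which is one possible way to obtain the "average inventory by hour and weekday" described for velib_inventory_hourly. The post does not show that last step, so treat it as an assumption. The function names and the tuple input are hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from statistics import mean

ONE_HOUR = timedelta(hours=1)


def hourly_slots(start, end):
    """Hourly timestamps from start to end, both inclusive (like GENERATE_TIMESTAMP_ARRAY)."""
    slots = []
    slot = start
    while slot <= end:
        slots.append(slot)
        slot += ONE_HOUR
    return slots


def hourly_inventory(updates, start, end):
    """Average docks per hourly slot for ONE station, forward-filling empty slots.

    `updates` is a list of (last_updated_at, num_docks_available) tuples,
    i.e. the compacted rows of a single station (hypothetical in-memory input).
    """
    result = {}
    last_value = None  # inventory at the latest update seen so far
    for slot in hourly_slots(start, end):
        in_slot = [(ts, docks) for ts, docks in updates if slot <= ts < slot + ONE_HOUR]
        if in_slot:
            result[slot] = mean(docks for _, docks in in_slot)
            last_value = max(in_slot)[1]  # value at the latest update in the slot
        else:
            result[slot] = last_value  # stays None until a first update is seen
    return result


def weekday_hour_profile(hourly):
    """Average hourly values by (weekday, hour); Monday is weekday 0."""
    buckets = defaultdict(list)
    for slot, value in hourly.items():
        if value is not None:
            buckets[(slot.weekday(), slot.hour)].append(value)
    return {key: mean(values) for key, values in buckets.items()}


# Hypothetical updates on Monday 2023-07-17
updates = [
    (datetime(2023, 7, 17, 8, 5), 10),
    (datetime(2023, 7, 17, 8, 40), 14),
    (datetime(2023, 7, 17, 10, 15), 6),
]
hourly = hourly_inventory(updates, datetime(2023, 7, 17, 8), datetime(2023, 7, 17, 11))
profile = weekday_hour_profile(hourly)
# hourly: 08:00 -> 12, 09:00 -> 14 (filled), 10:00 -> 6, 11:00 -> 6 (filled)
```

As in the SQL, the per-slot average is a plain mean of the updates in the slot, not weighted by how long each value lasted.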
Query for velib_stations_distances:

```sql
/* Keep stations with a code and valid coordinates */
WITH sub1 AS (
  SELECT DISTINCT
    station_id,
    station_code,
    latitude,
    longitude
  FROM `parigovelo.velib_stations`
  WHERE
    station_code IS NOT NULL
    AND latitude IS NOT NULL
    AND longitude IS NOT NULL
),
/* Build a GEOGRAPHY point; ST_GEOGPOINT takes longitude first */
sub2 AS (
  SELECT
    station_id,
    station_code,
    ST_GEOGPOINT(longitude, latitude) AS coord
  FROM sub1
),
/* Compute the distance, in meters, for every pair of stations */
sub3 AS (
  SELECT
    stn_from.station_id AS station_from_id,
    stn_to.station_id AS station_to_id,
    stn_from.station_code AS station_from_code,
    stn_to.station_code AS station_to_code,
    CAST(ST_DISTANCE(stn_from.coord, stn_to.coord) AS INT64) AS distance
  FROM sub2 AS stn_from
  CROSS JOIN sub2 AS stn_to
)
/* Keep only pairs of different stations at most 1,000 m apart */
SELECT DISTINCT *
FROM sub3
WHERE
  station_from_id != station_to_id
  AND distance <= 1000
ORDER BY station_from_code, distance
```
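The velib_stations_distances query applies a pairwise filter: it considers every ordered pair of distinct stations and keeps those within 1,000 m. The following Python sketch does the same. It uses the haversine formula on a sphere of radius 6,371,008.8 m (the mean Earth radius). This is an assumption meant to approximate BigQuery's default spherical ST_DISTANCE, so results may differ slightly from BigQuery's.

The nested loop also shows why the query is expensive. With n stations there are n × (n − 1) ordered pairs, about 2.25 million for 1,500 stations. Station codes and coordinates in the example are made up.

```python
from math import asin, cos, radians, sin, sqrt

EARTH_RADIUS_M = 6_371_008.8  # mean Earth radius; assumed spherical model


def haversine_m(lon1, lat1, lon2, lat2):
    """Great-circle distance in meters between two (longitude, latitude) points."""
    phi1, phi2 = radians(lat1), radians(lat2)
    dphi = phi2 - phi1
    dlmb = radians(lon2 - lon1)
    a = sin(dphi / 2) ** 2 + cos(phi1) * cos(phi2) * sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_M * asin(sqrt(a))


def nearby_pairs(stations, max_m=1000):
    """Ordered pairs of distinct stations within max_m meters, sorted by (from, distance).

    `stations` is a list of (station_code, longitude, latitude) tuples.
    """
    pairs = []
    for code_from, lon_from, lat_from in stations:
        for code_to, lon_to, lat_to in stations:
            if code_from == code_to:
                continue
            # Round half away from zero to whole meters, like CAST(... AS INT64)
            distance = int(haversine_m(lon_from, lat_from, lon_to, lat_to) + 0.5)
            if distance <= max_m:
                pairs.append((code_from, code_to, distance))
    return sorted(pairs, key=lambda p: (p[0], p[2]))


stations = [("A", 2.3500, 48.8566), ("B", 2.3600, 48.8566), ("C", 2.3500, 48.8700)]
pairs = nearby_pairs(stations)  # only A-B and B-A, about 730 m apart
```

In BigQuery, the same filter could be pushed into the join condition with ST_DWITHIN instead of filtering after the full cross join. This is a design alternative, not what the query above does.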