
How I use live open data for Paris Vélib

Category: Data Science
Published on: August 5, 2023

Since July 2022, I have been fetching the inventory of roughly 1500 Paris Vélib stations every minute from the Vélib open data API and streaming it into a BigQuery dataset. That's about 2.1 million rows per day. Then, to reduce the size of the raw tables and to prepare the data for visualization, I have scheduled a number of transformations in a GCP-native ETL pipeline, built entirely with BigQuery scheduled queries and Cloud Scheduler + Pub/Sub + Cloud Functions.
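To make the ingestion step concrete, here is a minimal Python sketch of the transformation a Cloud Function could apply to each API response before streaming it. It assumes that the Vélib API follows the GBFS `station_status` layout and that the fetch time is used as `updated_at`; the function name `flatten_station_status` is hypothetical, and only the column names come from the queries in this post.

```python
from datetime import datetime, timezone


def flatten_station_status(payload: dict, fetched_at: datetime) -> list:
    """Flatten a GBFS-style station_status payload into one row per station.

    Assumption: the payload looks like {"data": {"stations": [...]}} and bike
    types are listed as single-key dicts, e.g. [{"mechanical": 3}, {"ebike": 1}].
    """
    rows = []
    for station in payload.get("data", {}).get("stations", []):
        # Merge the per-type counts into one dict: {"mechanical": 3, "ebike": 1}
        bike_types = {}
        for entry in station.get("num_bikes_available_types", []):
            bike_types.update(entry)
        rows.append({
            "station_id": station["station_id"],
            "num_docks_available": station.get("num_docks_available"),
            "num_bikes_available": station.get("num_bikes_available"),
            "num_bikes_available_mechanical": bike_types.get("mechanical"),
            "num_bikes_available_ebike": bike_types.get("ebike"),
            "is_installed": station.get("is_installed"),
            "is_returning": station.get("is_returning"),
            "is_renting": station.get("is_renting"),
            # Assumption: the fetch time is used as the row timestamp
            "updated_at": fetched_at.isoformat(),
        })
    return rows


if __name__ == "__main__":
    sample = {"data": {"stations": [{
        "station_id": 1,
        "num_docks_available": 10,
        "num_bikes_available": 4,
        "num_bikes_available_types": [{"mechanical": 3}, {"ebike": 1}],
    }]}}
    print(flatten_station_status(sample, datetime.now(timezone.utc)))
```

In a deployed function, the resulting rows could then be streamed with `bigquery.Client().insert_rows_json(table_id, rows)` from the `google-cloud-bigquery` library, which returns a list of per-row errors (empty on success).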

Everything runs on serverless (no-ops) GCP services, and no maintenance has been required for more than a year. The monthly cost of the whole process, storage included, is modest: less than 5€.

The final data is exposed in real time as maps and tables in Looker Studio, and can be seen on the Vélib tracker:

Live inventory of Paris Vélib stations

ETL pipeline

Schematically, this is the data architecture of the ETL pipeline:

[Figure: data architecture of the ETL pipeline]

BigQuery tables

Seven BigQuery tables are generated, totaling about 4 GB of storage per year:

  • velib_inventory_latest holds the latest inventory update for every station. The table is truncated and rewritten at each API call, every minute.
  • velib_inventory_full keeps the minute-by-minute inventory of each Vélib station for the last 2 days. One row per station is appended at each API call.
  • velib_inventory_compact is a “compacted” version of the full inventory table velib_inventory_full. Consecutive rows in which none of the bike and dock counts changed are removed, which reduces the number of rows by about 90%.
    • Update frequency: daily
    • Query
      WITH sub1 AS (
        SELECT 
          *,
          LAG(num_docks_available) OVER (PARTITION BY station_id ORDER BY updated_at) AS lag_num_docks_available,
          LAG(num_bikes_available) OVER (PARTITION BY station_id ORDER BY updated_at) AS lag_num_bikes_available,
          LAG(num_bikes_available_mechanical) OVER (PARTITION BY station_id ORDER BY updated_at) AS lag_num_bikes_available_mechanical,
          LAG(num_bikes_available_ebike) OVER (PARTITION BY station_id ORDER BY updated_at) AS lag_num_bikes_available_ebike
        FROM `sandbox-246007.parigovelo.velib_inventory_full` 
        WHERE 
          DATE(updated_at) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
      ),
      
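      /* Keep updated_at only for a station's first row of the day (no previous row)
         or when any count differs from the previous row.
         IS DISTINCT FROM treats NULLs as comparable values, unlike != */
      sub2 AS (
        SELECT
          sub1.*,
          IF(
            lag_num_docks_available IS NULL
            OR num_docks_available IS DISTINCT FROM lag_num_docks_available
            OR num_bikes_available IS DISTINCT FROM lag_num_bikes_available
            OR num_bikes_available_mechanical IS DISTINCT FROM lag_num_bikes_available_mechanical
            OR num_bikes_available_ebike IS DISTINCT FROM lag_num_bikes_available_ebike,
            updated_at,
            NULL
          ) AS last_updated_at
        FROM sub1
      )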
      
      SELECT DISTINCT
        station_id,
        num_docks_available,
        num_bikes_available,
        num_bikes_available_mechanical,
        num_bikes_available_ebike,
        is_installed,
        is_returning,
        is_renting,
        last_updated_at
      FROM sub2
      WHERE last_updated_at IS NOT NULL

  • velib_stations lists each Vélib station with attributes such as name, coordinates and capacity. These attributes rarely change, so they are only checked once a day against the Vélib API; even that is overkill, but the data is cheap to store.
    • Update frequency: daily
  • velib_inventory_hourly computes the average inventory by hour and weekday, over the last 30 days, for each station.
    • Update frequency: weekly
    • Query
      /* Create an hourly grid of time slots with GENERATE_TIMESTAMP_ARRAY */
      WITH slots AS (
          SELECT 
              slot AS time_slot
          FROM UNNEST(GENERATE_TIMESTAMP_ARRAY(
              (SELECT TIMESTAMP(MAX(time_slot)) FROM `parigovelo.velib_inventory_hourly`), 
              TIMESTAMP(DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)),
              INTERVAL 1 HOUR
          )) AS slot
      ),
      
      /* Get latest stations information */
      stations AS (
          SELECT DISTINCT
              station_id,
              station_code
          FROM `parigovelo.velib_stations`
          WHERE updated_at = (SELECT MAX(updated_at) FROM `parigovelo.velib_stations`)
      ),
      
      /* Join grid of hourly slots with stations */
      slots_stations AS (
          SELECT
              slots.time_slot,
              stations.station_id,
              stations.station_code
          FROM slots
          CROSS JOIN stations
      ),
      
      /* Find average and last updated inventory for each timeslot and each station */
      inventory AS (
          SELECT 
              slots_stations.time_slot,
              slots_stations.station_id,
              slots_stations.station_code,
              MIN(inv.last_updated_at) AS slot_first_update_at,
              MAX(inv.last_updated_at) AS slot_last_update_at,
              AVG(inv.num_docks_available) AS avg_num_docks,
              AVG(inv.num_bikes_available_mechanical) AS avg_num_bikes_mechanical,
              AVG(inv.num_bikes_available_ebike) AS avg_num_bikes_ebike,
          FROM slots_stations
          LEFT JOIN `parigovelo.velib_inventory_compact` AS inv
              ON slots_stations.station_id = inv.station_id
              AND inv.last_updated_at < TIMESTAMP_ADD(slots_stations.time_slot, INTERVAL 1 HOUR)
              AND inv.last_updated_at >= slots_stations.time_slot
          GROUP BY 1,2,3
      ),
      
      /* Fill in the blank update times */
      inventory_full AS (
          SELECT
              inventory.*,
              LAST_VALUE(inventory.slot_last_update_at IGNORE NULLS) OVER (
                  PARTITION BY inventory.station_id
                  ORDER BY inventory.time_slot
                  ROWS UNBOUNDED PRECEDING
              ) AS latest_update_at
          FROM inventory
      )
      
      /* For empty time slots, get the latest previous inventory */
      SELECT 
          inv_full.time_slot,
          inv_full.station_id, 
          inv_full.station_code,
          inv_full.latest_update_at,
          COALESCE(inv_full.avg_num_docks, fill.num_docks_available) AS avg_num_docks,
          COALESCE(inv_full.avg_num_bikes_mechanical, fill.num_bikes_available_mechanical) AS avg_num_bikes_mechanical,
          COALESCE(inv_full.avg_num_bikes_ebike, fill.num_bikes_available_ebike) AS avg_num_bikes_ebike,
      FROM inventory_full AS inv_full
      LEFT JOIN `parigovelo.velib_inventory_compact` AS fill
          ON inv_full.station_id = fill.station_id
          AND inv_full.latest_update_at = fill.last_updated_at
      ORDER BY station_code, time_slot
    • Usage: hourly availability graph
    • [Figure: hourly availability graph]

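  • velib_stations_distances computes the distance from each station to every other station, and is used to find nearby stations. With about 1500 stations, the cross join evaluates roughly 2.25 million pairs, so the query is relatively expensive; only pairs within 1000 m of each other are kept.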
    • Update frequency: monthly
    • Query
      WITH sub1 AS (
        SELECT DISTINCT
          station_id,
          station_code,
          latitude,
          longitude
        FROM `parigovelo.velib_stations`
        WHERE 
          station_code IS NOT NULL
          AND latitude IS NOT NULL
          AND longitude IS NOT NULL
      ),
      
      sub2 AS (
        SELECT
          station_id,
          station_code,
          ST_GEOGPOINT(longitude, latitude) AS coord
        FROM sub1
      ),
      
      sub3 AS (
        SELECT
          stn_from.station_id AS station_from_id,
          stn_to.station_id AS station_to_id,
          stn_from.station_code AS station_from_code,
          stn_to.station_code AS station_to_code,
          CAST(ST_DISTANCE(stn_from.coord, stn_to.coord) AS INT64) AS distance
        FROM sub2 AS stn_from
        CROSS JOIN sub2 AS stn_to
      )
      
      SELECT DISTINCT *
      FROM sub3
      WHERE 
        station_from_id != station_to_id
        AND distance <= 1000
      ORDER BY station_from_code, distance

  • velib_stations_features provides additional attributes for each Vélib station, maintained manually in a Google Sheet.
    • Update frequency: manual
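The compaction performed by the velib_inventory_compact query can be mirrored in a few lines of Python, which is handy for checking the logic on a small sample before scheduling it. This is a local sketch, not the production query: rows are grouped per station, sorted by `updated_at`, and a row is kept only when it is the station's first row or when one of the four tracked counts differs from the previous row (the role played by `LAG()` in SQL). The function name `compact` is hypothetical.

```python
from itertools import groupby
from operator import itemgetter

# Counts compared with the previous row, mirroring the LAG() comparisons in the query
TRACKED = (
    "num_docks_available",
    "num_bikes_available",
    "num_bikes_available_mechanical",
    "num_bikes_available_ebike",
)


def compact(rows: list) -> list:
    """Drop consecutive rows whose tracked counts did not change, per station."""
    kept = []
    ordered = sorted(rows, key=itemgetter("station_id", "updated_at"))
    for _, station_rows in groupby(ordered, key=itemgetter("station_id")):
        previous = None
        for row in station_rows:
            # Python's != treats None like any other value, similar to IS DISTINCT FROM
            if previous is None or any(row[c] != previous[c] for c in TRACKED):
                kept.append({**row, "last_updated_at": row["updated_at"]})
            previous = row
    return kept
```

Because every minute produces one row per station and most stations do not change every minute, this "keep only changes" approach is what yields the roughly 90% row reduction mentioned above.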
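The hourly bucketing with forward fill used for velib_inventory_hourly can also be sketched in Python for a single station. Under the assumptions that counts are never None and that `updates` are rows of the compact table, each hourly slot (end bound inclusive, like `GENERATE_TIMESTAMP_ARRAY`) averages the updates falling in `[slot, slot + 1h)`; an empty slot reuses the values of the latest update seen in an earlier slot, which is what the `LAST_VALUE(... IGNORE NULLS)` window and the final join achieve in SQL. As in the query, the average is over change events, not weighted by time. The function name `hourly_inventory` is hypothetical.

```python
from datetime import datetime, timedelta
from statistics import fmean

# Column in the compact table -> column in the hourly table
AVG_COLUMNS = {
    "num_docks_available": "avg_num_docks",
    "num_bikes_available_mechanical": "avg_num_bikes_mechanical",
    "num_bikes_available_ebike": "avg_num_bikes_ebike",
}


def hourly_inventory(updates: list, start: datetime, end: datetime) -> list:
    """Hourly slots for ONE station, from start to end inclusive.

    Assumes counts are never None. Empty slots carry forward the latest
    update seen in an earlier slot, or None if there is none yet.
    """
    updates = sorted(updates, key=lambda u: u["last_updated_at"])
    slots, last_update = [], None
    slot = start
    while slot <= end:  # inclusive upper bound, like GENERATE_TIMESTAMP_ARRAY
        next_slot = slot + timedelta(hours=1)
        bucket = [u for u in updates if slot <= u["last_updated_at"] < next_slot]
        if bucket:
            last_update = bucket[-1]
            values = {out: fmean(u[col] for u in bucket) for col, out in AVG_COLUMNS.items()}
        elif last_update is not None:
            # Forward fill from the latest known update
            values = {out: float(last_update[col]) for col, out in AVG_COLUMNS.items()}
        else:
            values = {out: None for out in AVG_COLUMNS.values()}
        slots.append({
            "time_slot": slot,
            "latest_update_at": None if last_update is None else last_update["last_updated_at"],
            **values,
        })
        slot = next_slot
    return slots
```

Scanning `updates` once per slot keeps the sketch readable; for large inputs, a single pass with a moving index would avoid the quadratic cost.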
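The nearby-stations logic behind velib_stations_distances (every ordered pair of distinct stations, distance rounded to whole meters, only pairs within 1000 m kept, sorted by station code then distance) can be reproduced locally with the haversine formula. This is a swapped-in technique: it assumes a spherical Earth with a 6,371 km radius, so distances may differ slightly from BigQuery's `ST_DISTANCE`. Like the `CROSS JOIN`, it is quadratic in the number of stations. The names `haversine_m` and `nearby_pairs` are hypothetical.

```python
from itertools import permutations
from math import asin, cos, radians, sin, sqrt

EARTH_RADIUS_M = 6_371_000  # assumption: spherical Earth with mean radius


def haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance in meters between two (lat, lon) points."""
    phi1, phi2 = radians(lat1), radians(lat2)
    dphi = radians(lat2 - lat1)
    dlmb = radians(lon2 - lon1)
    a = sin(dphi / 2) ** 2 + cos(phi1) * cos(phi2) * sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_M * asin(sqrt(a))


def nearby_pairs(stations: dict, max_m: int = 1000) -> list:
    """stations maps station_code -> (latitude, longitude).

    Returns (from_code, to_code, distance_m) tuples, sorted like the query.
    """
    pairs = []
    for a, b in permutations(stations, 2):  # ordered pairs, self-pairs excluded
        d = round(haversine_m(*stations[a], *stations[b]))
        if d <= max_m:
            pairs.append((a, b, d))
    return sorted(pairs, key=lambda p: (p[0], p[2], p[1]))
```

With roughly 1500 stations this evaluates about 2.25 million pairs, which is why the distance table is only refreshed monthly rather than computed on the fly.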