
7. Multi-source Ingestion and Polytope

Learning objectives

By the end of this notebook you will be able to:

  • Load data from multiple sources using the same from_source() API

  • Use FieldLists to inspect and select fields before any conversion

  • Align sources on a common grid using earthkit.geo

  • Apply the full preprocessing chain to combined data

  • Understand Polytope and why it matters for DestinE

  • Write a unified multi-variable Zarr store


The single-pipeline argument

Real ML models train on many variables from many sources: ERA5 atmospheric fields, sea surface temperature, soil moisture, satellite observations, DestinE digital twin outputs. Without a unified ingestion path, the preprocessing code becomes a tangle of source-specific adapters.

from_source() eliminates this. The origin of the data — CDS, FDB, S3, Polytope, a local file — is a single string label. Everything downstream is identical.
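The shape of the argument can be sketched in a few lines. Note this is a stand-in: `loader` and `pipeline` are hypothetical helper names used here so the snippet is self-contained; in the notebook the real call is `ekd.from_source(source, ...)`.

```python
# Stand-in for ekd.from_source(): the real call returns a FieldList.
def loader(source, *args):
    return {"source": source, "args": args}

def pipeline(source, *args):
    fl = loader(source, *args)   # the ONLY source-specific line
    # ... regrid -> unit conversion -> normalise -> Zarr, all unchanged
    return fl

# Three different origins, one identical downstream path:
for src, arg in [("sample", "era5-2t-msl-1985122512.grib"),
                 ("file", "/path/to/local.grib"),
                 ("polytope", "destination-earth-climate-dt")]:
    print(pipeline(src, arg)["source"])
```

Swapping a source means changing one string, not rewriting the preprocessing code.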

Setup

import earthkit.data as ekd
from earthkit.geo import regrid
import earthkit.plots as ekp
import xarray as xr
import numpy as np
import os

ekd.settings.set({"cache-policy": "user"})
os.makedirs("data", exist_ok=True)
print("Setup complete")

Source 1 — ERA5 surface fields

Load 2m temperature and mean sea level pressure. We stay in FieldList form until we need array operations.

# DATA: era5-2t-msl-1985122512.grib — 2t, msl
fl1 = ekd.from_source("sample", "era5-2t-msl-1985122512.grib").to_fieldlist()
print(f"Source 1: {len(fl1)} fields")
fl1.ls()

Source 2 — ERA5 pressure-level fields

A second source — different variables, different vertical coordinate, same from_source() call.

# DATA: tuv_pl.grib — t, u, v on pressure levels (7 x 12 grid)
fl2 = ekd.from_source("sample", "tuv_pl.grib").to_fieldlist()
print(f"Source 2: {len(fl2)} fields")
fl2.head()
# Select wind fields at the lowest available pressure level
levels = sorted({f.metadata('level') for f in fl2})
print(f"\nAvailable levels: {levels}")
fl_wind = fl2.sel(param=["u", "v"], level=levels[0])
print(f"Wind fields selected: {len(fl_wind)} (level={levels[0]} hPa)")

Regrid both sources to a common grid

The two sample files are on different grids. Before combining them we regrid both to a shared regular lat-lon grid using earthkit.geo.regrid().

In production, choose the target resolution based on your model. Here we use 4° to fit within both sample domains.

TARGET_GRID = [4, 4]

fl1_r = regrid(fl1, grid=TARGET_GRID)
fl2_r = regrid(fl_wind, grid=TARGET_GRID)

print(f"Source 1 regridded: {len(fl1_r)} fields")
print(f"Source 2 regridded: {len(fl2_r)} fields")

# Inspect the common grid
f_test = fl1_r[0]
print(f_test.metadata(namespace="geography"))

Convert to xarray and align

Both sources are now on the same grid. Convert to xarray for array operations.

xr1 = fl1_r.to_xarray()
xr2 = fl2_r.to_xarray()

print("Source 1 xarray:", dict(xr1.dims))
print("Source 2 xarray:", dict(xr2.dims))

# Find the common lat/lon extent
common_lat = np.intersect1d(np.round(xr1.latitude.values, 6),
                             np.round(xr2.latitude.values, 6))
common_lon = np.intersect1d(np.round(xr1.longitude.values, 6),
                             np.round(xr2.longitude.values, 6))

xr1_c = xr1.sel(latitude=common_lat, longitude=common_lon, method="nearest")
xr2_c = xr2.sel(latitude=common_lat, longitude=common_lon, method="nearest")
print(f"\nCommon domain: {len(common_lat)} lat x {len(common_lon)} lon")
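xarray can also do this intersection for you. A minimal, self-contained sketch using `xr.align` with `join="inner"` on synthetic coordinates (not the ERA5 data above):

```python
import numpy as np
import xarray as xr

a = xr.DataArray(np.zeros((5, 5)),
                 coords={"latitude": np.arange(0, 50, 10),
                         "longitude": np.arange(0, 50, 10)},
                 dims=["latitude", "longitude"])
b = xr.DataArray(np.zeros((4, 4)),
                 coords={"latitude": np.arange(10, 50, 10),
                         "longitude": np.arange(10, 50, 10)},
                 dims=["latitude", "longitude"])

# join="inner" keeps only the coordinate labels present in BOTH objects
a_c, b_c = xr.align(a, b, join="inner")
print(dict(a_c.sizes))   # both now span the shared 4 x 4 domain
```

The manual `intersect1d` + rounding approach above is still useful when floating-point coordinates differ by tiny amounts, since `align` matches labels exactly.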

Apply the preprocessing chain

With both sources on the same grid, apply the unit conversions and normalisation from notebook 3.

from earthkit.meteo import wind

# --- Unit conversions ---
t2m_c   = xr1_c["2t"] - 273.15      # K -> Celsius
msl_hpa = xr1_c["msl"] / 100.0     # Pa -> hPa

# Wind speed from U and V
u = xr2_c["u"].values
v = xr2_c["v"].values
wspd = wind.speed(u, v)

# --- Z-score normalisation ---
def zscore(arr):
    return (arr - arr.mean()) / arr.std()

norm = {
    "2t_norm":   zscore(t2m_c.values).astype(np.float32),
    "msl_norm":  zscore(msl_hpa.values).astype(np.float32),
    "wspd_norm": zscore(wspd).astype(np.float32),
}

for name, arr in norm.items():
    print(f"{name}: shape={arr.shape}  mean={arr.mean():.4f}  std={arr.std():.4f}")

Build the unified Dataset

coords = {"latitude": common_lat, "longitude": common_lon}
dims   = ["latitude", "longitude"]

unified = xr.Dataset({
    k: xr.DataArray(v, dims=dims, coords=coords,
                    attrs={"long_name": k.replace("_", " "), "units": "1"})
    for k, v in norm.items()
})
print(unified)

Visualise the combined fields

fig = ekp.Figure(1, 3, figsize=(16, 4))
for col, var in enumerate(["2t_norm", "msl_norm", "wspd_norm"]):
    fig.add_map(0, col).quickplot(unified[var])
fig.coastlines()
fig.title("Unified multi-source normalised fields")
fig.show()

Write the unified Zarr store

zarr_path = "data/era5_multi_source.zarr"
unified.to_zarr(zarr_path, mode="w")
print(f"Unified store written to {zarr_path}")
print(unified)

Polytope — data access for DestinE

Polytope is ECMWF’s geometric data extraction service and the primary access mechanism for Destination Earth digital twin outputs.

What makes Polytope different?

Traditional extraction downloads a rectangular bounding box. For a time series at a single point, this means downloading an entire global field and discarding 99.9% of it.

Polytope extracts only the data you actually request:

  • A time series at one or more point locations

  • A vertical profile

  • A trajectory (sequence of lat/lon/level/time)

  • A polygon region on an unstructured grid

This can reduce data transfer by up to 99% compared to bounding-box extraction.
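The scale of the saving is easy to estimate. Illustrative arithmetic only, assuming a global 0.25° regular lat-lon grid and daily data, not measured figures from the service:

```python
# A global 0.25-degree regular lat-lon field has 721 x 1440 points.
points_per_field = 721 * 1440          # 1,038,240 values per timestep
timesteps = 365                        # one year of daily data

# Bounding-box route: download the full field at every timestep
bbox_values = points_per_field * timesteps

# Polytope time-series route: one value per timestep at the point
polytope_values = 1 * timesteps

reduction = 1 - polytope_values / bbox_values
print(f"Values transferred: {bbox_values:,} vs {polytope_values:,}")
print(f"Transfer reduction: {reduction:.4%}")
```

For a single-point time series the reduction is far beyond 99%; the gains shrink as the requested geometry grows toward a full field.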

API

The earthkit-data Polytope source uses the same from_source() pattern — the FieldList you get back is identical to any other source:

# DATA: Polytope requires credentials and server access.
# Register at https://polytope.ecmwf.int and place your API key in ~/.polytopeapirc
#
# Example — extract a 2m temperature time series at Paris from the DestinE Climate DT:
#
# fl_paris = ekd.from_source(
#     "polytope",
#     collection="destination-earth-climate-dt",
#     request={
#         "class":    "d1",
#         "dataset":  "climate-dt",
#         "expver":   "0001",
#         "model":    "ICON",
#         "param":    "2t",
#         "levtype":  "sfc",
#         "date":     "2020-01-01/to/2020-12-31",
#         "time":     "00:00:00",
#         "step":     "0",
#         "feature": {
#             "type":   "timeseries",
#             "points": [[48.8, 2.3]],   # [lat, lon]
#             "axes":   "date",
#         },
#     },
#     address="https://polytope.ecmwf.int",
# )
#
# The result is a FieldList — identical to any other source.
# fl_paris is then processed with the same regrid/normalise/Zarr chain.

print("Polytope example shown above — credentials required.")
print("See https://polytope.readthedocs.io/ for setup instructions.")

Polytope for DestinE audiences

If you have access to DestinE digital twin outputs via Polytope:

  1. Register at polytope.ecmwf.int

  2. Install the client: pip install polytope-client

  3. Add your API key to ~/.polytopeapirc

  4. The earthkit-data Polytope source handles authentication automatically

  5. The preprocessing chain (regrid, normalise, Zarr) is unchanged — you only change the from_source() label
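For step 3, the credentials file is a small JSON document. A sketch of `~/.polytopeapirc` — the field names follow the polytope-client documentation, but verify against the current docs if authentication fails:

```json
{
    "user_key": "<your-api-key>",
    "user_email": "<your-registered-email>"
}
```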


Summary

You have:

  • Loaded data from two different sources with identical from_source() calls

  • Used FieldLists to inspect and select fields before converting

  • Aligned both sources on a common grid with regrid()

  • Applied the full preprocessing chain: unit conversion, normalisation

  • Written a unified data/era5_multi_source.zarr

  • Seen how Polytope gives efficient access to DestinE outputs


Activity

  1. Replace Source 2 with a local file using ekd.from_source("file", "/path/to/your/file.grib"). Does any downstream code change?

  2. Add a fourth variable: potential temperature from notebook 3. Extend the unified Dataset.

  3. What would change in the Polytope request to extract a vertical profile instead of a time series? Look at the feature.type options in the Polytope docs.

# Starter for activity 2 — replace ... with the normalised potential
# temperature array computed as in notebook 3
unified["theta_norm"] = xr.DataArray(..., dims=dims, coords=coords)