Learning objectives
By the end of this notebook you will be able to:
Load data from multiple sources using the same from_source() API
Use FieldLists to inspect and select fields before any conversion
Align sources on a common grid using earthkit.geo
Apply the full preprocessing chain to combined data
Understand Polytope and why it matters for DestinE
Write a unified multi-variable Zarr store
The single-pipeline argument¶
Real ML models train on many variables from many sources: ERA5 atmospheric fields, sea surface temperature, soil moisture, satellite observations, DestinE digital twin outputs. Without a unified ingestion path, the preprocessing code becomes a tangle of source-specific adapters.
from_source() eliminates this. The origin of the data — CDS, FDB, S3, Polytope, a local file — is a single string label. Everything downstream is identical.
Setup¶
import earthkit.data as ekd
from earthkit.geo import regrid
import earthkit.plots as ekp
import xarray as xr
import numpy as np
import os
ekd.settings.set({"cache-policy": "user"})
os.makedirs("data", exist_ok=True)
print("Setup complete")
Source 1 — ERA5 surface fields¶
Load 2m temperature and mean sea level pressure. We stay in FieldList form until we need array operations.
# DATA: era5-2t-msl-1985122512.grib — 2t, msl
fl1 = ekd.from_source("sample", "era5-2t-msl-1985122512.grib").to_fieldlist()
print(f"Source 1: {len(fl1)} fields")
fl1.ls()
Source 2 — ERA5 pressure-level fields¶
A second source — different variables, different vertical coordinate, same from_source() call.
# DATA: tuv_pl.grib — t, u, v on pressure levels (7 x 12 grid)
fl2 = ekd.from_source("sample", "tuv_pl.grib").to_fieldlist()
print(f"Source 2: {len(fl2)} fields")
fl2.head()
# Select wind fields at the lowest available pressure level
levels = sorted({f.metadata('level') for f in fl2})
print(f"\nAvailable levels: {levels}")
fl_wind = fl2.sel(param=["u", "v"], level=levels[0])
print(f"Wind fields selected: {len(fl_wind)} (level={levels[0]} hPa)")
Regrid both sources to a common grid¶
The two sample files are on different grids. Before combining them we regrid both to a shared regular lat-lon grid using earthkit.geo.regrid().
In production, choose the target resolution based on your model. Here we use 4° to fit within both sample domains.
TARGET_GRID = [4, 4]
fl1_r = regrid(fl1, grid=TARGET_GRID)
fl2_r = regrid(fl_wind, grid=TARGET_GRID)
print(f"Source 1 regridded: {len(fl1_r)} fields")
print(f"Source 2 regridded: {len(fl2_r)} fields")
# Inspect the common grid
f_test = fl1_r[0]
f_test.describe('geography')
Convert to xarray and align¶
Both sources are now on the same grid. Convert to xarray for array operations.
xr1 = fl1_r.to_xarray()
xr2 = fl2_r.to_xarray()
print("Source 1 xarray:", dict(xr1.sizes))
print("Source 2 xarray:", dict(xr2.sizes))
# Find the common lat/lon extent
common_lat = np.intersect1d(np.round(xr1.latitude.values, 6),
                            np.round(xr2.latitude.values, 6))
common_lon = np.intersect1d(np.round(xr1.longitude.values, 6),
                            np.round(xr2.longitude.values, 6))
xr1_c = xr1.sel(latitude=common_lat, longitude=common_lon, method="nearest")
xr2_c = xr2.sel(latitude=common_lat, longitude=common_lon, method="nearest")
print(f"\nCommon domain: {len(common_lat)} lat x {len(common_lon)} lon")
Apply the preprocessing chain¶
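A note on the rounding in the intersection step above: regridded coordinates are floating-point values, and coordinates that print identically can still differ in their last binary digits, so an exact intersection silently drops grid points. A minimal NumPy illustration with synthetic coordinates (not the ERA5 grids):

```python
import numpy as np

# Two coordinate arrays that are mathematically identical but built
# differently, so some elements differ in the last binary digits.
lat_a = np.arange(0, 1, 0.1)                    # e.g. 0.1 * 3 = 0.30000000000000004
lat_b = np.array([i / 10 for i in range(10)])   # e.g. 3 / 10 = 0.3

exact = np.intersect1d(lat_a, lat_b)
rounded = np.intersect1d(np.round(lat_a, 6), np.round(lat_b, 6))

print(f"exact intersection keeps {len(exact)} of 10 points")
print(f"rounded intersection keeps {len(rounded)} of 10 points")
```

Rounding to 6 decimal places is far below any realistic grid spacing, so it makes the comparison robust without ever merging distinct coordinates.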
With both sources on the same grid, apply the unit conversions and normalisation from notebook 3.
from earthkit.meteo import wind
# --- Unit conversions ---
t2m_c = xr1_c["2t"] - 273.15 # K -> Celsius
msl_hpa = xr1_c["msl"] / 100.0 # Pa -> hPa
# Wind speed from U and V
u = xr2_c["u"].values
v = xr2_c["v"].values
wspd = wind.speed(u, v)
# --- Z-score normalisation ---
def zscore(arr):
    return (arr - arr.mean()) / arr.std()
norm = {
"2t_norm": zscore(t2m_c.values).astype(np.float32),
"msl_norm": zscore(msl_hpa.values).astype(np.float32),
"wspd_norm": zscore(wspd).astype(np.float32),
}
for name, arr in norm.items():
    print(f"{name}: shape={arr.shape} mean={arr.mean():.4f} std={arr.std():.4f}")
Build the unified Dataset¶
coords = {"latitude": common_lat, "longitude": common_lon}
dims = ["latitude", "longitude"]
unified = xr.Dataset({
k: xr.DataArray(v, dims=dims, coords=coords,
attrs={"long_name": k.replace("_", " "), "units": "1"})
for k, v in norm.items()
})
print(unified)
Visualise the combined fields¶
fig = ekp.Figure(1, 3, figsize=(16, 4))
for col, var in enumerate(["2t_norm", "msl_norm", "wspd_norm"]):
    fig.add_map(0, col).quickplot(unified[var])
fig.coastlines()
fig.title("Unified multi-source normalised fields")
fig.show()
Write the unified Zarr store¶
zarr_path = "data/era5_multi_source.zarr"
unified.to_zarr(zarr_path, mode="w")
print(f"Unified store written to {zarr_path}")
print(unified)
Polytope — data access for DestinE¶
Polytope is ECMWF’s geometric data extraction service and the primary access mechanism for Destination Earth digital twin outputs.
What makes Polytope different?¶
Traditional extraction downloads a rectangular bounding box. For a time series at a single point, this means downloading an entire global field and discarding 99.9% of it.
Polytope extracts only the data you actually request:
A time series at one or more point locations
A vertical profile
A trajectory (sequence of lat/lon/level/time)
A polygon region on an unstructured grid
This can reduce data transfer by up to 99% compared to bounding-box extraction.
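The scale of that saving is easy to estimate with back-of-envelope numbers (a 0.25° global grid and a one-year daily time series at one point; the figures are illustrative, not tied to a specific DestinE dataset):

```python
nlat, nlon = 721, 1440     # 0.25-degree global regular lat-lon grid
nsteps = 365               # one value per day for a year

bbox_values = nlat * nlon * nsteps   # bounding box: the full field every step
feature_values = 1 * nsteps          # Polytope time series: one point per step

reduction = 1 - feature_values / bbox_values
print(f"bounding box:       {bbox_values:,} values")
print(f"feature extraction: {feature_values:,} values")
print(f"reduction:          {reduction:.5%}")
```

Even a generous polygon or trajectory request stays orders of magnitude below the bounding-box volume.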
API¶
The earthkit-data Polytope source uses the same from_source() pattern — the FieldList you get back is identical to any other source:
# DATA: Polytope requires credentials and server access.
# Register at https://polytope.ecmwf.int and place your API key in ~/.polytopeapirc
#
# Example — extract a 2m temperature time series at Paris from the DestinE Climate DT:
#
# fl_paris = ekd.from_source(
# "polytope",
# collection="destination-earth-climate-dt",
# request={
# "class": "d1",
# "dataset": "climate-dt",
# "expver": "0001",
# "model": "ICON",
# "param": "2t",
# "levtype": "sfc",
# "date": "2020-01-01/to/2020-12-31",
# "time": "00:00:00",
# "step": "0",
# "feature": {
# "type": "timeseries",
# "points": [[48.8, 2.3]], # [lat, lon]
# "axes": "date",
# },
# },
# address="https://polytope.ecmwf.int",
# )
#
# The result is a FieldList — identical to any other source.
# fl_paris is then processed with the same regrid/normalise/Zarr chain.
print("Polytope example shown above — credentials required.")
print("See https://polytope.readthedocs.io/ for setup instructions.")
Polytope for DestinE audiences¶
If you have access to DestinE digital twin outputs via Polytope:
Register at polytope.ecmwf.int
Install the client: pip install polytope-client
Add your API key to ~/.polytopeapirc
The earthkit-data Polytope source handles authentication automatically
The preprocessing chain (regrid, normalise, Zarr) is unchanged — you only change the from_source() label
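That last point can be sketched without credentials: when the loading call is isolated in one place, moving from local files to Polytope is a configuration change, not a code change. Pure Python, with illustrative labels and a hypothetical file path:

```python
# Arguments that would be handed to ekd.from_source() in each environment.
SOURCES = {
    "dev": ("file", "data/local_test.grib"),   # hypothetical local file
    "prod": ("polytope",),                     # + collection=..., request=...
}

def source_label(env):
    # The real loader would be:
    #   ekd.from_source(*SOURCES[env], ...)
    # Everything downstream (regrid, normalise, to_zarr) consumes the
    # returned FieldList without ever inspecting where it came from.
    return SOURCES[env][0]

print(source_label("dev"), "->", source_label("prod"))
```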
Summary¶
You have:
Loaded data from two different sources with identical from_source() calls
Used FieldLists to inspect and select fields before converting
Aligned both sources on a common grid with regrid()
Applied the full preprocessing chain: unit conversion, normalisation
Written a unified data/era5_multi_source.zarr
Seen how Polytope gives efficient access to DestinE outputs
Activity
Replace Source 2 with a local file using ekd.from_source("file", "/path/to/your/file.grib"). Does any downstream code change?
Add a fourth variable: potential temperature from notebook 3. Extend the unified Dataset.
What would change in the Polytope request to extract a vertical profile instead of a time series? Look at the feature.type options in the Polytope docs.
# Extend unified Dataset
unified["theta_norm"] = xr.DataArray(..., dims=dims, coords=coords)