This repository was archived by the owner on Sep 11, 2023. It is now read-only.

Issue/209 xrdatarray b #229

Merged
32 commits merged on Oct 19, 2021
Changes from all commits
32 commits
144e190
first go at using xr.Dataset workflow
peterdudfield Oct 14, 2021
24dd7f4
move from branch where data was too big
peterdudfield Oct 14, 2021
11ef417
move subselect functions to separate file
peterdudfield Oct 14, 2021
e64d8d8
remove validation script.py, fix netcdf dataset test
peterdudfield Oct 14, 2021
0009158
Merge branch 'main' into issue/209-xrdatarray-b
peterdudfield Oct 14, 2021
02d0887
tidy imports
peterdudfield Oct 14, 2021
19997b1
pylinter + pv_data test data
peterdudfield Oct 14, 2021
ffd16d9
remove commented code
peterdudfield Oct 14, 2021
f6d75c4
add validation of dims in models
peterdudfield Oct 15, 2021
c8438e4
add satellite and nwp specific validation
peterdudfield Oct 15, 2021
8628197
tidy imports
peterdudfield Oct 15, 2021
3467d61
tidy unused functions
peterdudfield Oct 15, 2021
ca4d5a8
add fake datetime
peterdudfield Oct 15, 2021
11f24d5
tidy files
peterdudfield Oct 15, 2021
ff4fded
move fake functions into separate file
peterdudfield Oct 15, 2021
62f07a0
Merge branch 'main' into issue/209-xrdatarray-b
peterdudfield Oct 15, 2021
585fb65
self PR comments
peterdudfield Oct 15, 2021
4f009c2
add back in fake dataset
peterdudfield Oct 18, 2021
589b93c
__slots__ = ()
peterdudfield Oct 18, 2021
a22b0e0
Jacob PR comments - thanks
peterdudfield Oct 18, 2021
5c16b54
Apply suggestions from code review
peterdudfield Oct 18, 2021
5689d5c
PR comments from Flo
peterdudfield Oct 18, 2021
dc2a1e9
more PR comments
peterdudfield Oct 18, 2021
faab4fe
tidy
peterdudfield Oct 18, 2021
69294b0
update prepare_ml_data script
peterdudfield Oct 18, 2021
ae482fd
1. format datasources correctly,
peterdudfield Oct 18, 2021
de9fbdd
add sun to gcp yaml, and datetime tests
peterdudfield Oct 18, 2021
4bab4ec
fix
peterdudfield Oct 18, 2021
ef48a2d
add option when looking for how much batch data is made
peterdudfield Oct 18, 2021
1a069cf
add sun file to prepare_ml_data
peterdudfield Oct 18, 2021
5d953c4
Merge branch 'main' into issue/209-xrdatarray-b
peterdudfield Oct 18, 2021
7bf0b70
make sure sat and nwp are saved as np.float64
peterdudfield Oct 18, 2021
8 changes: 8 additions & 0 deletions conftest.py
@@ -10,9 +10,17 @@
from nowcasting_dataset.data_sources import SatelliteDataSource
from nowcasting_dataset.data_sources.gsp.gsp_data_source import GSPDataSource
from nowcasting_dataset.data_sources.metadata.metadata_data_source import MetadataDataSource
from nowcasting_dataset.dataset.xr_utils import (
register_xr_data_array_to_tensor,
register_xr_data_set_to_tensor,
)

pytest.IMAGE_SIZE_PIXELS = 128

# need to run these to ensure that xarray DataArray and Dataset have torch functions
register_xr_data_array_to_tensor()
register_xr_data_set_to_tensor()


def pytest_addoption(parser):
parser.addoption(
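
The two `register_*` calls above are what give xarray objects their torch conversion methods. Their bodies are not part of this diff; below is a minimal sketch, under the assumption that they simply attach an accessor exposing a `to_tensor()` helper. The accessor name `torch` and the method names are illustrative, not the project's actual API.

```python
# Illustrative sketch only -- the real register_xr_data_array_to_tensor /
# register_xr_data_set_to_tensor functions may work differently.
import torch
import xarray as xr


@xr.register_dataarray_accessor("torch")
class _DataArrayTorchAccessor:
    def __init__(self, data_array: xr.DataArray):
        self._da = data_array

    def to_tensor(self) -> torch.Tensor:
        # Pull the underlying numpy values out as a torch tensor.
        return torch.from_numpy(self._da.values)


@xr.register_dataset_accessor("torch")
class _DatasetTorchAccessor:
    def __init__(self, dataset: xr.Dataset):
        self._ds = dataset

    def to_tensor(self) -> dict:
        # One tensor per data variable, keyed by variable name.
        return {name: torch.from_numpy(da.values) for name, da in self._ds.data_vars.items()}
```
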
1 change: 1 addition & 0 deletions nowcasting_dataset/config/gcp.yaml
@@ -8,6 +8,7 @@ input_data:
solar_pv_metadata_filename: gs://solar-pv-nowcasting-data/PV/PVOutput.org/UK_PV_metadata.csv
gsp_zarr_path: gs://solar-pv-nowcasting-data/PV/GSP/v1/pv_gsp.zarr
topographic_filename: gs://solar-pv-nowcasting-data/Topographic/europe_dem_1km_osgb.tif
sun_zarr_path: gs://solar-pv-nowcasting-data/Sun/v0/sun.zarr
output_data:
filepath: gs://solar-pv-nowcasting-data/prepared_ML_training_data/v7/
process:
9 changes: 8 additions & 1 deletion nowcasting_dataset/data_sources/README.md
@@ -38,5 +38,12 @@ General pydantic model of output of the data source. Contains the following meth

Roughly each of the data source folders follows this pattern
- A class which defines how to load the data source, how to select for batches etc. This inherits from 'data_source.DataSource',
- A class which contains the output model of the data source. This is the information used in the batches.
- A class which contains the output model of the data source, built from an xarray Dataset. This is the information used in the batches.
This inherits from 'datasource_output.DataSourceOutput'.
- A second class (pydantic) which moves the xarray Dataset to tensor fields. This will be used for training in ML models
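
A minimal sketch of that two-class pattern, using hypothetical `ExampleOutput` / `ExampleOutputML` names rather than any real data source in this repo:

```python
# Hypothetical illustration of the pattern described above; the real data
# sources carry richer validation and more fields.
import torch
import xarray as xr
from pydantic import BaseModel


class ExampleOutput:
    """xarray-backed output used when building and saving batches."""

    def __init__(self, dataset: xr.Dataset):
        self.dataset = dataset


class ExampleOutputML(BaseModel):
    """pydantic model whose fields are tensors, used for ML training."""

    class Config:
        arbitrary_types_allowed = True

    data: torch.Tensor

    @classmethod
    def from_output(cls, output: ExampleOutput) -> "ExampleOutputML":
        # Move the xarray values into torch tensors for training.
        return cls(data=torch.from_numpy(output.dataset["data"].values))
```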


# fake

`fake.py` has several functions to create fake `Batch` data. This is useful for testing,
and hopefully useful outside this module too.
22 changes: 13 additions & 9 deletions nowcasting_dataset/data_sources/data_source.py
@@ -12,6 +12,7 @@
import nowcasting_dataset.time as nd_time
from nowcasting_dataset import square
from nowcasting_dataset.data_sources.datasource_output import DataSourceOutput
from nowcasting_dataset.dataset.xr_utils import join_dataset_to_batch_dataset

logger = logging.getLogger(__name__)

@@ -122,16 +123,19 @@ def get_batch(
examples = []
zipped = zip(t0_datetimes, x_locations, y_locations)
for t0_datetime, x_location, y_location in zipped:
output: DataSourceOutput = self.get_example(t0_datetime, x_location, y_location)
output: xr.Dataset = self.get_example(t0_datetime, x_location, y_location)

if self.convert_to_numpy:
output.to_numpy()
examples.append(output)

# could add option here, to save each data source using
# 1. # DataSourceOutput.to_xr_dataset() to make it a dataset
# 2. DataSourceOutput.save_netcdf(), save to netcdf
return DataSourceOutput.create_batch_from_examples(examples)

# get the class of the first example; this could be one of the data source classes, like Sun
cls = examples[0].__class__

# join the examples together, and cast them to the cls, so that validation can occur
return cls(join_dataset_to_batch_dataset(examples))

def datetime_index(self) -> pd.DatetimeIndex:
"""Returns a complete list of all available datetimes."""
@@ -203,7 +207,7 @@ def get_example(
t0_dt: pd.Timestamp, #: Datetime of "now": The most recent obs.
x_meters_center: Number, #: Centre, in OSGB coordinates.
y_meters_center: Number, #: Centre, in OSGB coordinates.
) -> DataSourceOutput:
) -> xr.Dataset:
"""Must be overridden by child classes."""
raise NotImplementedError()

@@ -305,7 +309,10 @@ def get_example(
f"actual shape {selected_data.shape}"
)

return self._put_data_into_example(selected_data)
# rename 'variable' to 'channels'
selected_data = selected_data.rename({"variable": "channels"})

return selected_data

def geospatial_border(self) -> List[Tuple[Number, Number]]:
"""
@@ -342,6 +349,3 @@ def open(self) -> None:

def _open_data(self) -> xr.DataArray:
raise NotImplementedError()

def _put_data_into_example(self, selected_data: xr.DataArray) -> DataSourceOutput:
raise NotImplementedError()
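
The `join_dataset_to_batch_dataset` helper used in `get_batch` lives in `nowcasting_dataset.dataset.xr_utils` and is not shown in this diff. A rough sketch of what joining examples into a batch could look like, assuming concatenation along a new example dimension (the real helper may differ):

```python
# Illustrative only -- the real join_dataset_to_batch_dataset may use a
# different dimension name or extra bookkeeping.
from typing import List

import xarray as xr


def join_examples_to_batch(examples: List[xr.Dataset]) -> xr.Dataset:
    # Give each example an integer coordinate, then stack them along a new
    # "example" dimension so the batch size becomes the leading dimension.
    indexed = [ex.expand_dims(dim={"example": [i]}) for i, ex in enumerate(examples)]
    return xr.concat(indexed, dim="example")
```
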
178 changes: 36 additions & 142 deletions nowcasting_dataset/data_sources/datasource_output.py
@@ -1,115 +1,40 @@
""" General Data Source output pydantic class. """
from __future__ import annotations
import os
from nowcasting_dataset.filesystem.utils import make_folder
from nowcasting_dataset.utils import get_netcdf_filename

import logging
import os
from pathlib import Path
from pydantic import BaseModel, Field
import pandas as pd
import xarray as xr
from typing import List

import numpy as np
from typing import List, Union
import logging
from datetime import datetime
from pydantic import BaseModel, Field

from nowcasting_dataset.utils import to_numpy
from nowcasting_dataset.dataset.xr_utils import PydanticXArrayDataSet
from nowcasting_dataset.filesystem.utils import make_folder
from nowcasting_dataset.utils import get_netcdf_filename

logger = logging.getLogger(__name__)


class DataSourceOutput(BaseModel):
class DataSourceOutput(PydanticXArrayDataSet):
"""General Data Source output pydantic class.

Data source output classes should inherit from this class
"""

class Config:
""" Allowed classes e.g. tensor.Tensor"""

# TODO maybe there is a better way to do this
arbitrary_types_allowed = True

batch_size: int = Field(
0,
ge=0,
description="The size of this batch. If the batch size is 0, "
"then this item stores one data item i.e Example",
)
__slots__ = []

def get_name(self) -> str:
""" Get the name of the class """
"""Get the name of the class"""
return self.__class__.__name__.lower()

def to_numpy(self):
"""Change to numpy"""
for k, v in self.dict().items():
self.__setattr__(k, to_numpy(v))

def to_xr_data_array(self):
""" Change to xr DataArray"""
raise NotImplementedError()

@staticmethod
def create_batch_from_examples(data):
"""
Join a list of data source items to a batch.

Note that this only works for numpy objects, so objects are changed into numpy
"""
_ = [d.to_numpy() for d in data]

# use the first item in the list, and then update each item
batch = data[0]
for k in batch.dict().keys():

# set batch size to the list of the items
if k == "batch_size":
batch.batch_size = len(data)
else:

# get list of one variable from the list of data items.
one_variable_list = [d.__getattribute__(k) for d in data]
batch.__setattr__(k, np.stack(one_variable_list, axis=0))

return batch

def split(self) -> List[DataSourceOutput]:
"""
Split the datasource from a batch to a list of items

Returns: List of single data source items
"""
cls = self.__class__

items = []
for batch_idx in range(self.batch_size):
d = {k: v[batch_idx] for k, v in self.dict().items() if k != "batch_size"}
d["batch_size"] = 0
items.append(cls(**d))

return items

def to_xr_dataset(self, **kwargs):
""" Make a xr dataset. Each data source needs to define this """
raise NotImplementedError

def from_xr_dataset(self):
""" Load from xr dataset. Each data source needs to define this """
raise NotImplementedError

def get_datetime_index(self):
""" Datetime index for the data """
pass

def save_netcdf(self, batch_i: int, path: Path, xr_dataset: xr.Dataset):
def save_netcdf(self, batch_i: int, path: Path):
"""
Save batch to netcdf file

Args:
batch_i: the batch id, used to make the filename
path: the path where it will be saved. This can be local or in the cloud.
xr_dataset: xr dataset that has batch information in it
"""
filename = get_netcdf_filename(batch_i)

@@ -124,77 +49,46 @@ def save_netcdf(self, batch_i: int, path: Path, xr_dataset: xr.Dataset):
# make file
local_filename = os.path.join(folder, filename)

encoding = {name: {"compression": "lzf"} for name in xr_dataset.data_vars}
xr_dataset.to_netcdf(local_filename, engine="h5netcdf", mode="w", encoding=encoding)

def select_time_period(
self,
keys: List[str],
history_minutes: int,
forecast_minutes: int,
t0_dt_of_first_example: Union[datetime, pd.Timestamp],
):
"""
Selects a subset of data between the indices of [start, end] for each key in keys

Note that class is edited so nothing is returned.

Args:
keys: Keys in batch to use
t0_dt_of_first_example: datetime of the current time (t0) in the first example of the batch
history_minutes: How many minutes of history to use
forecast_minutes: How many minutes of future data to use for forecasting

"""
logger.debug(
f"Taking a sub-selection of the batch data based on a history minutes of {history_minutes} "
f"and forecast minutes of {forecast_minutes}"
)
encoding = {name: {"compression": "lzf"} for name in self.data_vars}
self.to_netcdf(local_filename, engine="h5netcdf", mode="w", encoding=encoding)

start_time_of_first_batch = t0_dt_of_first_example - pd.to_timedelta(
f"{history_minutes} minute 30 second"
)
end_time_of_first_example = t0_dt_of_first_example + pd.to_timedelta(
f"{forecast_minutes} minute 30 second"
)

logger.debug(f"New start time for first batch is {start_time_of_first_batch}")
logger.debug(f"New end time for first batch is {end_time_of_first_example}")
class DataSourceOutputML(BaseModel):
"""General Data Source output pydantic class.

start_time_of_first_example = to_numpy(start_time_of_first_batch)
end_time_of_first_example = to_numpy(end_time_of_first_example)
Data source output classes should inherit from this class
"""

if self.get_datetime_index() is not None:
class Config:
"""Allowed classes e.g. tensor.Tensor"""

time_of_first_example = to_numpy(pd.to_datetime(self.get_datetime_index()[0]))
# TODO maybe there is a better way to do this
arbitrary_types_allowed = True

# find the start and end index, that we will then use to slice the data
start_i, end_i = np.searchsorted(
time_of_first_example, [start_time_of_first_example, end_time_of_first_example]
)
batch_size: int = Field(
0,
ge=0,
description="The size of this batch. If the batch size is 0, "
"then this item stores one data item i.e Example",
)

# slice all the data
for key in keys:
if "time" in self.__getattribute__(key).dims:
self.__setattr__(
key, self.__getattribute__(key).isel(time=slice(start_i, end_i))
)
elif "time_30" in self.__getattribute__(key).dims:
self.__setattr__(
key, self.__getattribute__(key).isel(time_30=slice(start_i, end_i))
)
def get_name(self) -> str:
"""Get the name of the class"""
return self.__class__.__name__.lower()

logger.debug(f"{self.__class__.__name__} {key}: {self.__getattribute__(key).shape}")
def get_datetime_index(self):
"""Datetime index for the data"""
pass


def pad_nans(array, pad_width) -> np.ndarray:
""" Pad nans with nans"""
"""Pad nans with nans"""
array = array.astype(np.float32)
return np.pad(array, pad_width, constant_values=np.NaN)


def pad_data(
data: DataSourceOutput,
data: DataSourceOutputML,
pad_size: int,
one_dimensional_arrays: List[str],
two_dimensional_arrays: List[str],
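
For reference, the lzf/h5netcdf encoding used in `save_netcdf` can be exercised on any `xr.Dataset`. A small usage sketch (the filename and variables here are made up):

```python
# Usage sketch of the lzf + h5netcdf encoding shown above; requires the
# h5netcdf package to be installed.
import numpy as np
import xarray as xr

batch = xr.Dataset(
    {"data": (("example", "time"), np.random.rand(4, 12).astype(np.float64))}
)
encoding = {name: {"compression": "lzf"} for name in batch.data_vars}
batch.to_netcdf("0.nc", engine="h5netcdf", mode="w", encoding=encoding)
```
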
@@ -8,6 +8,7 @@
from nowcasting_dataset import time as nd_time
from nowcasting_dataset.data_sources.data_source import DataSource
from nowcasting_dataset.data_sources.datetime.datetime_model import Datetime
from nowcasting_dataset.dataset.xr_utils import make_dim_index


@dataclass
@@ -36,7 +37,13 @@ def get_example(
start_dt = self._get_start_dt(t0_dt)
end_dt = self._get_end_dt(t0_dt)
index = pd.date_range(start_dt, end_dt, freq="5T")
return nd_time.datetime_features_in_example(index)

datetime_xr_dataset = nd_time.datetime_features_in_example(index).rename({"index": "time"})

# make sure time is indexed in the correct way
datetime_xr_dataset = make_dim_index(datetime_xr_dataset)

return Datetime(datetime_xr_dataset)

def get_locations_for_batch(
self, t0_datetimes: pd.DatetimeIndex
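
The `get_example` change above builds a 5-minutely datetime feature Dataset, renames the `index` dimension to `time`, re-indexes it via `make_dim_index`, and wraps the result in the `Datetime` model. A rough standalone sketch of the first two steps; the feature names here are assumptions, and `make_dim_index` / `Datetime` are project-specific:

```python
# Illustrative sketch of building datetime features on a 5-minutely index and
# renaming the dimension to "time"; not the project's actual implementation.
import numpy as np
import pandas as pd
import xarray as xr

index = pd.date_range("2021-10-01 12:00", "2021-10-01 14:00", freq="5T")
datetime_xr_dataset = xr.Dataset(
    {
        "hour_of_day_sin": ("index", np.sin(2 * np.pi * index.hour / 24)),
        "hour_of_day_cos": ("index", np.cos(2 * np.pi * index.hour / 24)),
    },
    coords={"index": index},
).rename({"index": "time"})
```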