# -*- coding: utf-8 -*-
from typing import Tuple
from rasterio.transform import Affine
from rasterio.profiles import Profile
from numba import njit, prange
import numpy as np
from import Window
from import CRS
from import DatasetReader
from netCDF4 import Dataset, date2num
from datetime import datetime
import os
from typing import Union

[docs] @njit(cache=True) def reproject_pixel( gt_from: tuple[float, float, float, float, float, float], gt_to: tuple[float, float, float, float, float, float], px_in: int, py_in: int, ) -> tuple[int, int]: """Reproject pixel. Args: gt_from: Orginal geotransformation. gt_to: Target geotransformation. px_in: Input xpixel. py_in: Input ypixel. """ px_out = (gt_from[0] + gt_from[1] * px_in - gt_to[0]) / gt_to[1] py_out = (gt_from[3] + gt_from[5] * py_in - gt_to[3]) / gt_to[5] return int(px_out), int(py_out)
[docs] @njit(cache=True) def pixel_to_coord(px: int, py: int, gt: tuple) -> Tuple[float, float]: """Converts pixel (x, y) to coordinate (lon, lat) for given geotransformation. Uses the upper left corner of the pixel. To use the center, add 0.5 to input pixel. Parameters: pixel: the pixel (x, y) that need to be transformed to coordinate gt: the geotransformation. Must be unrotated Returns: array: the coordinate (lon, lat) """ if gt[2] + gt[4] == 0: lon = px * gt[1] + gt[0] lat = py * gt[5] + gt[3] return lon, lat else: raise ValueError("Cannot convert rotated maps")
[docs] @njit(cache=True, parallel=True) def pixels_to_coords(pixels: np.ndarray, gt: tuple) -> np.ndarray: """Converts pixels (x, y) to coordinates (lon, lat) for given geotransformation. Uses the upper left corner of the pixels. To use the centers, add 0.5 to input pixels. Parameters: pixels: the pixels (x, y) that need to be transformed to coordinates (shape: 2, n) gt: the geotransformation. Must be unrotated Returns: array: the coordinates (lon, lat) (shape: 2, n) """ assert pixels.shape[1] == 2 if gt[2] + gt[4] == 0: coords = np.empty(pixels.shape, dtype=np.float64) for i in prange(coords.shape[0]): coords[i, 0] = pixels[i, 0] * gt[1] + gt[0] coords[i, 1] = pixels[i, 1] * gt[5] + gt[3] return coords else: raise ValueError("Cannot convert rotated maps")
[docs] @njit(cache=True) def coord_to_pixel( coord: np.ndarray, gt: tuple[float, float, float, float, float, float] ) -> tuple[int, int]: """Converts coordinate to pixel (x, y) for given geotransformation. Parameters: coord: the coordinate (lon, lat) that need to be transformed to pixel gt: the geotransformation. Must be unrotated Returns: array: tuple of pixel (x, y) """ if gt[2] + gt[4] == 0: px = (coord[0] - gt[0]) / gt[1] py = (coord[1] - gt[3]) / gt[5] return int(px), int(py) else: raise ValueError("Cannot convert rotated maps")
[docs] @njit(parallel=True) def coords_to_pixels( coords, gt: tuple, dtype=np.uint32 ) -> tuple[np.ndarray, np.ndarray]: """Converts array of coordinates to array of pixels for given geotransformation. Parameters: coords: the coordinates (lon, lat) that need to be transformed to pixels (shape: 2, n) gt: the geotransformation. Must be unrotated Returns: array: 2d-array of pixels per coordinate (shape: 2, n) """ if gt[2] + gt[4] == 0: size = coords.shape[0] x_offset = gt[0] y_offset = gt[3] x_step = gt[1] y_step = gt[5] pxs = np.empty(size, dtype=dtype) pys = np.empty(size, dtype=dtype) for i in prange(size): pxs[i] = int((coords[i, 0] - x_offset) / x_step) pys[i] = int((coords[i, 1] - y_offset) / y_step) return pxs, pys else: raise ValueError("Cannot convert rotated maps")
[docs] @njit(parallel=False) def sample_from_map( array: np.ndarray, coords: np.ndarray, gt: tuple[float, float, float, float, float, float], ) -> np.ndarray: """Sample coordinates from a map. Can handle multiple dimensions. Parameters: array: the map to sample from (2+n dimensions) coords: the coordinates used to sample (shape: 2, m) gt: the geotransformation. Must be unrotated Returns: array: values per coordinate """ assert gt[2] + gt[4] == 0 size = coords.shape[0] x_offset = gt[0] y_offset = gt[3] x_step = gt[1] y_step = gt[5] values = np.empty((size,) + array.shape[:-2], dtype=array.dtype) for i in prange(size): values[i] = array[ ..., int((coords[i, 1] - y_offset) / y_step), int((coords[i, 0] - x_offset) / x_step), ] return values
[docs] @njit( parallel=False, cache=True, ) # Writing to an array cannot be parallelized as race conditions would occur. def write_to_array( array: np.ndarray, values: np.ndarray, coords: np.ndarray, gt: tuple[float, float, float, float, float, float], ): """Write values using coordinates to a map. If multiple coordinates map to a single cell, the values are added. The operation is inplace Parameters: array: the 2-dimensional array to write to values: the values to write (shape: n) coords: the coordinates of the values (shape: 2, n) gt: the geotransformation. Must be unrotated Returns: array: the array with the values added (operation is inplace) """ assert values.size == coords.shape[0] assert gt[2] + gt[4] == 0 size = values.size x_offset = gt[0] y_offset = gt[3] x_step = gt[1] y_step = gt[5] for i in range(size): array[ int((coords[i, 1] - y_offset) / y_step), int((coords[i, 0] - x_offset) / x_step), ] += values[i] return array
[docs] def clip_to_xy_bounds( src: DatasetReader, profile: Profile, array: np.ndarray, xmin: int, xmax: int, ymin: int, ymax: int, ) -> np.ndarray: """Clips rasterio dataset to given bounds. Args: src: Rasterio dataset. profile: Rasterio profile of dataset. array: Array to clip. xmin: Minimum xbound. xmax: Maximum xbound. ymin: Minimum ybound. ymax: Maximum ybound. Returns: profile: Updated profile for cut array. array: Array data. """ window = Window.from_slices(slice(ymin, ymax), slice(xmin, xmax)) profile.update( { "transform": src.window_transform(window), "height": int(window.height), "width": int(window.width), } ) return profile, array[ymin:ymax, xmin:xmax].copy()
[docs] def clip_to_other( array: np.ndarray, src_profile: Profile, other_profile: Profile ) -> tuple[np.ndarray, Profile]: """Clip array to rasterio profile. Args: array: Array to clip. src_profile: Rasterio profile of source array. other_profile: Rasterio profile of array to clip to. Returns: outarray: Clipped array. profile: Updated profile. """ xmin = round( (other_profile["transform"].c - src_profile["transform"].c) / src_profile["transform"].a ) assert abs(xmin) == xmin # assert xmin is positive ymin = round( (other_profile["transform"].f - src_profile["transform"].f) / src_profile["transform"].e ) assert abs(ymin) == ymin # assert ymin is positive profile = dict(src_profile) profile.update( { "transform": other_profile["transform"], "height": other_profile["height"], "width": other_profile["width"], } ) outarray = array[ ymin : ymin + other_profile["height"], xmin : xmin + other_profile["width"] ].copy() return outarray, profile
[docs] def upscale( array: np.ndarray, src_profile: np.ndarray, factor: np.ndarray ) -> tuple[np.ndarray, Profile]: """ "Upscale array and rasterio profile by x amount. Args: array: Array to upscale. src_profile: Rasterio profile of source array. factor: Factor by which to upscale. Returns: array: Upscaled array. profile: Updated rasterio profile. """ profile = dict(src_profile) transform = src_profile["transform"] profile["transform"] = Affine( transform.a / factor, transform.b, transform.c, transform.d, transform.e / factor, transform.f, ) profile["height"] = src_profile["height"] * factor profile["width"] = src_profile["width"] * factor array = array.repeat(factor, axis=-2).repeat(factor, axis=-1) return array, profile
[docs] class NetCDFHasEmtpyLayersException(Exception): """Raised when one or more layers are empty""" pass
[docs] class CreateNetCDF: """Class to create easy create a spatial NetCDF-file. Args: fn: Filepath. title: Title of NetCDF. source: Source of data. institution: Institution where data was created. n_timesteps: Number of timesteps in NetCDF. None if NetCDF should not have time dimension. lons: Array of longitudes. lats: Array of latitudes. epsg: EPSG code. varname: Name of variable to store. units: Unit of variable. dtype: NetCDF dtype. See NetCDF documentation for valid dtypes: chunksizes: Sizes of datachunks. (time, lat, lon) if time dimension exists, otherwise (lat, lon). fill_value: Value to represent nodata. compression_level: NetCDF compression level (1-9) comment: Optional dataset comment. """ def __init__( self, fp: str, title: str, source: str, institution: str, n_timesteps: Union[None, int], lons: np.ndarray, lats: np.ndarray, epsg: int, varname: str, units: str, dtype: str, chunksizes: tuple[int], fill_value: Union[float, int], compression_level: int, comment: str = None, ) -> None: self.fp = fp self.ds = Dataset(self.fp, "w", format="NETCDF4") self.ds.set_always_mask(False) self.ds.date_created = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") self.ds.source = source self.ds.institution = institution self.ds.title = title self.ds.Conventions = "CF-1.6" if comment: self.ds.comment = comment if n_timesteps: self.ds.createDimension("time", n_timesteps) self.ds.createDimension("lat", len(lats)) self.ds.createDimension("lon", len(lons)) lat = self.ds.createVariable("lat", "f8", ("lat",)) lat.standard_name = "latitude" lat.units = "degrees_north" lat.axis = "Y" lat[:] = lats lon = self.ds.createVariable("lon", "f8", ("lon",)) lon.standard_name = "longitude" lon.units = "degrees_east" lon.axis = "X" lon[:] = lons crs = self.ds.createVariable("spatial_ref", "i4") crs.spatial_ref = CRS.from_epsg(epsg).to_wkt() if n_timesteps: self.times = self.ds.createVariable("time", "f4", ("time",)) self.times.standard_name = "time" self.times.long_name = "time" self.times.units = "hours since 1970-01-01 00:00:00" self.times.calendar = "gregorian" self.values = self.ds.createVariable( varname, datatype=dtype, dimensions=("time", "lat", "lon") if n_timesteps else ("lat", "lon"), chunksizes=chunksizes, zlib=bool(compression_level), complevel=compression_level, # ignored if zlib is False contiguous=False, # do not neccesarily store contiguous on disk fill_value=fill_value, ) self.values.standard_name = varname self.values.units = units self.current_timestep = 0
[docs] def write(self, values: np.ndarray, dt: datetime = None) -> None: """Write layer to file. Args: values: Values to write. dt: Optional datetime to write. If NetCDF has no time dimension, do not write. """ if dt: assert dt >= datetime(1970, 1, 1) self.times[self.current_timestep] = date2num( dt, units=self.times.units, calendar=self.times.calendar ) self.values[self.current_timestep, :, :] = values self.current_timestep += 1 else: assert not hasattr(self, "times") self.values[:, :] = values
def __enter__(self): return self def __exit__(self, exc_type, exc_value, traceback): """Check if NetCDF has no empty layers on exit.""" if hasattr(self, "times"): checklayer = self.times else: checklayer = self.values if[:]): self.ds.close() os.remove(self.fp) raise NetCDFHasEmtpyLayersException else: self.ds.close() if exc_type is not None: os.remove(self.fp)