API Reference

This page is generated with mkdocstrings.

Top-Level Package

solarpandas

Top-level package exports and version metadata for solarpandas.

Modules:

  • accessors
  • base

    Core data containers with site metadata for solar time series.

  • config

    Configuration loading and runtime option access for solarpandas.

  • helpers

    General helper functions shared across the solarpandas package.

  • iohelpers
  • logtools

    Logging format helpers used by solarpandas logging setup.

  • mplstyles

    Registration helpers and constants for bundled Matplotlib styles.

  • origin

    Data origin integrations provided by solarpandas.

  • qcontrol

    Public exports for quality-control test functions and utilities.

  • sample_data

    Sample datasets bundled with solarpandas for demos and tests.

  • types

    Public type aliases, validators, and QC flag extension types.

  • validate

    Validation helpers and annotated domain types used by solarpandas.

Core Containers

base

Core data containers with site metadata for solar time series.

This module defines SolarSeries and SolarDataFrame, two pandas subclasses that keep site-level metadata (latitude, longitude, elevation and custom metadata) attached to the object through common pandas operations.

The module also provides top-level convenience readers for the custom CSV and Parquet formats implemented by SolarDataFrame.

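Classes:

  • SolarDataFrame

    Solar dataframe carrying site metadata.

  • SolarSeries

    Solar data series carrying site metadata.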

SolarDataFrame

SolarDataFrame(
    *args,
    latitude: Latitude,
    longitude: Longitude,
    elevation: Elevation = 0.0,
    custom_metadata: dict | None = None,
    **kwargs,
)

Bases: DataFrame

Solar dataframe carrying site metadata.

Parameters:

  • latitude
    (float) –

    Site latitude in decimal degrees. Must satisfy -90 < lat < 90.

  • longitude
    (float) –

    Site longitude in decimal degrees. Must satisfy -180 <= lon < 180.

  • elevation
    (float, default: 0.0 ) –

    Site elevation in meters.

  • custom_metadata
    (dict or None, default: None ) –

    Additional user metadata to keep together with the dataframe.
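The coordinate ranges listed above can be sketched as plain validation functions. This is a minimal illustration of the documented bounds only; `validate_latitude` and `validate_longitude` are hypothetical names, not the `validate_type` helper that solarpandas uses internally.

```python
def validate_latitude(value: float) -> float:
    """Return the latitude as a float if it satisfies -90 < lat < 90."""
    lat = float(value)
    # The documented range is open at both ends, so the poles are rejected.
    if not -90.0 < lat < 90.0:
        raise ValueError(f"latitude must satisfy -90 < lat < 90, got {lat}")
    return lat


def validate_longitude(value: float) -> float:
    """Return the longitude as a float if it satisfies -180 <= lon < 180."""
    lon = float(value)
    # The documented range is half-open: -180 is accepted, +180 is not.
    if not -180.0 <= lon < 180.0:
        raise ValueError(f"longitude must satisfy -180 <= lon < 180, got {lon}")
    return lon


print(validate_latitude(37.16))
print(validate_longitude(-180))
```

Under these bounds a longitude of 180 would have to be passed as -180.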

Notes

Metadata are propagated through the custom pandas constructors. latitude, longitude and elevation are reserved metadata keys. They are managed internally and cannot be provided in custom_metadata.
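The reserved-key rule can be illustrated with a small standalone check. This is a sketch under the assumption that a clash should raise ValueError, as the constructor source does; `check_custom_metadata` and `RESERVED_KEYS` are hypothetical names introduced for the example.

```python
from typing import Optional

# Keys managed by the container itself (taken from the note above).
RESERVED_KEYS = ("latitude", "longitude", "elevation")


def check_custom_metadata(custom_metadata: Optional[dict] = None) -> dict:
    """Return a copy of the user metadata, rejecting reserved keys."""
    metadata = dict(custom_metadata or {})
    clashes = [key for key in RESERVED_KEYS if key in metadata]
    if clashes:
        raise ValueError(
            f"reserved keys cannot appear in custom_metadata: {clashes}"
        )
    return metadata


print(check_custom_metadata({"station": "demo"}))
```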

Methods:

  • as_pandas

    Return a plain pandas DataFrame view of this object.

  • describe

    Compute descriptive statistics as a plain pandas dataframe.

  • read_csv

    Read a CSV file written by to_csv.

  • read_parquet

    Read a Parquet file written by to_parquet.

  • replace_data

    Create a copy with identical metadata and replaced data.

  • to_csv

    Write dataframe and metadata to a CSV file.

  • to_parquet

    Write dataframe and metadata to a Parquet file.

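Attributes:

  • custom_metadata (dict) –

    Additional user metadata attached to the object.

  • elevation (float) –

    Site elevation in meters above sea level.

  • latitude (float) –

    Site latitude in decimal degrees.

  • longitude (float) –

    Site longitude in decimal degrees.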

Source code in src/solarpandas/base.py
def __init__(
    self,
    *args,
    latitude: Latitude,
    longitude: Longitude,
    elevation: Elevation = 0.0,
    custom_metadata: dict | None = None,
    **kwargs,
):
    self._latitude = validate_type(latitude, Latitude)
    self._longitude = validate_type(longitude, Longitude)
    self._elevation = validate_type(elevation, Elevation)
    self._custom_metadata = custom_metadata or {}
    if "latitude" in self._custom_metadata:
        raise ValueError("`latitude` cannot be a key in metadata")
    if "longitude" in self._custom_metadata:
        raise ValueError("`longitude` cannot be a key in metadata")
    if "elevation" in self._custom_metadata:
        raise ValueError("`elevation` cannot be a key in metadata")
    super().__init__(*args, **kwargs)
custom_metadata property
custom_metadata: dict

dict : Additional user metadata attached to the object.

elevation property
elevation: float

float : Site elevation in meters above sea level.

latitude property
latitude: float

float : Site latitude in decimal degrees.

longitude property
longitude: float

float : Site longitude in decimal degrees.

as_pandas
as_pandas()

Return a plain pandas DataFrame view of this object.

Returns:

  • DataFrame

    Equivalent dataframe without solarpandas subclass semantics.

Source code in src/solarpandas/base.py
def as_pandas(self):
    """Return a plain pandas DataFrame view of this object.

    Returns
    -------
    pandas.DataFrame
        Equivalent dataframe without solarpandas subclass semantics.
    """
    return pd.DataFrame(self)
describe
describe()

Compute descriptive statistics as a plain pandas dataframe.

Returns:

  • DataFrame

    Result of pandas.DataFrame.describe on this dataset.

Source code in src/solarpandas/base.py
def describe(self):
    """Compute descriptive statistics as a plain pandas dataframe.

    Returns
    -------
    pandas.DataFrame
        Result of ``pandas.DataFrame.describe`` on this dataset.
    """
    return self.as_pandas().describe()
read_csv classmethod
read_csv(path: str | Path, **kwargs)

Read a CSV file written by to_csv.

Parameters:

  • path
    (str or Path) –

    Input file path.

  • **kwargs
    (Any, default: {} ) –

    Additional keyword arguments passed to pandas.read_csv.

Returns:

  • SolarDataFrame

    Parsed dataset with restored site metadata.

Examples:

>>> sdf.to_csv("data.csv")
>>> restored = SolarDataFrame.read_csv("data.csv")
>>> restored.latitude == sdf.latitude
True
Source code in src/solarpandas/base.py
@classmethod
def read_csv(cls, path: str | Path, **kwargs):
    """Read a CSV file written by :meth:`to_csv`.

    Parameters
    ----------
    path : str or pathlib.Path
        Input file path.
    **kwargs : Any
        Additional keyword arguments passed to :func:`pandas.read_csv`.

    Returns
    -------
    SolarDataFrame
        Parsed dataset with restored site metadata.

    Examples
    --------
    >>> sdf.to_csv("data.csv")
    >>> restored = SolarDataFrame.read_csv("data.csv")
    >>> restored.latitude == sdf.latitude
    True
    """
    if not (p := Path(path)).exists():
        raise ValueError(f"missing file {path}")

    metadata_line = linecache.getline(p.as_posix(), 1)
    must_kwargs = {"header": 1, "index_col": 0, "parse_dates": True}
    data = pd.read_csv(path, **(kwargs | must_kwargs))
    metadata = json.loads(metadata_line)

    return cls(
        data=data,
        latitude=float(metadata.pop("latitude")),
        longitude=float(metadata.pop("longitude")),
        elevation=float(metadata.pop("elevation")),
        custom_metadata=metadata,
    )
read_parquet classmethod
read_parquet(path: str | Path)

Read a Parquet file written by to_parquet.

Parameters:

  • path
    (str or Path) –

    Input file path.

Returns:

  • SolarDataFrame

    Parsed dataset with restored site metadata.

Examples:

>>> sdf.to_parquet("data.parquet")
>>> restored = SolarDataFrame.read_parquet("data.parquet")
>>> restored.longitude == sdf.longitude
True
Source code in src/solarpandas/base.py
@classmethod
def read_parquet(cls, path: str | Path):
    """Read a Parquet file written by :meth:`to_parquet`.

    Parameters
    ----------
    path : str or pathlib.Path
        Input file path.

    Returns
    -------
    SolarDataFrame
        Parsed dataset with restored site metadata.

    Examples
    --------
    >>> sdf.to_parquet("data.parquet")
    >>> restored = SolarDataFrame.read_parquet("data.parquet")
    >>> restored.longitude == sdf.longitude
    True
    """
    if not (p := Path(path)).exists():
        raise ValueError(f"missing file {path}")
    table = pq.read_table(p)
    df = table.to_pandas()
    metadata_json = table.schema.metadata["solarpandas".encode()]
    metadata = {}
    for key, value in json.loads(metadata_json).items():
        if key.endswith("/ndarray"):
            metadata[key.split("/", 1)[0]] = np.array(value)
        elif key.endswith("/dataframe"):
            metadata[key.split("/", 1)[0]] = pd.DataFrame(value)
        else:
            metadata[key] = value

    return cls(
        data=df,
        latitude=float(metadata.pop("latitude")),
        longitude=float(metadata.pop("longitude")),
        elevation=float(metadata.pop("elevation")),
        custom_metadata=metadata,
    )
replace_data
replace_data(
    other: Series | DataFrame | Sequence[Number] | Number,
) -> Self

Create a copy with identical metadata and replaced data.

Parameters:

  • other
    (pandas.Series, pandas.DataFrame, sequence, or scalar number) –

    New data used to build the cloned object.

Returns:

  • Self

    A new SolarDataFrame preserving metadata from the current object.

Source code in src/solarpandas/base.py
def replace_data(
    self, other: pd.Series | pd.DataFrame | Sequence[Number] | Number
) -> Self:
    """Create a copy with identical metadata and replaced data.

    Parameters
    ----------
    other : pandas.Series, pandas.DataFrame, sequence, or scalar number
        New data used to build the cloned object.

    Returns
    -------
    Self
        A new :class:`SolarDataFrame` preserving metadata from the current
        object.
    """
    kwargs = {
        "latitude": self.latitude,
        "longitude": self.longitude,
        "elevation": self.elevation,
        "custom_metadata": copy.deepcopy(self.custom_metadata),
    }
    if isinstance(other, Number):
        return self.__class__(
            data=np.full((len(self),), other), index=self.index, **kwargs
        )
    if isinstance(other, (np.ndarray, list)):
        return self.__class__(data=copy.copy(other), index=self.index, **kwargs)
    return self.__class__(data=copy.copy(other), **kwargs)
to_csv
to_csv(path: str | Path, **kwargs)

Write dataframe and metadata to a CSV file.

The first output line stores a JSON document with metadata. Data values are written from the second line onward using pandas.DataFrame.to_csv.

Parameters:

  • path
    (str or Path) –

    Destination path.

  • **kwargs
    (Any, default: {} ) –

    Extra keyword arguments passed to DataFrame.to_csv.
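The file layout described above (one JSON metadata line, then an ordinary CSV body) can be sketched with the standard library alone. This is an illustration of the layout, not the solarpandas implementation; `write_with_metadata` and `read_with_metadata` are hypothetical helpers and the sample values are made up.

```python
import csv
import io
import json


def write_with_metadata(buffer, metadata, header, rows):
    """Write one JSON metadata line followed by a regular CSV table."""
    buffer.write(json.dumps(metadata) + "\n")
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(header)
    writer.writerows(rows)


def read_with_metadata(buffer):
    """Read the JSON line first, then parse the remaining lines as CSV."""
    metadata = json.loads(buffer.readline())
    reader = csv.reader(buffer)
    header = next(reader)
    rows = list(reader)  # values come back as strings
    return metadata, header, rows


buf = io.StringIO()
write_with_metadata(
    buf,
    {"latitude": 37.16, "longitude": -3.6, "elevation": 680.0, "station": "demo"},
    ["time", "ghi"],
    [["2024-06-01T12:00:00Z", 812.5]],
)
buf.seek(0)
metadata, header, rows = read_with_metadata(buf)
print(metadata["station"], header, rows)
```

Because the metadata occupies the first line, a generic CSV reader has to skip it, which is why read_csv forces header=1 in the source listing.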

Source code in src/solarpandas/base.py
def to_csv(self, path: str | Path, **kwargs):
    """Write dataframe and metadata to a CSV file.

    The first output line stores a JSON document with metadata. Data values
    are written from the second line onward using ``pandas.DataFrame.to_csv``.

    Parameters
    ----------
    path : str or pathlib.Path
        Destination path.
    **kwargs : Any
        Extra keyword arguments passed to ``DataFrame.to_csv``.
    """
    metadata = {
        "latitude": self.latitude,
        "longitude": self.longitude,
        "elevation": self.elevation,
    } | self.custom_metadata

    if not (p := Path(path)).parent.exists():
        p.parent.mkdir(parents=True, exist_ok=True)

    default_kwargs = {"header": True}
    with p.open("w") as f:
        f.write(json.dumps(metadata) + "\n")
        pd.DataFrame(self).to_csv(f, **(default_kwargs | kwargs))
to_parquet
to_parquet(path: str | Path, **kwargs)

Write dataframe and metadata to a Parquet file.

Parameters:

  • path
    (str or Path) –

    Destination path.

  • **kwargs
    (Any, default: {} ) –

    Extra keyword arguments passed to pyarrow.parquet.write_table.

Notes

Metadata is stored in the Parquet schema metadata.
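Schema metadata is stored as bytes, so values that JSON cannot represent directly need a marker to be restored on read. The source listing below tags such keys with a type suffix ("/ndarray", "/dataframe"). The same idea is sketched here with the standard library only, using a tuple as a stand-in for an array; the "/tuple" suffix and both function names are assumptions made for the example.

```python
import json

TAG = "/tuple"  # hypothetical suffix, analogous to "/ndarray" in the source


def encode_metadata(metadata: dict) -> bytes:
    """Serialize metadata to JSON bytes, tagging keys whose values are tuples."""
    tagged = {}
    for key, value in metadata.items():
        if isinstance(value, tuple):
            # JSON has no tuple type: store a list and record the type in the key.
            tagged[f"{key}{TAG}"] = list(value)
        else:
            tagged[key] = value
    return json.dumps(tagged).encode()


def decode_metadata(payload: bytes) -> dict:
    """Invert encode_metadata, rebuilding tuples from tagged keys."""
    restored = {}
    for key, value in json.loads(payload).items():
        if key.endswith(TAG):
            restored[key[: -len(TAG)]] = tuple(value)
        else:
            restored[key] = value
    return restored


original = {"latitude": 37.16, "horizon": (1.0, 2.5, 4.0)}
payload = encode_metadata(original)
print(decode_metadata(payload) == original)
```

Stripping the suffix from the end of the key, rather than splitting at the first "/", keeps the round trip intact for keys that themselves contain a slash.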

Source code in src/solarpandas/base.py
def to_parquet(self, path: str | Path, **kwargs):
    """Write dataframe and metadata to a Parquet file.

    Parameters
    ----------
    path : str or pathlib.Path
        Destination path.
    **kwargs : Any
        Extra keyword arguments passed to ``pyarrow.parquet.write_table``.

    Notes
    -----
    Metadata is stored in the Parquet schema metadata.
    """
    metadata = {
        "latitude": self.latitude,
        "longitude": self.longitude,
        "elevation": self.elevation,
    } | self.custom_metadata

    # convert the dataframe to an Arrow Table
    table = pa.Table.from_pandas(self)

    # copy the metadata to a dictionary and convert the numpy
    # arrays and dataframes to a json-serializable format
    df_metadata = {}
    for key, value in metadata.items():
        if isinstance(value, np.ndarray):
            df_metadata[f"{key}/ndarray"] = value.tolist()
        elif isinstance(value, pd.DataFrame):
            df_metadata[f"{key}/dataframe"] = value.to_dict()
        else:
            df_metadata[key] = value

    # add the dictionary to the schema metadata of the table under
    # the "solarpandas" key, encoded to bytes
    combined_metadata = {
        "solarpandas".encode(): json.dumps(df_metadata).encode(),
        **table.schema.metadata,
    }  # keep the DataFrame's own metadata
    table = table.replace_schema_metadata(combined_metadata)

    # serialize to a parquet file
    p = Path(path)
    if not p.parent.exists():
        p.parent.mkdir(parents=True, exist_ok=True)
    pa.parquet.write_table(table, p, **kwargs)

SolarSeries

SolarSeries(
    *args,
    latitude: Latitude,
    longitude: Longitude,
    elevation: Elevation = 0.0,
    custom_metadata: dict | None = None,
    **kwargs,
)

Bases: Series

Solar data series carrying site metadata.

Parameters:

  • latitude
    (float) –

    Site latitude in decimal degrees. Must satisfy -90 < lat < 90.

  • longitude
    (float) –

    Site longitude in decimal degrees. Must satisfy -180 <= lon < 180.

  • elevation
    (float, default: 0.0 ) –

    Site elevation in meters.

  • custom_metadata
    (dict or None, default: None ) –

    Additional user metadata to attach to the series.

Notes

Metadata are propagated through the custom pandas constructors. latitude, longitude and elevation are reserved metadata keys. They are managed internally and cannot be provided in custom_metadata.

Methods:

  • replace_data

    Create a copy with identical site metadata and new data values.

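Attributes:

  • custom_metadata (dict) –

    Additional user metadata attached to the object.

  • elevation (float) –

    Site elevation in meters above sea level.

  • latitude (float) –

    Site latitude in decimal degrees.

  • longitude (float) –

    Site longitude in decimal degrees.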

Source code in src/solarpandas/base.py
def __init__(
    self,
    *args,
    latitude: Latitude,
    longitude: Longitude,
    elevation: Elevation = 0.0,
    custom_metadata: dict | None = None,
    **kwargs,
):
    self._latitude = validate_type(latitude, Latitude)
    self._longitude = validate_type(longitude, Longitude)
    self._elevation = validate_type(elevation, Elevation)
    self._custom_metadata = custom_metadata or {}
    if "latitude" in self._custom_metadata:
        raise ValueError("`latitude` cannot be a key in metadata")
    if "longitude" in self._custom_metadata:
        raise ValueError("`longitude` cannot be a key in metadata")
    if "elevation" in self._custom_metadata:
        raise ValueError("`elevation` cannot be a key in metadata")
    super().__init__(*args, **kwargs)
custom_metadata property
custom_metadata: dict

dict : Additional user metadata attached to the object.

elevation property
elevation: float

float : Site elevation in meters above sea level.

latitude property
latitude: float

float : Site latitude in decimal degrees.

longitude property
longitude: float

float : Site longitude in decimal degrees.

replace_data
replace_data(
    other: Series | DataFrame | Sequence[Number] | Number,
) -> Self

Create a copy with identical site metadata and new data values.

Parameters:

  • other
    (pandas.Series, pandas.DataFrame, sequence, or scalar number) –

    New data values.

Returns:

  • Self

    A new SolarSeries preserving index (when applicable) and metadata from the source object.

Source code in src/solarpandas/base.py
def replace_data(
    self, other: pd.Series | pd.DataFrame | Sequence[Number] | Number
) -> Self:
    """Create a copy with identical site metadata and new data values.

    Parameters
    ----------
    other : pandas.Series, pandas.DataFrame, sequence, or scalar number
        New data values.

    Returns
    -------
    Self
        A new :class:`SolarSeries` preserving index (when applicable) and
        metadata from the source object.
    """
    kwargs = {
        "latitude": self.latitude,
        "longitude": self.longitude,
        "elevation": self.elevation,
        "custom_metadata": copy.deepcopy(self.custom_metadata),
    }
    if isinstance(other, Number):
        return self.__class__(
            data=np.full((len(self),), other), index=self.index, **kwargs
        )
    if isinstance(other, (np.ndarray, list)):
        return self.__class__(data=copy.copy(other), index=self.index, **kwargs)
    return self.__class__(data=copy.copy(other), **kwargs)

Accessors

solpos

Pandas accessors and caching helpers for solar-position computations.

Classes:

  • SolarPositionAccessor

    Accessor to compute and expose solar-position variables.

Functions:

  • clear_cache

    Clear the in-memory solar-position cache.

  • get_cache_info

    Return cache statistics for solar-position computations.

SolarPositionAccessor

SolarPositionAccessor(sdf_obj)

Accessor to compute and expose solar-position variables.

Notes

Cached properties are computed with the configuration options solar-position.algorithm, solar-position.refraction and solar-position.engine.

Examples:

>>> sdf.solpos.zenith  # cached path using config defaults
>>> sdf.solpos.compute("psa", True, "numexpr").azimuth  # one-off computation

Methods:

  • clear_cache

    Clear the in-memory solar-position cache.

  • compute

    Compute solar position without using the accessor cache.

  • get_cache_info

    Return cache statistics for solar-position computations.

  • sunrise

    Return sunrise in the selected coordinate system.

  • sunset

    Return sunset in the selected coordinate system.

Attributes:

Source code in src/solarpandas/accessors/solpos.py
def __init__(self, sdf_obj):
    self._sdf = self._validate(sdf_obj)
    self._ISC = 1361.1  # W m-2, the solar constant
    self._algorithm = get_option("solar-position.algorithm", default="psa")
    self._refraction = get_option("solar-position.refraction", default=True)
    self._engine = get_option("solar-position.engine", default="numexpr")
azimuth property
azimuth: SolarSeries

Solar azimuth angle in degrees measured clockwise from north.

Returns:

cosz property

Cosine of the solar zenith angle.

Returns:

ecf property

Earth–Sun distance correction factor (dimensionless).

Returns:

  • SolarSeries

    Square of the ratio of mean to actual Earth–Sun distance.

elevation property
elevation: SolarSeries

Solar elevation angle in degrees (complement of zenith: 90 - zenith).

Returns:

  • SolarSeries

    Positive values indicate the sun is above the horizon.

eth property

Extraterrestrial horizontal irradiance in W m⁻².

Returns:

  • SolarSeries

    Instantaneous irradiance on a horizontal plane at the top of atmosphere.

etn property

Extraterrestrial normal irradiance in W m⁻².

Returns:

  • SolarSeries

    Irradiance on a surface perpendicular to the solar beam (ISC * ecf).
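The relation given for etn (ISC * ecf) can be written out as a short sketch. The ISC value is the one set in the accessor's constructor. The horizontal projection, etn multiplied by the cosine of the zenith angle and clipped at zero below the horizon, is the usual textbook definition and is assumed here rather than taken from the solarpandas source. Both function names are hypothetical.

```python
import math

ISC = 1361.1  # W m-2, solar constant as set in the accessor source


def extraterrestrial_normal(ecf: float) -> float:
    """Extraterrestrial normal irradiance: ISC scaled by the distance factor."""
    return ISC * ecf


def extraterrestrial_horizontal(ecf: float, zenith_deg: float) -> float:
    """Project the normal irradiance onto a horizontal plane.

    Assumption: values are clipped to zero when the sun is below the horizon.
    """
    cosz = math.cos(math.radians(zenith_deg))
    return extraterrestrial_normal(ecf) * max(cosz, 0.0)


print(extraterrestrial_normal(1.0))
print(extraterrestrial_horizontal(1.0, 60.0))
```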

local_solar_time property
local_solar_time: SolarSeries

Local Solar Time as a tz-aware datetime series.

Computed by shifting the index by the longitude-based solar offset (longitude * 4 minutes).
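The longitude-based offset amounts to 4 minutes of time per degree of longitude (24 hours divided by 360 degrees). A minimal standard-library sketch, assuming the input timestamps are in UTC and using a hypothetical helper name:

```python
from datetime import datetime, timedelta, timezone


def to_local_solar_time(utc_time: datetime, longitude: float) -> datetime:
    """Shift a UTC timestamp by 4 minutes per degree of east longitude."""
    return utc_time + timedelta(minutes=4.0 * longitude)


noon_utc = datetime(2024, 6, 1, 12, 0, tzinfo=timezone.utc)
# A site at 3.5 degrees west is 14 minutes behind UTC in local solar time.
print(to_local_solar_time(noon_utc, -3.5))
```

The shifted value keeps the tzinfo of the input, so it is a relabelled clock reading rather than a conversion to another civil time zone.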

Returns:

lst property

Alias for local_solar_time.

sza property

Solar zenith angle in degrees. Alias for zenith.

true_solar_day property
true_solar_day: SolarSeries

True Solar Time floored to the start of each solar day.

Returns:

true_solar_time property
true_solar_time: SolarSeries

True Solar Time as a tz-aware datetime series.

Returns:

  • SolarSeries

    Timestamps re-expressed in True Solar Time (TST).

Examples:

>>> tst_hour = sdf.solpos.true_solar_time.dt.hour
tsd property

Alias for true_solar_day.

tst property

Alias for true_solar_time.

zenith property
zenith: SolarSeries

Solar zenith angle in degrees at each timestamp.

Returns:

  • SolarSeries

    Values range from 0° (sun directly overhead) to 180° (sun at nadir); values above 90° indicate the sun is below the horizon.

Examples:

>>> sza = sdf.solpos.zenith
>>> daytime = sza < 90
clear_cache staticmethod
clear_cache() -> None

Clear the in-memory solar-position cache.

Notes

Use this when changing model options and forcing a full recomputation.

Source code in src/solarpandas/accessors/solpos.py
@staticmethod
def clear_cache() -> None:
    """Clear the in-memory solar-position cache.

    Notes
    -----
    Use this when changing model options and forcing a full recomputation.
    """
    _compute_cached_solpos.cache_clear()
    logger.debug("solpos cache cleared")
compute
compute(
    algorithm: str = "psa",
    refraction: bool = True,
    engine: str = "numexpr",
) -> Sunpos

Compute solar position without using the accessor cache.

Parameters:

  • algorithm
    (str, default: "psa" ) –

    Solar-position algorithm accepted by sunwhere.

  • refraction
    (bool, default: True ) –

    Whether to include atmospheric refraction corrections.

  • engine
    (str, default: "numexpr" ) –

    Backend engine used by sunwhere.

Returns:

  • Sunpos

    sunwhere result object containing angular and temporal variables.

Source code in src/solarpandas/accessors/solpos.py
def compute(
    self, algorithm: str = "psa", refraction: bool = True, engine: str = "numexpr"
) -> sunwhere._base.Sunpos:
    """Compute solar position without using the accessor cache.

    Parameters
    ----------
    algorithm : str, default "psa"
        Solar-position algorithm accepted by ``sunwhere``.
    refraction : bool, default True
        Whether to include atmospheric refraction corrections.
    engine : str, default "numexpr"
        Backend engine used by ``sunwhere``.

    Returns
    -------
    sunwhere._base.Sunpos
        ``sunwhere`` result object containing angular and temporal variables.
    """
    logger.debug(
        f"evaluating solar position with `{algorithm}` algorithm, "
        f"refraction={refraction}, engine=`{engine}`..."
    )
    args = (self._sdf.index, self._sdf.latitude, self._sdf.longitude)
    kwargs = {"algorithm": algorithm, "refraction": refraction, "engine": engine}
    return sunwhere.sites(*args, **kwargs)
get_cache_info staticmethod
get_cache_info()

Return cache statistics for solar-position computations.

Returns:

  • dict[str, int | None]

    Dictionary with hits, misses, current_size and max_size.
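The cache_info() and cache_clear() calls in the source listings match the interface of functools.lru_cache. Assuming the cached helper is decorated that way, the statistics dictionary can be reproduced in isolation as follows; `expensive_position` is a hypothetical stand-in for the real cached computation.

```python
from functools import lru_cache


@lru_cache(maxsize=32)
def expensive_position(site_key: str) -> int:
    """Stand-in for a costly computation keyed by hashable arguments."""
    return len(site_key)


def get_cache_info() -> dict:
    """Translate lru_cache statistics into the documented dictionary keys."""
    info = expensive_position.cache_info()
    return {
        "hits": info.hits,
        "misses": info.misses,
        "current_size": info.currsize,
        "max_size": info.maxsize,
    }


expensive_position("site-a")  # first call: a miss, result gets stored
expensive_position("site-a")  # same arguments: served from the cache
stats = get_cache_info()
print(stats)

expensive_position.cache_clear()  # drops stored results and resets counters
print(get_cache_info())
```

max_size is None only when the cache is unbounded, which explains the `int | None` value type in the documented return annotation.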

Source code in src/solarpandas/accessors/solpos.py
@staticmethod
def get_cache_info():
    """Return cache statistics for solar-position computations.

    Returns
    -------
    dict[str, int | None]
        Dictionary with ``hits``, ``misses``, ``current_size`` and ``max_size``.
    """
    info = _compute_cached_solpos.cache_info()
    return {
        "hits": info.hits,
        "misses": info.misses,
        "current_size": info.currsize,
        "max_size": info.maxsize,
    }
sunrise
sunrise(
    units: Literal[
        "rad", "deg", "tst", "lst", "utc"
    ] = "utc",
)

Return sunrise in the selected coordinate system.

Parameters:

  • units
    ((rad, deg, tst, lst, utc), default: "utc" ) –

    Units or time reference used by sunwhere.

Returns:

  • SolarSeries

    Sunrise values aligned to the dataframe index.

Source code in src/solarpandas/accessors/solpos.py
def sunrise(self, units: Literal["rad", "deg", "tst", "lst", "utc"] = "utc"):
    """Return sunrise in the selected coordinate system.

    Parameters
    ----------
    units : {"rad", "deg", "tst", "lst", "utc"}, default "utc"
        Units or time reference used by ``sunwhere``.

    Returns
    -------
    SolarSeries
        Sunrise values aligned to the dataframe index.
    """
    sr = self._get_cached_solpos("sunrise", units={"lst": "utc"}.get(units, units))
    if units == "lst":
        # convert from UTC to LST
        deltat = pd.Timedelta(self._sdf.longitude * 4, "min")
        sr = sr + deltat
    return sr
sunset
sunset(
    units: Literal[
        "rad", "deg", "tst", "lst", "utc"
    ] = "utc",
)

Return sunset in the selected coordinate system.

Parameters:

  • units
    ((rad, deg, tst, lst, utc), default: "utc" ) –

    Units or time reference used by sunwhere.

Returns:

  • SolarSeries

    Sunset values aligned to the dataframe index.

Source code in src/solarpandas/accessors/solpos.py
def sunset(self, units: Literal["rad", "deg", "tst", "lst", "utc"] = "utc"):
    """Return sunset in the selected coordinate system.

    Parameters
    ----------
    units : {"rad", "deg", "tst", "lst", "utc"}, default "utc"
        Units or time reference used by ``sunwhere``.

    Returns
    -------
    SolarSeries
        Sunset values aligned to the dataframe index.
    """
    ss = self._get_cached_solpos("sunset", units={"lst": "utc"}.get(units, units))
    if units == "lst":
        # convert from UTC to LST
        deltat = pd.Timedelta(self._sdf.longitude * 4, "min")
        ss = ss + deltat
    return ss

clear_cache

clear_cache() -> None

Clear the in-memory solar-position cache.

Notes

Use this when changing model options and forcing a full recomputation.

Examples:

>>> import solarpandas as sp
>>> sp.clear_solpos_cache()
Source code in src/solarpandas/accessors/solpos.py
def clear_cache() -> None:
    """Clear the in-memory solar-position cache.

    Notes
    -----
    Use this when changing model options and forcing a full recomputation.

    Examples
    --------
    >>> import solarpandas as sp
    >>> sp.clear_solpos_cache()
    """
    _compute_cached_solpos.cache_clear()
    logger.debug("solpos cache cleared")

get_cache_info

get_cache_info()

Return cache statistics for solar-position computations.

Returns:

  • dict[str, int | None]

    Dictionary with hits, misses, current_size and max_size.

Examples:

>>> import solarpandas as sp
>>> info = sp.get_solpos_cache_info()
>>> "hits" in info
True
Source code in src/solarpandas/accessors/solpos.py
def get_cache_info():
    """Return cache statistics for solar-position computations.

    Returns
    -------
    dict[str, int | None]
        Dictionary with ``hits``, ``misses``, ``current_size`` and ``max_size``.

    Examples
    --------
    >>> import solarpandas as sp
    >>> info = sp.get_solpos_cache_info()
    >>> "hits" in info
    True
    """
    info = _compute_cached_solpos.cache_info()
    return {
        "hits": info.hits,
        "misses": info.misses,
        "current_size": info.currsize,
        "max_size": info.maxsize,
    }

clearsky

Pandas accessors and caches for clear-sky irradiance estimations.

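Classes:

  • BaseClearskyIrradianceAccessor

    Base class providing cached clear-sky irradiance properties.

  • CDAIrradianceAccessor

    Accessor for clear-day-analysis clear-sky irradiance products.

  • ClearskyIrradianceAccessor

    Accessor for clear-sky irradiance variables (GHI, DNI, DIF, CSI).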

Functions:

  • clear_cache

    Clear the in-memory clear-sky irradiance cache.

  • get_cache_info

    Return cache statistics for clear-sky computations.

BaseClearskyIrradianceAccessor

BaseClearskyIrradianceAccessor(sdf_obj)

Base class providing cached clear-sky irradiance properties.

Subclasses configure _atmosphere and _model to select the atmosphere dataset and irradiance model used for computations.

Attributes:

  • ghi (SolarSeries) –

    Clear-sky global horizontal irradiance in W m⁻².

  • dni (SolarSeries) –

    Clear-sky direct normal irradiance in W m⁻².

  • dif (SolarSeries) –

    Clear-sky diffuse horizontal irradiance in W m⁻².

  • csi (SolarSeries) –

    Clear-sky index (ratio of measured to modelled clear-sky GHI).

Methods:

  • clear_cache

    Clear the in-memory clear-sky irradiance cache.

  • get_cache_info

    Return cache statistics for clear-sky computations.

Source code in src/solarpandas/accessors/clearsky.py
def __init__(self, sdf_obj):
    self._sdf = self._validate(sdf_obj)
    self._model = get_option("clearsky.model", default="SPARTA")
    self._atmosphere = get_option("clearsky.atmosphere", default="crs_soda")
    if not hasattr(spartasolar.atmosphere, self._atmosphere):
        raise ValueError(f"invalid clearsky atmosphere `{self._atmosphere}`")
    logger.debug(
        f"initialized {self.__class__.__name__} with `{self._model}` "
        f"model and `{self._atmosphere}` atmosphere"
    )
csi property

Clear-sky index (ratio of measured to modelled clear-sky GHI).
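As a worked illustration of the ratio, the sketch below divides a measured GHI value by a modelled clear-sky value. Returning NaN when the clear-sky value is not positive (for example at night) is an assumption made for the example, not documented solarpandas behaviour; `clear_sky_index` is a hypothetical name.

```python
import math


def clear_sky_index(ghi_measured: float, ghi_clearsky: float) -> float:
    """Ratio of measured to modelled clear-sky GHI.

    Assumption: undefined (NaN) when the modelled value is zero or negative.
    """
    if ghi_clearsky <= 0.0:
        return math.nan
    return ghi_measured / ghi_clearsky


print(clear_sky_index(600.0, 800.0))
print(clear_sky_index(5.0, 0.0))
```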

dif property

Clear-sky diffuse horizontal irradiance in W m⁻².

dni property

Clear-sky direct normal irradiance in W m⁻².

ghi property

Clear-sky global horizontal irradiance in W m⁻².

clear_cache staticmethod
clear_cache() -> None

Clear the in-memory clear-sky irradiance cache.

Source code in src/solarpandas/accessors/clearsky.py
@staticmethod
def clear_cache() -> None:
    """Clear the in-memory clear-sky irradiance cache."""
    _compute_cached_clearsky.cache_clear()
    logger.debug("clearsky cache cleared")
get_cache_info staticmethod
get_cache_info()

Return cache statistics for clear-sky computations.

Returns:

  • dict[str, int | None]

    Dictionary with hits, misses, current_size and max_size.

Source code in src/solarpandas/accessors/clearsky.py
@staticmethod
def get_cache_info():
    """Return cache statistics for clear-sky computations.

    Returns
    -------
    dict[str, int | None]
        Dictionary with ``hits``, ``misses``, ``current_size`` and ``max_size``.
    """
    info = _compute_cached_clearsky.cache_info()
    return {
        "hits": info.hits,
        "misses": info.misses,
        "current_size": info.currsize,
        "max_size": info.maxsize,
    }

CDAIrradianceAccessor

CDAIrradianceAccessor(sdf_obj)

Bases: BaseClearskyIrradianceAccessor

Accessor for clear-day-analysis clear-sky irradiance products.

Examples:

>>> sdf.cda.ghi
>>> sdf.cda.csi

Methods:

  • clear_cache

    Clear the in-memory clear-sky irradiance cache.

  • get_cache_info

    Return cache statistics for clear-sky computations.

Attributes:

  • csi (SolarSeries) –

    Clear-sky index (ratio of measured to modelled clear-sky GHI).

  • dif (SolarSeries) –

    Clear-sky diffuse horizontal irradiance in W m⁻².

  • dni (SolarSeries) –

    Clear-sky direct normal irradiance in W m⁻².

  • ghi (SolarSeries) –

    Clear-sky global horizontal irradiance in W m⁻².

Source code in src/solarpandas/accessors/clearsky.py
def __init__(self, sdf_obj):
    self._sdf = self._validate(sdf_obj)
    self._model = get_option("clearsky.model", default="SPARTA")
    self._atmosphere = get_option("clearsky.cda_atmosphere", default="merra2_cda")
    if not hasattr(spartasolar.atmosphere, self._atmosphere):
        raise ValueError(f"invalid clearsky atmosphere `{self._atmosphere}`")
csi property

Clear-sky index (ratio of measured to modelled clear-sky GHI).

dif property

Clear-sky diffuse horizontal irradiance in W m⁻².

dni property

Clear-sky direct normal irradiance in W m⁻².

ghi property

Clear-sky global horizontal irradiance in W m⁻².

clear_cache staticmethod
clear_cache() -> None

Clear the in-memory clear-sky irradiance cache.

Source code in src/solarpandas/accessors/clearsky.py
@staticmethod
def clear_cache() -> None:
    """Clear the in-memory clear-sky irradiance cache."""
    _compute_cached_clearsky.cache_clear()
    logger.debug("clearsky cache cleared")
get_cache_info staticmethod
get_cache_info()

Return cache statistics for clear-sky computations.

Returns:

  • dict[str, int | None]

    Dictionary with hits, misses, current_size and max_size.

Source code in src/solarpandas/accessors/clearsky.py
@staticmethod
def get_cache_info():
    """Return cache statistics for clear-sky computations.

    Returns
    -------
    dict[str, int | None]
        Dictionary with ``hits``, ``misses``, ``current_size`` and ``max_size``.
    """
    info = _compute_cached_clearsky.cache_info()
    return {
        "hits": info.hits,
        "misses": info.misses,
        "current_size": info.currsize,
        "max_size": info.maxsize,
    }

ClearskyIrradianceAccessor

ClearskyIrradianceAccessor(sdf_obj)

Bases: BaseClearskyIrradianceAccessor

Accessor for clear-sky irradiance variables (GHI, DNI, DIF, CSI).

Notes

Cached properties use configuration options clearsky.model and clearsky.atmosphere.

Examples:

>>> sdf.clearsky.ghi
>>> sdf.clearsky.compute("crs_soda", "SPARTA").dni

Methods:

  • clear_cache

    Clear the in-memory clear-sky irradiance cache.

  • compute

    Compute clear-sky irradiance once without using cache.

  • get_cache_info

    Return cache statistics for clear-sky computations.

Attributes:

  • csi (SolarSeries) –

    Clear-sky index (ratio of measured to modelled clear-sky GHI).

  • dif (SolarSeries) –

    Clear-sky diffuse horizontal irradiance in W m⁻².

  • dni (SolarSeries) –

    Clear-sky direct normal irradiance in W m⁻².

  • ghi (SolarSeries) –

    Clear-sky global horizontal irradiance in W m⁻².

Source code in src/solarpandas/accessors/clearsky.py
def __init__(self, sdf_obj):
    self._sdf = self._validate(sdf_obj)
    self._model = get_option("clearsky.model", default="SPARTA")
    self._atmosphere = get_option("clearsky.atmosphere", default="crs_soda")
    if not hasattr(spartasolar.atmosphere, self._atmosphere):
        raise ValueError(f"invalid clearsky atmosphere `{self._atmosphere}`")
    logger.debug(
        f"initialized {self.__class__.__name__} with `{self._model}` "
        f"model and `{self._atmosphere}` atmosphere"
    )
csi property

Clear-sky index (ratio of measured to modelled clear-sky GHI).

dif property

Clear-sky diffuse horizontal irradiance in W m⁻².

dni property

Clear-sky direct normal irradiance in W m⁻².

ghi property

Clear-sky global horizontal irradiance in W m⁻².

clear_cache staticmethod
clear_cache() -> None

Clear the in-memory clear-sky irradiance cache.

Source code in src/solarpandas/accessors/clearsky.py
@staticmethod
def clear_cache() -> None:
    """Clear the in-memory clear-sky irradiance cache."""
    _compute_cached_clearsky.cache_clear()
    logger.debug("clearsky cache cleared")
compute
compute(
    atmosphere: Literal[
        "merra2_daily",
        "merra2_gee",
        "merra2_lta",
        "crs_soda",
        "custom",
    ],
    model: str = "SPARTA",
) -> SolarDataFrame

Compute clear-sky irradiance once without using cache.

Parameters:

  • atmosphere
    ((merra2_daily, merra2_gee, merra2_lta, crs_soda, custom)) –

    Atmosphere dataset source.

  • model
    (str, default: "SPARTA" ) –

    Irradiance model name accepted by the selected atmosphere backend.

Returns:

  • SolarDataFrame

    Dataframe with columns ghi, dni, dif and csi computed for the requested atmosphere and model.

Source code in src/solarpandas/accessors/clearsky.py
def compute(
    self,
    atmosphere: Literal[
        "merra2_daily", "merra2_gee", "merra2_lta", "crs_soda", "custom"
    ],
    model: str = "SPARTA",
) -> SolarDataFrame:
    """Compute clear-sky irradiance once without using cache.

    Parameters
    ----------
    atmosphere : {"merra2_daily", "merra2_gee", "merra2_lta", "crs_soda", "custom"}
        Atmosphere dataset source.
    model : str, default "SPARTA"
        Irradiance model name accepted by the selected atmosphere backend.

    Returns
    -------
    SolarDataFrame
        Dataframe with columns ``ghi``, ``dni``, ``dif`` and ``csi``
        computed for the requested atmosphere and model."""
    logger.debug(
        f"evaluating clearsky with `{model}` model and `{atmosphere}` atmosphere..."
    )
    if not hasattr(spartasolar.atmosphere, atmosphere):
        raise ValueError(f"invalid clearsky atmosphere `{atmosphere}`")
    if atmosphere == "custom":
        raise NotImplementedError(
            "TODO: implement support for user-provided custom "
            "atmosphere datasets from dataframe columns"
        )
    atmos_obj = getattr(spartasolar.atmosphere, atmosphere)
    args = (
        self._sdf.index.tz_convert("UTC").tz_localize(None),
        self._sdf.latitude,
        self._sdf.longitude,
    )
    try:
        xa_result = atmos_obj.at_site(*args).compute(model)
    except AttributeError:
        xa_result = atmos_obj.at_sites(*args).compute(model)
    df_result = (
        xa_result.isel(site=0).drop_vars(["lat", "lon", "site"]).to_dataframe()
    )
    return self._sdf.replace_data(df_result)
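
The index handling in compute can be illustrated in isolation. The following is a minimal sketch, assuming a timezone-aware DatetimeIndex at a made-up fixed UTC+02:00 offset; it only reproduces the tz_convert("UTC").tz_localize(None) chain and does not call spartasolar.

```python
from datetime import timedelta, timezone

import pandas as pd

# Two local timestamps at a fixed UTC+02:00 offset (hypothetical site clock).
local = pd.DatetimeIndex(["2024-06-01 12:00", "2024-06-01 13:00"]).tz_localize(
    timezone(timedelta(hours=2))
)

# Convert to UTC first, then drop the timezone to get naive UTC timestamps.
naive_utc = local.tz_convert("UTC").tz_localize(None)

print(naive_utc.tz)                        # None
print(list(naive_utc.strftime("%H:%M")))   # ['10:00', '11:00']
```

Because tz_convert raises a TypeError on a timezone-naive index, this chain presumes the dataframe index is timezone-aware.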
get_cache_info staticmethod
get_cache_info()

Return cache statistics for clear-sky computations.

Returns:

  • dict[str, int | None]

    Dictionary with hits, misses, current_size and max_size.

Source code in src/solarpandas/accessors/clearsky.py
@staticmethod
def get_cache_info():
    """Return cache statistics for clear-sky computations.

    Returns
    -------
    dict[str, int | None]
        Dictionary with ``hits``, ``misses``, ``current_size`` and ``max_size``.
    """
    info = _compute_cached_clearsky.cache_info()
    return {
        "hits": info.hits,
        "misses": info.misses,
        "current_size": info.currsize,
        "max_size": info.maxsize,
    }

LTAIrradianceAccessor

LTAIrradianceAccessor(sdf_obj)

Bases: BaseClearskyIrradianceAccessor

Accessor for long-term-average clear-sky irradiance products.

Examples:

>>> sdf.lta.ghi
>>> sdf.lta.dni

Methods:

  • clear_cache

    Clear the in-memory clear-sky irradiance cache.

  • get_cache_info

    Return cache statistics for clear-sky computations.

Attributes:

  • csi (SolarSeries) –

    Clear-sky index (ratio of measured to modelled clear-sky GHI).

  • dif (SolarSeries) –

    Clear-sky diffuse horizontal irradiance in W m⁻².

  • dni (SolarSeries) –

    Clear-sky direct normal irradiance in W m⁻².

  • ghi (SolarSeries) –

    Clear-sky global horizontal irradiance in W m⁻².

Source code in src/solarpandas/accessors/clearsky.py
def __init__(self, sdf_obj):
    self._sdf = self._validate(sdf_obj)
    self._model = get_option("clearsky.model", default="SPARTA")
    self._atmosphere = get_option("clearsky.lta_atmosphere", default="merra2_lta")
    if not hasattr(spartasolar.atmosphere, self._atmosphere):
        raise ValueError(f"invalid clearsky atmosphere `{self._atmosphere}`")
csi property

Clear-sky index (ratio of measured to modelled clear-sky GHI).

dif property

Clear-sky diffuse horizontal irradiance in W m⁻².

dni property

Clear-sky direct normal irradiance in W m⁻².

ghi property

Clear-sky global horizontal irradiance in W m⁻².

clear_cache staticmethod
clear_cache() -> None

Clear the in-memory clear-sky irradiance cache.

Source code in src/solarpandas/accessors/clearsky.py
@staticmethod
def clear_cache() -> None:
    """Clear the in-memory clear-sky irradiance cache."""
    _compute_cached_clearsky.cache_clear()
    logger.debug("clearsky cache cleared")
get_cache_info staticmethod
get_cache_info()

Return cache statistics for clear-sky computations.

Returns:

  • dict[str, int | None]

    Dictionary with hits, misses, current_size and max_size.

Source code in src/solarpandas/accessors/clearsky.py
@staticmethod
def get_cache_info():
    """Return cache statistics for clear-sky computations.

    Returns
    -------
    dict[str, int | None]
        Dictionary with ``hits``, ``misses``, ``current_size`` and ``max_size``.
    """
    info = _compute_cached_clearsky.cache_info()
    return {
        "hits": info.hits,
        "misses": info.misses,
        "current_size": info.currsize,
        "max_size": info.maxsize,
    }

clear_cache

clear_cache() -> None

Clear the in-memory clear-sky irradiance cache.

Examples:

>>> import solarpandas as sp
>>> sp.clear_clearsky_cache()
Source code in src/solarpandas/accessors/clearsky.py
def clear_cache() -> None:
    """Clear the in-memory clear-sky irradiance cache.

    Examples
    --------
    >>> import solarpandas as sp
    >>> sp.clear_clearsky_cache()
    """
    _compute_cached_clearsky.cache_clear()
    logger.debug("clearsky cache cleared")

get_cache_info

get_cache_info()

Return cache statistics for clear-sky computations.

Returns:

  • dict[str, int | None]

    Dictionary with hits, misses, current_size and max_size.

Examples:

>>> import solarpandas as sp
>>> info = sp.get_clearsky_cache_info()
>>> "current_size" in info
True
Source code in src/solarpandas/accessors/clearsky.py
def get_cache_info():
    """Return cache statistics for clear-sky computations.

    Returns
    -------
    dict[str, int | None]
        Dictionary with ``hits``, ``misses``, ``current_size`` and ``max_size``.

    Examples
    --------
    >>> import solarpandas as sp
    >>> info = sp.get_clearsky_cache_info()
    >>> "current_size" in info
    True
    """
    info = _compute_cached_clearsky.cache_info()
    return {
        "hits": info.hits,
        "misses": info.misses,
        "current_size": info.currsize,
        "max_size": info.maxsize,
    }

qcontrol

Accessor API to run qcrad quality-control checks on solar data.

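Classes:

  • HashableDF

    Hashable wrapper for dataframe content used by the QC cache.

  • QualityControlAccessor

    Accessor to run and query quality-control flags.
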
Functions:

  • clear_cache

    Clear the in-memory quality-control cache.

  • get_cache_info

    Return cache statistics for quality-control computations.

HashableDF

HashableDF(unhashable_df: SolarDataFrame | DataFrame)

Hashable wrapper for dataframe content used by the QC cache.

Source code in src/solarpandas/accessors/qcontrol.py
def __init__(self, unhashable_df: SolarDataFrame | pd.DataFrame):
    self.dataframe = unhashable_df
    # pd.util.hash_pandas_object returns an array with one hash per row;
    # summing them gives a single hash that represents the whole DataFrame's content.
    self._hash = int(pd.util.hash_pandas_object(self.dataframe, index=True).sum())
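
The content hash computed above is what lets a dataframe serve as a cache key. The following is a minimal sketch of that idea; HashableFrame, its __hash__ and __eq__ methods, and column_means are hypothetical illustrations and not the library's implementation.

```python
from functools import lru_cache

import pandas as pd


class HashableFrame:
    """Hypothetical wrapper; not the library's HashableDF."""

    def __init__(self, df: pd.DataFrame):
        self.dataframe = df
        # hash_pandas_object returns one uint64 hash per row; summing them
        # collapses the frame content into a single integer.
        self._hash = int(pd.util.hash_pandas_object(df, index=True).sum())

    def __hash__(self) -> int:
        return self._hash

    def __eq__(self, other) -> bool:
        # Assumption: two wrappers are equal when their content hashes match.
        return isinstance(other, HashableFrame) and self._hash == other._hash


@lru_cache(maxsize=8)
def column_means(wrapped: HashableFrame) -> tuple:
    # Stand-in for an expensive QC run keyed on the frame content.
    return tuple(wrapped.dataframe.mean())


a = pd.DataFrame({"ghi": [100.0, 200.0], "dni": [300.0, 500.0]})
b = a.copy()  # distinct object, identical content

column_means(HashableFrame(a))
column_means(HashableFrame(b))
info = column_means.cache_info()
print(info.hits, info.misses)  # 1 1
```

Since the per-row hashes are summed, the resulting value does not depend on row order: two frames holding the same index-and-value rows in a different order map to the same key.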

QualityControlAccessor

QualityControlAccessor(sdf_obj)

Accessor to run and query quality-control flags.

Examples:

>>> qc = sdf.qc
>>> qc.tests.columns
>>> qc.failed(component="ghi")

Methods:

  • __getattr__

    Access QC tests as attributes when names match columns.

  • __getitem__

    Return one QC test series by its column name.

  • clear_cache

    Clear the in-memory quality-control cache.

  • failed

    Return a boolean mask where at least one selected test fails.

  • filter

    Filter QC tests by component, explicit names, or pattern.

  • get_cache_info

    Return cache statistics for quality-control computations.

  • heatmap

    Render a QC pass/fail heatmap over time.

  • mask_failed

    Mask original values where selected QC tests fail.

  • passed

    Return a boolean mask where all selected tests pass or are neutral.

Attributes:

  • tests

    Full QC test result dataframe.

Source code in src/solarpandas/accessors/qcontrol.py
def __init__(self, sdf_obj):
    self._sdf = self._validate(sdf_obj)
    self._tests = _run_cached_qc(HashableDF(self._sdf))
tests property

Return the full QC test result dataframe.

__getattr__
__getattr__(name: str) -> SolarSeries

Access QC tests as attributes when names match columns.

Source code in src/solarpandas/accessors/qcontrol.py
def __getattr__(self, name: str) -> SolarSeries:
    """Access QC tests as attributes when names match columns."""
    if name not in self._tests.columns:
        raise AttributeError(f"QC test '{name}' not found in results.")
    return self._tests[name]
__getitem__
__getitem__(key: str) -> SolarSeries

Return one QC test series by its column name.

Source code in src/solarpandas/accessors/qcontrol.py
def __getitem__(self, key: str) -> SolarSeries:
    """Return one QC test series by its column name."""
    if key not in self._tests.columns:
        raise KeyError(f"QC test '{key}' not found in results.")
    return self._tests[key]
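
Attribute-style and item-style access return the same column. The following is a minimal sketch of the pattern using a plain dictionary instead of a dataframe; QCResults is a hypothetical class, and the guard on underscore-prefixed names is an added assumption that avoids recursion if _results is not yet set.

```python
class QCResults:
    """Hypothetical container; not the library's accessor."""

    def __init__(self, results):
        self._results = dict(results)

    def __getattr__(self, name):
        # Only invoked when normal attribute lookup has already failed.
        if name.startswith("_") or name not in self._results:
            raise AttributeError(f"QC test '{name}' not found in results.")
        return self._results[name]

    def __getitem__(self, key):
        if key not in self._results:
            raise KeyError(f"QC test '{key}' not found in results.")
        return self._results[key]


qc = QCResults({"ghi_ppl": [1, 1, -1]})
same = qc.ghi_ppl is qc["ghi_ppl"]
print(same)                    # True
print(hasattr(qc, "missing"))  # False
```

Raising AttributeError rather than KeyError from __getattr__ keeps hasattr and getattr with a default working as expected.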
clear_cache staticmethod
clear_cache() -> None

Clear the in-memory quality-control cache.

Examples:

>>> import solarpandas as sp
>>> sp.clear_qc_cache()
Source code in src/solarpandas/accessors/qcontrol.py
@staticmethod
def clear_cache() -> None:
    """Clear the in-memory quality-control cache.

    Examples
    --------
    >>> import solarpandas as sp
    >>> sp.clear_qc_cache()
    """
    _run_cached_qc.cache_clear()
    logger.debug("qc cache cleared")
failed
failed(
    component: Literal["ghi", "dni", "dif"] | None = None,
    *,
    tests: list[str] | None = None,
    like: str | None = None,
    regex: str | None = None,
) -> Series

Return a boolean mask where at least one selected test fails.

Parameters:

  • component
    ((ghi, dni, dif) or None, default: None ) –

    Convenience selector for pre-defined test groups.

  • tests
    (list[str] or None, default: None ) –

    Explicit test names.

  • like
    (str or None, default: None ) –

    Substring pattern forwarded to DataFrame.filter.

  • regex
    (str or None, default: None ) –

    Regex pattern forwarded to DataFrame.filter.

Returns:

  • Series

    Boolean series; True where at least one selected test flags a sample as failed.

Examples:

>>> mask = sdf.qc.failed(component="ghi")
>>> clean_ghi = sdf["ghi"].where(~mask)
Source code in src/solarpandas/accessors/qcontrol.py
def failed(
    self,
    component: Literal["ghi", "dni", "dif"] | None = None,
    *,
    tests: list[str] | None = None,
    like: str | None = None,
    regex: str | None = None,
) -> pd.Series:
    """Return a boolean mask where at least one selected test fails.

    Parameters
    ----------
    component : {"ghi", "dni", "dif"} or None, default None
        Convenience selector for pre-defined test groups.
    tests : list[str] or None, default None
        Explicit test names.
    like : str or None, default None
        Substring pattern forwarded to ``DataFrame.filter``.
    regex : str or None, default None
        Regex pattern forwarded to ``DataFrame.filter``.

    Returns
    -------
    pandas.Series
        Boolean series; ``True`` where at least one selected test flags a
        sample as failed.

    Examples
    --------
    >>> mask = sdf.qc.failed(component="ghi")
    >>> clean_ghi = sdf["ghi"].where(~mask)
    """

    return (
        self.filter(component, tests=tests, like=like, regex=regex)
        .apply(lambda test: test.flag.fails)
        .any(axis=1)
    )
filter
filter(
    component: Literal["ghi", "dni", "dif"] | None = None,
    *,
    tests: list[str] | None = None,
    like: str | None = None,
    regex: str | None = None,
) -> DataFrame

Filter QC tests by component, explicit names, or pattern.

Parameters:

  • component
    ((ghi, dni, dif) or None, default: None ) –

    Convenience selector for pre-defined test groups.

  • tests
    (list[str] or None, default: None ) –

    Explicit test names.

  • like
    (str or None, default: None ) –

    Substring pattern forwarded to DataFrame.filter.

  • regex
    (str or None, default: None ) –

    Regex pattern forwarded to DataFrame.filter.

Returns:

  • DataFrame

    Subset of QC test columns.

Source code in src/solarpandas/accessors/qcontrol.py
def filter(
    self,
    component: Literal["ghi", "dni", "dif"] | None = None,
    *,
    tests: list[str] | None = None,
    like: str | None = None,
    regex: str | None = None,
) -> pd.DataFrame:
    """Filter QC tests by component, explicit names, or pattern.

    Parameters
    ----------
    component : {"ghi", "dni", "dif"} or None, default None
        Convenience selector for pre-defined test groups.
    tests : list[str] or None, default None
        Explicit test names.
    like : str or None, default None
        Substring pattern forwarded to ``DataFrame.filter``.
    regex : str or None, default None
        Regex pattern forwarded to ``DataFrame.filter``.

    Returns
    -------
    pandas.DataFrame
        Subset of QC test columns.
    """

    if component is not None:
        if component.casefold() not in ("ghi", "dni", "dif"):
            raise ValueError("component must be one of 'ghi', 'dni', 'dif' or None")

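        if any([tests, like, regex]):
            logger.warning(
                "Cannot specify `component` together with `tests`, `like` or `regex` "
                "filters. Ignoring filters and using component only."
            )
            # Drop the pattern filters so they are really ignored; pandas rejects
            # `items` combined with `like` or `regex`.
            like = regex = None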

        logger.debug(f"Filtering QC tests for component '{component}'")
        tests = reduce(
            lambda x, y: x + y, _COMPONENT_TO_TEST_MAP.get(component).values()
        )
        logger.debug(f"Tests for component '{component}': {tests}")

    if tests is None and like is None and regex is None:
        return self._tests
    tests = self._tests.filter(items=tests, like=like, regex=regex, axis=1)
    logger.info(f"Filtered QC tests: {tests.columns.tolist()}")
    return tests
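
The tests, like and regex arguments end up in pandas' DataFrame.filter. The following is a minimal sketch of the three selection modes on a small frame; the column names other than ghi_ppl are made up for illustration.

```python
import pandas as pd

# Hypothetical QC result columns; real test names may differ.
tests = pd.DataFrame(
    {
        "ghi_ppl": [1, 1],
        "ghi_erl": [1, -1],
        "dni_ppl": [0, 1],
    }
)

by_name = tests.filter(items=["ghi_ppl", "dni_ppl"], axis=1)
by_substring = tests.filter(like="ppl", axis=1)
by_regex = tests.filter(regex=r"^ghi_", axis=1)

print(by_name.columns.tolist())       # ['ghi_ppl', 'dni_ppl']
print(by_substring.columns.tolist())  # ['ghi_ppl', 'dni_ppl']
print(by_regex.columns.tolist())      # ['ghi_ppl', 'ghi_erl']

# pandas treats the three selectors as mutually exclusive.
try:
    tests.filter(like="ghi", regex="ppl$", axis=1)
    combined_ok = True
except TypeError:
    combined_ok = False
print(combined_ok)  # False
```

Because the selectors are mutually exclusive in pandas, passing only one of tests, like or regex per call is the safe choice.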
get_cache_info staticmethod
get_cache_info()

Return cache statistics for quality-control computations.

Returns:

  • dict[str, int | None]

    Dictionary with hits, misses, current_size and max_size.

Examples:

>>> import solarpandas as sp
>>> info = sp.get_qc_cache_info()
>>> "misses" in info
True
Source code in src/solarpandas/accessors/qcontrol.py
@staticmethod
def get_cache_info():
    """Return cache statistics for quality-control computations.

    Returns
    -------
    dict[str, int | None]
        Dictionary with ``hits``, ``misses``, ``current_size`` and ``max_size``.

    Examples
    --------
    >>> import solarpandas as sp
    >>> info = sp.get_qc_cache_info()
    >>> "misses" in info
    True
    """
    from .qcontrol import _run_cached_qc
    info = _run_cached_qc.cache_info()
    return {
        "hits": info.hits,
        "misses": info.misses,
        "current_size": info.currsize,
        "max_size": info.maxsize,
    }
heatmap
heatmap(
    component: Literal["ghi", "dni", "dif"] | None = None,
    *,
    tests: list[str] | None = None,
    like: str | None = None,
    regex: str | None = None,
    combined: bool = False,
    **kwargs,
) -> Figure

Render a QC pass/fail heatmap over time.

Parameters:

  • component
    ((ghi, dni, dif) or None, default: None ) –

    Convenience selector for pre-defined test groups.

  • tests
    (list[str] or None, default: None ) –

    Explicit test names.

  • like
    (str or None, default: None ) –

    Substring pattern forwarded to DataFrame.filter.

  • regex
    (str or None, default: None ) –

    Regex pattern forwarded to DataFrame.filter.

  • combined
    (bool, default: False ) –

    If True, encodes failure severity by component groups (1-component, 2-component, 3-component).

  • **kwargs
    (Any, default: {} ) –

    Extra keyword arguments forwarded to the underlying heatmap plotter.

Returns:

  • Figure

    Figure containing the heatmap.

Examples:

>>> fig = sdf.qc.heatmap(component="ghi")
>>> fig = sdf.qc.heatmap(component="ghi", combined=True)
Source code in src/solarpandas/accessors/qcontrol.py
def heatmap(
    self,
    component: Literal["ghi", "dni", "dif"] | None = None,
    *,
    tests: list[str] | None = None,
    like: str | None = None,
    regex: str | None = None,
    combined: bool = False,
    **kwargs,
) -> plt.Figure:
    """Render a QC pass/fail heatmap over time.

    Parameters
    ----------
    component : {"ghi", "dni", "dif"} or None, default None
        Convenience selector for pre-defined test groups.
    tests : list[str] or None, default None
        Explicit test names.
    like : str or None, default None
        Substring pattern forwarded to ``DataFrame.filter``.
    regex : str or None, default None
        Regex pattern forwarded to ``DataFrame.filter``.
    combined : bool, default False
        If ``True``, encodes failure severity by component groups
        (1-component, 2-component, 3-component).
    **kwargs : Any
        Extra keyword arguments forwarded to the underlying heatmap plotter.

    Returns
    -------
    matplotlib.figure.Figure
        Figure containing the heatmap.

    Examples
    --------
    >>> fig = sdf.qc.heatmap(component="ghi")
    >>> fig = sdf.qc.heatmap(component="ghi", combined=True)
    """
    from matplotlib.colors import BoundaryNorm, ListedColormap
    from ..mplstyles import QC_COLOR_FAILED, QC_COLOR_PASSED

    if not combined:
        series = self.passed(component, tests=tests, like=like, regex=regex).astype(
            np.int8
        )
        cmap = ListedColormap([QC_COLOR_FAILED, QC_COLOR_PASSED])
        norm = BoundaryNorm([-0.5, 0.5, 1.5], cmap.N)
        ticks = {0: "FAILED", 1: "PASSED"}
        cax_bounds = [0.05, -0.15, 0.3, 0.03]
    else:

        def get_failed(components):
            return self.failed(
                tests=_COMPONENT_TO_TEST_MAP.get(component).get(components)
            )

        series = self._sdf.replace_data(other=0.0).iloc[:, 0].astype(np.int8)
        series.loc[get_failed("1-component")] = np.int8(1)
        series.loc[get_failed("2-component")] = np.int8(2)
        series.loc[get_failed("3-component")] = np.int8(3)

        cmap = ListedColormap(["#e6f2ff", "#84e184", "#4d94ff", "#ff6666"])
        norm = BoundaryNorm([-0.5, 0.5, 1.5, 2.5, 3.5], cmap.N)
        ticks = {0: "Passed", 1: "1-comp", 2: "2-comp", 3: "3-comp"}
        cax_bounds = [0.025, -0.15, 0.35, 0.03]

    title = "QC Results"
    if component is not None:
        title += f" for {component.upper()}"
    network = self._sdf.custom_metadata.get("network", None)
    if network is not None and network.casefold() == "bsrn":
        station = self._sdf.custom_metadata.get("station", "unknown station")
        location = self._sdf.custom_metadata.get("location", "unknown location")
        acronym = self._sdf.custom_metadata.get("acronym", "unknown acronym")
        title += f" at {station}, {location} ({acronym.upper()}, BSRN)"
    title += f" (lat={self._sdf.latitude:.4f}, lon={self._sdf.longitude:.4f}, alt={self._sdf.elevation:.0f} m)"

    fig, ax = plt.subplots(1, 1, figsize=(14, 5), layout="constrained")
    ax.set_facecolor("white")

    # Caller-supplied keyword arguments override these defaults.
    plot_kwargs = {
        "twilight_line": True,
        "aggfunc": "median",
        "cmap": cmap,
        "norm": norm,
    } | kwargs
    series.solarplot.heatmap(ax=ax, colorbar=False, **plot_kwargs)
    ax.set_title(title)

    mesh = ax.collections[0]
    cax = ax.inset_axes(cax_bounds, transform=ax.transAxes)
    cbar = fig.colorbar(mesh, cax=cax, orientation="horizontal")
    cbar.set_ticks(list(ticks.keys()))
    cbar.ax.set_xticklabels(list(ticks.values()))
    return fig
mask_failed
mask_failed(
    component: Literal["ghi", "dni", "dif"] | None = None,
    *,
    tests: list[str] | None = None,
    like: str | None = None,
    regex: str | None = None,
    **kwargs,
) -> DataFrame

Mask original values where selected QC tests fail.

Parameters:

  • component
    ((ghi, dni, dif) or None, default: None ) –

    Convenience selector for pre-defined test groups.

  • tests
    (list[str] or None, default: None ) –

    Explicit test names.

  • like
    (str or None, default: None ) –

    Substring pattern forwarded to DataFrame.filter.

  • regex
    (str or None, default: None ) –

    Regex pattern forwarded to DataFrame.filter.

  • **kwargs
    (Any, default: {} ) –

    Extra keyword arguments forwarded to DataFrame.mask.

Returns:

  • DataFrame

    Copy of the original data with failed timestamps set to NaN (or to the value specified via other in **kwargs).

Examples:

>>> masked = sdf.qc.mask_failed(component="ghi")
Source code in src/solarpandas/accessors/qcontrol.py
def mask_failed(
    self,
    component: Literal["ghi", "dni", "dif"] | None = None,
    *,
    tests: list[str] | None = None,
    like: str | None = None,
    regex: str | None = None,
    **kwargs,
) -> pd.DataFrame:
    """Mask original values where selected QC tests fail.

    Parameters
    ----------
    component : {"ghi", "dni", "dif"} or None, default None
        Convenience selector for pre-defined test groups.
    tests : list[str] or None, default None
        Explicit test names.
    like : str or None, default None
        Substring pattern forwarded to ``DataFrame.filter``.
    regex : str or None, default None
        Regex pattern forwarded to ``DataFrame.filter``.
    **kwargs : Any
        Extra keyword arguments forwarded to ``DataFrame.mask``.

    Returns
    -------
    pandas.DataFrame
        Copy of the original data with failed timestamps set to ``NaN``
        (or to the value specified via ``other`` in ``**kwargs``).

    Examples
    --------
    >>> masked = sdf.qc.mask_failed(component="ghi")
    """

    failed = self.failed(component, tests=tests, like=like, regex=regex)

    if component is None:
        return self._sdf.mask(failed, **kwargs)

    masked_sdf = self._sdf.copy()
    masked_sdf[component] = masked_sdf[component].mask(failed, **kwargs)
    return masked_sdf
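
The per-component branch relies on Series.mask, which replaces values where the condition is True. The following is a minimal sketch with plain pandas objects and made-up values; it is not tied to a SolarDataFrame.

```python
import math

import pandas as pd

data = pd.DataFrame({"ghi": [500.0, 1800.0, 620.0], "dni": [700.0, 710.0, 705.0]})
failed = pd.Series([False, True, False])  # hypothetical QC failure mask

# Mask only the selected component; other columns stay untouched.
masked = data.copy()
masked["ghi"] = masked["ghi"].mask(failed)

# `other` replaces failed samples with a chosen value instead of NaN.
filled = data["ghi"].mask(failed, other=0.0)

print(math.isnan(masked.loc[1, "ghi"]))  # True
print(filled.tolist())                   # [500.0, 0.0, 620.0]
```

The original frame is left unchanged because the masking is applied to a copy.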
passed
passed(
    component: Literal["ghi", "dni", "dif"] | None = None,
    *,
    tests: list[str] | None = None,
    like: str | None = None,
    regex: str | None = None,
) -> Series

Return a boolean mask where all selected tests pass or are neutral.

Parameters:

  • component
    ((ghi, dni, dif) or None, default: None ) –

    Convenience selector for pre-defined test groups.

  • tests
    (list[str] or None, default: None ) –

    Explicit test names.

  • like
    (str or None, default: None ) –

    Substring pattern forwarded to DataFrame.filter.

  • regex
    (str or None, default: None ) –

    Regex pattern forwarded to DataFrame.filter.

Returns:

  • Series

    Boolean series; True where all selected tests pass or are not verifiable (neutral) for a given sample.

Examples:

>>> mask = sdf.qc.passed(component="dni")
>>> good_dni = sdf["dni"].loc[mask]
Source code in src/solarpandas/accessors/qcontrol.py
def passed(
    self,
    component: Literal["ghi", "dni", "dif"] | None = None,
    *,
    tests: list[str] | None = None,
    like: str | None = None,
    regex: str | None = None,
) -> pd.Series:
    """Return a boolean mask where all selected tests pass or are neutral.

    Parameters
    ----------
    component : {"ghi", "dni", "dif"} or None, default None
        Convenience selector for pre-defined test groups.
    tests : list[str] or None, default None
        Explicit test names.
    like : str or None, default None
        Substring pattern forwarded to ``DataFrame.filter``.
    regex : str or None, default None
        Regex pattern forwarded to ``DataFrame.filter``.

    Returns
    -------
    pandas.Series
        Boolean series; ``True`` where all selected tests pass or are
        not verifiable (neutral) for a given sample.

    Examples
    --------
    >>> mask = sdf.qc.passed(component="dni")
    >>> good_dni = sdf["dni"].loc[mask]
    """

    return (
        self.filter(component, tests=tests, like=like, regex=regex)
        .apply(lambda test: test.flag.passes | test.flag.not_verifiable)
        .all(axis=1)
    )
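
The relationship between failed and passed can be checked on a small table of flags. The following is a minimal sketch using plain integers in place of the QC flag dtype, with the convention -1 failed, 0 not verifiable, 1 passed; the test names are made up.

```python
import pandas as pd

# Hypothetical QC results: one column per test, one row per timestamp.
tests = pd.DataFrame(
    {
        "test_a": [1, -1, 0, 1],
        "test_b": [1, 1, 0, -1],
    }
)

# Failed: at least one selected test flags the sample as -1.
failed = (tests == -1).any(axis=1)
# Passed: every selected test is either 1 (passed) or 0 (not verifiable).
passed = ((tests == 1) | (tests == 0)).all(axis=1)

print(failed.tolist())  # [False, True, False, True]
print(passed.tolist())  # [True, False, True, False]
```

When every flag is one of the three values, the two masks are exact complements, so a sample that no test could verify counts as passed.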

clear_cache

clear_cache() -> None

Clear the in-memory quality-control cache.

Examples:

>>> import solarpandas as sp
>>> sp.clear_qc_cache()
Source code in src/solarpandas/accessors/qcontrol.py
def clear_cache() -> None:
    """Clear the in-memory quality-control cache.

    Examples
    --------
    >>> import solarpandas as sp
    >>> sp.clear_qc_cache()
    """
    _run_cached_qc.cache_clear()
    logger.debug("qc cache cleared")

get_cache_info

get_cache_info()

Return cache statistics for quality-control computations.

Returns:

  • dict[str, int | None]

    Dictionary with hits, misses, current_size and max_size.

Examples:

>>> import solarpandas as sp
>>> info = sp.get_qc_cache_info()
>>> "misses" in info
True
Source code in src/solarpandas/accessors/qcontrol.py
def get_cache_info():
    """Return cache statistics for quality-control computations.

    Returns
    -------
    dict[str, int | None]
        Dictionary with ``hits``, ``misses``, ``current_size`` and ``max_size``.

    Examples
    --------
    >>> import solarpandas as sp
    >>> info = sp.get_qc_cache_info()
    >>> "misses" in info
    True
    """
    from .qcontrol import _run_cached_qc
    info = _run_cached_qc.cache_info()
    return {
        "hits": info.hits,
        "misses": info.misses,
        "current_size": info.currsize,
        "max_size": info.maxsize,
    }

qcflag

Accessor methods for working with quality-control flag series and plots.

Classes:

  • QCFlagAccessor

    Accessor for Series with QCFlagDtype dtype.

QCFlagAccessor

QCFlagAccessor(series: Series | SolarSeries)

Accessor for Series with QCFlagDtype dtype.

Examples:

>>> qc_series.flag.fails          # True where flag == -1
>>> qc_series.flag.passes         # True where flag == 1
>>> qc_series.flag.not_verifiable # True where flag == 0
>>> counts = qc_series.flag.counts()
>>> qc_series.flag.heatmap()

Methods:

  • counts

    Count occurrences of each QC flag category.

  • heatmap

    Plot a date-time heatmap of QC flag values.

  • pieplot

    Plot a pie chart of the QC flag distribution.

  • plot

    Plot QC results using the test-specific plotting function.

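Attributes:

  • fails

    Boolean mask where QC flag equals -1 (failed).

  • not_verifiable (Series | SolarSeries) –

    Boolean mask where QC flag equals 0 (not verifiable).

  • passes (Series | SolarSeries) –

    Boolean mask where QC flag equals 1 (passed).
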
Source code in src/solarpandas/accessors/qcflag.py
def __init__(self, series: pd.Series | SolarSeries) -> None:
    if not series.dtype == QCFlagDtype():
        raise TypeError(
            "The .flag accessor is only available for Series with dtype 'QCFlagDtype'."
        )
    self._series = series
fails property

Boolean mask where QC flag equals -1 (failed).

not_verifiable property
not_verifiable: Series | SolarSeries

Boolean mask where QC flag equals 0 (not verifiable).

passes property
passes: Series | SolarSeries

Boolean mask where QC flag equals 1 (passed).

counts
counts(skip_nighttime: bool = True, **kwargs) -> Series

Count occurrences of each QC flag category.

Parameters:

  • skip_nighttime
    (bool, default: True ) –

    If True and input is SolarSeries, counts only daytime points (solar zenith angle below 90 degrees).

  • **kwargs
    (Any, default: {} ) –

    Extra keyword arguments passed to Series.value_counts.

Returns:

  • Series

    Counts indexed by flag names.

Source code in src/solarpandas/accessors/qcflag.py
def counts(self, skip_nighttime: bool = True, **kwargs) -> pd.Series:
    """Count occurrences of each QC flag category.

    Parameters
    ----------
    skip_nighttime : bool, default True
        If ``True`` and input is ``SolarSeries``, counts only daytime points
        (solar zenith angle below 90 degrees).
    **kwargs : Any
        Extra keyword arguments passed to ``Series.value_counts``.

    Returns
    -------
    pandas.Series
        Counts indexed by flag names.
    """
    series = self._series
    if skip_nighttime:
        if not isinstance(series, SolarSeries):
            logger.warning(
                "skip_nighttime=True is only valid for SolarSeries. Skipping nighttime filtering."
            )
        else:
            series = series.loc[series.solpos.zenith < 90]
    return series.value_counts(**kwargs).rename(
        index={e.value: e.name for e in QCFlagEnum}
    )
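
The renaming step in counts maps numeric flag values to readable labels. The following is a minimal sketch, assuming an integer-valued series; Flag is a hypothetical stand-in for QCFlagEnum and its member names are assumptions.

```python
from enum import IntEnum

import pandas as pd


class Flag(IntEnum):
    # Hypothetical stand-in for QCFlagEnum; member names are assumptions.
    FAILED = -1
    NOT_VERIFIABLE = 0
    PASSED = 1


flags = pd.Series([1, 1, -1, 0, 1, 1])

# Count each flag value, then replace the numeric index with enum names.
counts = flags.value_counts().rename(index={e.value: e.name for e in Flag})

print(counts["PASSED"])          # 4
print(counts["FAILED"])          # 1
print(counts["NOT_VERIFIABLE"])  # 1
```

Passing normalize=True to value_counts yields shares instead of absolute counts, which is what pieplot requests.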
heatmap
heatmap() -> None

Plot a date-time heatmap of QC flag values.

Displays a colour-encoded calendar grid with failed (red), not-verifiable (yellow) and passed (green) categories.

Examples:

>>> sdf.qc["ghi_ppl"].flag.heatmap()
Source code in src/solarpandas/accessors/qcflag.py
def heatmap(self) -> None:
    """Plot a date-time heatmap of QC flag values.

    Displays a colour-encoded calendar grid with failed (red),
    not-verifiable (yellow) and passed (green) categories.

    Examples
    --------
    >>> sdf.qc["ghi_ppl"].flag.heatmap()
    """
    import matplotlib.pyplot as plt
    from matplotlib.colors import BoundaryNorm, ListedColormap
    from ..mplstyles import QC_COLOR_FAILED, QC_COLOR_NOT_VERIFIABLE, QC_COLOR_PASSED

    if not isinstance(self._series, SolarSeries):
        logger.warning("heatmap is only valid for SolarSeries. Cannot plot.")
        return

    cmap = ListedColormap(
        [QC_COLOR_FAILED, QC_COLOR_NOT_VERIFIABLE, QC_COLOR_PASSED]
    )
    norm = BoundaryNorm([-1.5, -0.5, 0.5, 1.5], cmap.N)

    fig, ax = plt.subplots(1, 1, figsize=(12, 5), layout="constrained")
    ax.set_facecolor("white")

    kwargs = {
        "twilight_line": True,
        "aggfunc": "median",
        "cmap": cmap,
        "norm": norm,
    }
    self._series.astype(np.int8).solarplot.heatmap(ax=ax, colorbar=False, **kwargs)
    ax.set_title(f"QC Flag for -- {self._series.name} --")

    mesh = ax.collections[0]
    cax = ax.inset_axes([0.0, -0.15, 0.4, 0.03], transform=ax.transAxes)
    cbar = fig.colorbar(mesh, cax=cax, orientation="horizontal")
    cbar.set_ticks([-1, 0, 1])
    cbar.ax.set_xticklabels(["FAILED", "NOT VERIFIABLE", "PASSED"])
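
The discrete colour mapping used by the heatmap can be examined without drawing anything. The following is a minimal sketch with placeholder colour names, since the values of the QC_COLOR_* constants are not shown on this page.

```python
from matplotlib.colors import BoundaryNorm, ListedColormap

# Placeholder colours; the library's QC_COLOR_* constants are not shown here.
cmap = ListedColormap(["red", "gold", "green"])

# Boundaries halfway between the flag values place -1, 0 and 1 in separate bins.
norm = BoundaryNorm([-1.5, -0.5, 0.5, 1.5], cmap.N)

bins = [int(norm(flag)) for flag in (-1, 0, 1)]
colours = [cmap(b) for b in bins]

print(bins)        # [0, 1, 2]
print(colours[0])  # RGBA tuple for the "failed" colour
```

Each flag value lands in its own bin, so the colour bar ticks at -1, 0 and 1 sit at the centre of their colour patches.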
pieplot
pieplot(skip_nighttime: bool = True, **kwargs) -> None

Plot a pie chart of the QC flag distribution.

Parameters:

  • skip_nighttime
    (bool, default: True ) –

    If True and input is a SolarSeries, restrict counts to daytime points (solar zenith angle below 90°).

  • **kwargs
    (Any, default: {} ) –

    Extra keyword arguments passed to Series.plot.pie.

Source code in src/solarpandas/accessors/qcflag.py
def pieplot(self, skip_nighttime: bool = True, **kwargs) -> None:
    """Plot a pie chart of the QC flag distribution.

    Parameters
    ----------
    skip_nighttime : bool, default True
        If ``True`` and input is a ``SolarSeries``, restrict counts to
        daytime points (solar zenith angle below 90°).
    **kwargs : Any
        Extra keyword arguments passed to ``Series.plot.pie``.
    """
    counts = self.counts(skip_nighttime=skip_nighttime, normalize=True)
    defaults = {"labels": counts.index, "autopct": "%1.1f%%", "startangle": 90}
    counts.plot.pie(**(defaults | kwargs))
plot
plot(sdf: SolarDataFrame, **kwargs) -> None

Plot QC results using the test-specific plotting function.

Parameters:

  • sdf
    (SolarDataFrame) –

    Original data used as context for the plot.

  • **kwargs
    (Any, default: {} ) –

    Extra keyword arguments forwarded to the test-specific plotter.
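
The plotter is found by scanning a module for test objects and matching their `name` against the series name. The sketch below reproduces that lookup pattern with stand-ins: `FakeQCTest`, `fake_module` and the `"ghi_erl"` entry are hypothetical and only illustrate the technique, not the package's real objects.

```python
import inspect
from dataclasses import dataclass
from types import SimpleNamespace
from typing import Callable, Optional


@dataclass
class FakeQCTest:  # hypothetical stand-in for the package's QC test class
    name: str
    plot_func: Callable[..., str]


# Hypothetical stand-in for the module that is scanned for test objects.
fake_module = SimpleNamespace(
    ppl=FakeQCTest("ghi_ppl", lambda sdf, flags: "ppl plot"),
    erl=FakeQCTest("ghi_erl", lambda sdf, flags: "erl plot"),
    not_a_test=42,
)


def find_plotter(series_name: str) -> Optional[Callable[..., str]]:
    """Return the plot function of the test named `series_name`, or None."""
    members = inspect.getmembers(
        fake_module, predicate=lambda obj: isinstance(obj, FakeQCTest)
    )
    for _, test in members:
        if test.name == series_name:
            return test.plot_func
    return None


print(find_plotter("ghi_ppl")(None, None))
print(find_plotter("unknown"))
```

The consequence is that the series must keep the name assigned by the QC test; a renamed flag series will not be matched to any plotter.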

Source code in src/solarpandas/accessors/qcflag.py
def plot(self, sdf: SolarDataFrame, **kwargs) -> None:
    """Plot QC results using the test-specific plotting function.

    Parameters
    ----------
    sdf : SolarDataFrame
        Original data used as context for the plot.
    **kwargs : Any
        Extra keyword arguments forwarded to the test-specific plotter.
    """
    if not isinstance(self._series, SolarSeries):
        logger.warning("plot is only valid for SolarSeries. Cannot plot.")
        return

    for _, obj in inspect.getmembers(
        qcrad, predicate=lambda obj: isinstance(obj, helpers.QCTest)
    ):
        if obj.name == self._series.name:
            plot_func = obj._plot_func
            break
    else:
        logger.warning(
            f"No QCTest found for series '{self._series.name}'. Cannot plot."
        )
        return

    # Forward the extra keyword arguments, as documented above.
    return plot_func(sdf, self._series, **kwargs)

solarplot

Plotting utilities and scales for solar data visualization.

Classes:

  • SolarPlotAccessor

    Accessor with high-level plotting methods for solar time series.

SolarPlotAccessor

SolarPlotAccessor(sdf_obj)

Accessor with high-level plotting methods for solar time series.

Methods:

  • diurnal

    All-data compressed timeline removing nighttime gaps.

  • heatmap

    Date-time heatmap of a single variable.

  • rolling

    Interactive day-by-day time series with keyboard/scroll navigation.

Examples:

>>> sdf.solarplot.diurnal(column="ghi")
>>> sdf.solarplot.heatmap(column="ghi", time_ref="tst")
>>> sdf.solarplot.rolling("ghi", window_size=3)
Source code in src/solarpandas/accessors/solarplot.py
def __init__(self, sdf_obj):
    self._sdf = self._validate(sdf_obj)
diurnal
diurnal(
    column: str | list[str] | tuple[str, ...] | None = None,
    max_sza: float = 95.0,
    locator=None,
    formatter=None,
    **kwargs,
) -> Figure

Plot one or more variables on a compressed daytime-only timeline.

Parameters:

  • column
    (str, list[str], tuple[str, ...], or None, default: None ) –

    Column(s) to plot for dataframe inputs. Ignored for series inputs.

  • max_sza
    (float, default: 95.0 ) –

    Maximum solar zenith angle used to define daytime samples.

  • locator
    (Any, default: None ) –

    Optional matplotlib date locator for the x-axis ticks.

  • formatter
    (Any, default: None ) –

    Optional matplotlib date formatter for the x-axis tick labels.

  • **kwargs
    (Any, default: {} ) –

    Extra keyword arguments forwarded to Axes.plot.

Returns:

  • Figure

    Figure containing the diurnal plot.

Examples:

>>> fig = sdf.solarplot.diurnal(column="ghi", color="gold", lw=1.5)
>>> fig = sdf.solarplot.diurnal(column=["ghi", "dni"], max_sza=90)
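
The idea of a compressed daytime-only timeline can be sketched without matplotlib. The function below is a hypothetical illustration, not the package's `_DiurnalMapper`: it keeps samples whose zenith angle is below `max_sza` and collapses any gap longer than the nominal step (the nights) to a single step.

```python
def compress_timeline(times, sza, max_sza=95.0, step=1.0):
    """Map daytime timestamps onto a gap-free axis.

    times : ascending sample times as floats (hours in this sketch)
    sza   : solar zenith angle of each sample, in degrees
    step  : nominal sampling step, in the same units as `times`
    """
    kept = [t for t, z in zip(times, sza) if z < max_sza]
    if not kept:
        raise ValueError("No daytime samples available with the selected max_sza.")
    compressed = [0.0]
    for prev, curr in zip(kept, kept[1:]):
        # A night-long gap advances the axis by one step only.
        compressed.append(compressed[-1] + min(curr - prev, step))
    return kept, compressed


# Two days of hourly samples with a made-up zenith angle: day from 06:00 to 18:00.
times = [float(h) for h in range(48)]
sza = [60.0 if 6 <= h % 24 <= 18 else 120.0 for h in range(48)]
kept, x = compress_timeline(times, sza)
print(len(kept), x[12], x[13])
```

In this example the 12-hour night between hour 18 and hour 30 occupies the same width on the axis as one ordinary sampling step.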
Source code in src/solarpandas/accessors/solarplot.py
def diurnal(
    self,
    column: str | list[str] | tuple[str, ...] | None = None,
    max_sza: float = 95.0,
    locator=None,
    formatter=None,
    **kwargs,
) -> plt.Figure:
    """Plot one or more variables on a compressed daytime-only timeline.

    Parameters
    ----------
    column : str, list[str], tuple[str, ...], or None, default None
        Column(s) to plot for dataframe inputs. Ignored for series inputs.
    max_sza : float, default 95.0
        Maximum solar zenith angle used to define daytime samples.
    locator, formatter : Any, optional
        Optional matplotlib date locator/formatter for x-axis ticks.
    **kwargs : Any
        Extra keyword arguments forwarded to ``Axes.plot``.

    Returns
    -------
    matplotlib.figure.Figure
        Figure containing the diurnal plot.

    Examples
    --------
    >>> fig = sdf.solarplot.diurnal(column="ghi", color="gold", lw=1.5)
    >>> fig = sdf.solarplot.diurnal(column=["ghi", "dni"], max_sza=90)
    """

    if isinstance(self._sdf, SolarSeries):
        if column is not None:
            logger.warning("Column name(s) ignored when plotting a SolarSeries.")
        columns = [self._sdf.name or "_unnamed_"]
        sdf = self._sdf.to_frame(columns[0])
    else:
        if column is None:
            columns = self._sdf.columns
        elif isinstance(column, str):
            columns = [column]
        elif isinstance(column, (list, tuple)):
            columns = list(column)
        else:
            raise TypeError(
                "`column` must be a string, a list/tuple of strings, or None."
            )

        missing = [c for c in columns if c not in self._sdf.columns]
        if missing:
            logger.warning(
                f"Columns {missing} not found in dataframe. Defaulting to the first column."
            )
            columns = [self._sdf.columns[0]]

        sdf = self._sdf[columns]

    sza = self._sdf.solpos.zenith
    daytime_mask = sza < max_sza
    df = sdf.where(sza < 91).loc[daytime_mask, columns].copy()
    if df.empty:
        raise ValueError("No daytime samples available with the selected max_sza.")

    step = pd.to_timedelta(infer_time_step(sdf))
    step_days = step / pd.Timedelta("1D")
    real_numdates = mpl.dates.date2num(df.index.to_pydatetime())
    mapper = _DiurnalMapper(real_numdates, nominal_step_days=float(step_days))

    # plt.style.use("solarpandas-diurnal")
    if (ax := kwargs.pop("ax", None)) is None:
        _, ax = plt.subplots(1, 1, figsize=(12, 6), layout="constrained")

    ax.set_xscale("diurnal", mapper=mapper, locator=locator, formatter=formatter)
    ax.plot(df.index, df, **kwargs)

    return ax.get_figure()
heatmap
heatmap(
    column: str | None = None,
    time_ref: Literal["lst", "tst", "lat", "utc"] = "tst",
    max_sza: float | None = 90.0,
    colorbar: bool = True,
    colorbar_title: str | None = None,
    twilight_line: bool = False,
    twilight_line_kwargs: dict | None = None,
    aggfunc: str | Callable = "mean",
    **kwargs,
) -> Figure

Render a date-time heatmap for a selected variable.

Parameters:

  • column
    (str or None, default: None ) –

    Column to plot for dataframe inputs. Defaults to first column.

  • time_ref
    ({"lst", "tst", "lat", "utc"}, default: "tst" ) –

    Time reference used for the y-axis.

  • max_sza
    (float or None, default: 90.0 ) –

    Nighttime masking threshold. Use None to disable masking.

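  • colorbar
    (bool, default: True ) –

    Whether to add a colorbar.

  • colorbar_title
    (str or None, default: None ) –

    Colorbar label. Defaults to the column name.

  • twilight_line
    (bool, default: False ) –

    Whether to overlay sunrise and sunset curves.

  • twilight_line_kwargs
    (dict or None, default: None ) –

    Keyword arguments forwarded to Axes.plot for the sunrise and sunset curves. They override the defaults (purple dashed line, width 1.5).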

  • aggfunc
    (str or Callable, default: "mean" ) –

    Aggregation function used in the date-time pivot table.

  • **kwargs
    (Any, default: {} ) –

    Extra keyword arguments forwarded to Axes.pcolormesh.

Returns:

  • Figure

    Figure containing the heatmap.

Examples:

>>> fig = sdf.solarplot.heatmap(column="ghi")
>>> fig = sdf.solarplot.heatmap(column="dni", time_ref="utc", cmap="plasma")
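
The heatmap grid is a pivot of the series by time of day (rows) and calendar date (columns), with `aggfunc` resolving samples that fall in the same cell. The following standard-library sketch shows that grouping on a handful of made-up samples; `date_time_table` is a hypothetical helper and the values are illustrative only.

```python
from collections import defaultdict
from datetime import date, datetime
from statistics import mean


def time_to_minutes(t) -> int:
    """Minutes elapsed since midnight, truncated to a whole minute."""
    return int(t.hour * 60 + t.minute + t.second / 60)


def date_time_table(samples, aggfunc=mean):
    """Group (timestamp, value) pairs into {minute_of_day: {date: aggregate}}."""
    cells = defaultdict(list)
    for stamp, value in samples:
        cells[(time_to_minutes(stamp.time()), stamp.date())].append(value)
    table = defaultdict(dict)
    for (minute, day), values in cells.items():
        table[minute][day] = aggfunc(values)
    return table


samples = [
    (datetime(2024, 1, 1, 12, 0, 0), 100),
    (datetime(2024, 1, 1, 12, 0, 30), 200),  # same minute cell as the line above
    (datetime(2024, 1, 1, 12, 1, 0), 300),
    (datetime(2024, 1, 2, 12, 0, 0), 400),
]
table = date_time_table(samples)
print(table[720][date(2024, 1, 1)], table[720][date(2024, 1, 2)])
```

Passing `aggfunc="median"`, as the QC flag heatmap does, corresponds to swapping `mean` for `statistics.median` in this sketch.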
Source code in src/solarpandas/accessors/solarplot.py
def heatmap(
    self,
    column: str | None = None,
    time_ref: Literal["lst", "tst", "lat", "utc"] = "tst",
    max_sza: float | None = 90.0,
    colorbar: bool = True,
    colorbar_title: str | None = None,
    twilight_line: bool = False,
    twilight_line_kwargs: dict | None = None,
    aggfunc: str | Callable = "mean",
    **kwargs,
) -> plt.Figure:
    """Render a date-time heatmap for a selected variable.

    Parameters
    ----------
    column : str or None, default None
        Column to plot for dataframe inputs. Defaults to first column.
    time_ref : {"lst", "tst", "lat", "utc"}, default "tst"
        Time reference used for the y-axis.
    max_sza : float or None, default 90.0
        Nighttime masking threshold. Use ``None`` to disable masking.
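    colorbar : bool, default True
        Whether to add a colorbar.
    colorbar_title : str or None, default None
        Colorbar label. Defaults to the column name.
    twilight_line : bool, default False
        Whether to overlay sunrise and sunset curves.
    twilight_line_kwargs : dict or None, default None
        Keyword arguments forwarded to ``Axes.plot`` for the sunrise and
        sunset curves. They override the defaults (purple dashed line,
        width 1.5).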
    aggfunc : str or Callable, default "mean"
        Aggregation function used in the date-time pivot table.
    **kwargs : Any
        Extra keyword arguments forwarded to ``Axes.pcolormesh``.

    Returns
    -------
    matplotlib.figure.Figure
        Figure containing the heatmap.

    Examples
    --------
    >>> fig = sdf.solarplot.heatmap(column="ghi")
    >>> fig = sdf.solarplot.heatmap(column="dni", time_ref="utc", cmap="plasma")
    """

    MAP_OF_YLABELS = {
        "lst": "Local Solar Time",
        "tst": "True Solar Time",
        "lat": "Local Apparent Time",
        "utc": "Coordinated Universal Time",
    }

    def time_to_minutes(time_obj):
        return int(time_obj.hour * 60 + time_obj.minute + time_obj.second / 60)

    if isinstance(self._sdf, SolarSeries):
        if column is not None:
            logger.warning("Column name ignored when plotting a SolarSeries.")
        column = self._sdf.name or "_unnamed_"
        sdf = self._sdf.to_frame(column)
    else:
        if column is None:
            logger.warning(
                "No column specified for plotting. Defaulting to the first column."
            )
            column = self._sdf.columns[0]
        elif column not in self._sdf.columns:
            logger.warning(
                f"Column '{column}' not found in dataframe. Defaulting to the first column."
            )
            column = self._sdf.columns[0]
        sdf = self._sdf[[column]]

    df = pd.DataFrame(
        sdf.where(self._sdf.solpos.zenith < (max_sza or 180.0), pd.NA)
    )

    # extend the dataframe to have a complete first and last days
    df = normalize(df)

    time_step = infer_time_step(df)

    if time_ref.casefold() == "lst":
        df = df.set_index(self._sdf.solpos.lst)
    elif time_ref.casefold() in ("tst", "lat"):
        df = df.set_index(self._sdf.solpos.tst)
    else:
        df = df.set_index(self._sdf.index.tz_convert("UTC").tz_localize(None))

    if time_ref.casefold() in ("lst", "tst", "lat"):
        df = df.set_index(df.index.round(time_step))

    table = df.assign(date=df.index.date, time=df.index.time).pivot_table(
        index="time", columns="date", values=column, aggfunc=aggfunc
    )

    date_coords = table.columns
    time_coords = table.index.map(lambda t: np.datetime64(time_to_minutes(t), "m"))

    plt.style.use("solarpandas-dtmap")
    if "rc" in kwargs:
        mpl.rcParams.update(kwargs.pop("rc"))

    if (ax := kwargs.pop("ax", None)) is None:
        _, ax = plt.subplots(1, 1, figsize=(12, 6), layout="constrained")

    mesh = ax.pcolormesh(date_coords, time_coords, table.values, **kwargs)
    if colorbar:
        plt.colorbar(
            mesh,
            ax=ax,
            label=colorbar_title or column,
            pad=0.01,
            fraction=0.025,
            shrink=1.0,
        )

    if twilight_line:

        def get_twilight(which: str):
            twilight = (
                getattr(self._sdf.solpos, which)(units=time_ref)
                .resample("D")
                .median()
                .dt.round("1s")
                .dt.time.map(lambda t: np.datetime64(time_to_minutes(t), "m"))
            )
            twilight = twilight.set_axis(twilight.index.date)
            return twilight.reindex(
                date_coords, method="nearest", tolerance=pd.Timedelta("1D")
            )

        default_twilight_kwargs = {"color": "purple", "ls": "--", "lw": 1.5}
        twilight_line_kwargs = default_twilight_kwargs | (
            twilight_line_kwargs or {}
        )
        ax.plot(
            date_coords,
            get_twilight("sunrise"),
            label="Sunrise",
            **twilight_line_kwargs,
        )
        ax.plot(
            date_coords,
            get_twilight("sunset"),
            label="Sunset",
            **twilight_line_kwargs,
        )

    ax.set_xlabel("Date")

    ax.yaxis.set_major_formatter(DateFormatter("%H:%M"))
    ax.set_ylim(np.datetime64(0, "m"), np.datetime64(24 * 60, "m"))
    ax.set_ylabel(MAP_OF_YLABELS.get(time_ref.casefold()))

    return ax.get_figure()
rolling
rolling(
    column: str | list[str] | tuple[str, ...] | None = None,
    step: int = 1,
    window_size: int = 1,
    max_sza: float = 95.0,
    y_scale: Literal["per_day", "global"] = "per_day",
    plot_kwargs: dict[str, dict] | None = None,
    **kwargs,
) -> Figure

Plot a time series with a rolling window of a given size.

Parameters:

  • column
    (str, list[str], tuple[str, ...], or None, default: None ) –

    Column(s) to plot for dataframe inputs. Ignored for series inputs. When None all columns are shown.

  • step
    (int, default: 1 ) –

    Number of days to navigate when using the left/right arrow keys or mouse scroll.

  • window_size
    (int, default: 1 ) –

    Number of consecutive days shown in the plot area at once.

  • max_sza
    (float, default: 95.0 ) –

    Maximum solar zenith angle of the data to plot. Timestamps with SZA above this threshold are excluded.

  • y_scale
    ({"per_day", "global"}, default: "per_day" ) –

    Y-axis scaling strategy. "per_day" autoscales the y-axis to the data visible in the current window on every navigation step. "global" fixes the y-axis to the range of the full dataset and keeps it constant while navigating.

  • plot_kwargs
    (dict[str, dict] or None, default: None ) –

    Per-column matplotlib keyword arguments. Keys are column names; values are dicts of kwargs forwarded to Axes.plot for that specific line. Columns absent from this dict inherit **kwargs.

  • **kwargs
    (Any, default: {} ) –

    Global keyword arguments forwarded to Axes.plot for every line. Per-column entries in plot_kwargs take precedence.

Returns:

  • Figure

    Interactive figure. Use left/right arrow keys or mouse wheel to navigate between days.

Examples:

>>> fig = sdf.solarplot.rolling(
...     column=["ghi", "dni"],
...     window_size=3,
...     plot_kwargs={
...         "ghi": {"color": "gold", "lw": 2.0},
...         "dni": {"color": "tomato", "ls": "--"},
...     },
...     lw=1.0,
... )
>>> plt.show()
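
Navigation never runs past either end of the record: the requested start day is clamped so that a full window still fits. A minimal sketch of that index arithmetic, using the same expressions as the source below, is given here; `window_bounds` is a hypothetical name.

```python
def window_bounds(i: int, n_total: int, window_size: int) -> tuple[int, int]:
    """Clamp the requested start index and return (first_day, last_day), inclusive."""
    start = max(0, min(i, n_total - window_size))
    end = min(start + window_size - 1, n_total - 1)
    return start, end


# Ten days of data viewed three days at a time.
for requested in (-2, 0, 4, 9):
    print(requested, window_bounds(requested, n_total=10, window_size=3))
```

When the record is shorter than the window, the start falls back to the first day and the window simply shows every available day.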
Source code in src/solarpandas/accessors/solarplot.py
def rolling(
    self,
    column: str | list[str] | tuple[str, ...] | None = None,
    step: int = 1,
    window_size: int = 1,
    max_sza: float = 95.0,
    y_scale: Literal["per_day", "global"] = "per_day",
    plot_kwargs: dict[str, dict] | None = None,
    **kwargs,
) -> plt.Figure:
    """Plot a time series with a rolling window of a given size.

    Parameters
    ----------
    column : str, list[str], tuple[str, ...], or None, default None
        Column(s) to plot for dataframe inputs. Ignored for series inputs.
        When ``None`` all columns are shown.
    step : int, default 1
        Number of days to navigate when using the left/right arrow keys or
        mouse scroll.
    window_size : int, default 1
        Number of consecutive days shown in the plot area at once.
    max_sza : float, default 95.0
        Maximum solar zenith angle of the data to plot. Timestamps with
        SZA above this threshold are excluded.
    y_scale : {"per_day", "global"}, default "per_day"
        Y-axis scaling strategy.
        ``"per_day"`` autoscales the y-axis to the data visible in the
        current window on every navigation step.
        ``"global"`` fixes the y-axis to the range of the full dataset
        and keeps it constant while navigating.
    plot_kwargs : dict[str, dict] or None, default None
        Per-column matplotlib keyword arguments. Keys are column names;
        values are dicts of kwargs forwarded to ``Axes.plot`` for that
        specific line. Columns absent from this dict inherit ``**kwargs``.
    **kwargs : Any
        Global keyword arguments forwarded to ``Axes.plot`` for every
        line. Per-column entries in *plot_kwargs* take precedence.

    Returns
    -------
    matplotlib.figure.Figure
        Interactive figure. Use left/right arrow keys or mouse wheel to
        navigate between days.

    Examples
    --------
    >>> fig = sdf.solarplot.rolling(
    ...     column=["ghi", "dni"],
    ...     window_size=3,
    ...     plot_kwargs={
    ...         "ghi": {"color": "gold", "lw": 2.0},
    ...         "dni": {"color": "tomato", "ls": "--"},
    ...     },
    ...     lw=1.0,
    ... )
    >>> plt.show()
    """

    if window_size < 1:
        raise ValueError("`window_size` must be >= 1")

    # 1. Resolve columns
    if isinstance(self._sdf, SolarSeries):
        if column is not None:
            logger.warning("Column name(s) ignored when plotting a SolarSeries.")
        columns = [self._sdf.name or "_unnamed_"]
        sdf = self._sdf.to_frame(columns[0])
    else:
        if column is None:
            columns = list(self._sdf.columns)
        elif isinstance(column, str):
            columns = [column]
        elif isinstance(column, (list, tuple)):
            columns = list(column)
        else:
            raise TypeError(
                "`column` must be a string, a list/tuple of strings, or None."
            )

        missing = [c for c in columns if c not in self._sdf.columns]
        if missing:
            logger.warning(
                f"Columns {missing} not found in dataframe. Skipping them."
            )
            columns = [c for c in columns if c in self._sdf.columns]
        if not columns:
            raise ValueError("No valid columns to plot.")
        sdf = self._sdf[columns]

    # 2. Pre-compute SZA and time step
    sza = self._sdf.solpos.zenith
    step_days = float(pd.to_timedelta(infer_time_step(sdf)) / pd.Timedelta("1D"))

    # To show whole "solar" days, re-index the data in true solar time. Otherwise,
    # sites far from the prime meridian would have their solar days split across
    # two calendar days.
    sdf = sdf.set_index(sdf.solpos.tst).copy()
    sza.index = sdf.index  # same TST index on sza, for easier masking later

    # 3. Calendar dates for navigation — tz-independent via .date property
    all_dates = np.unique(sdf.index.date)  # sorted array of datetime.date
    n_total = len(all_dates)

    # 4. Figure setup
    if (ax := kwargs.pop("ax", None)) is None:
        _, ax = plt.subplots(1, 1, figsize=(12, 5), layout="constrained")

    per_col = plot_kwargs or {}

    # Pre-compute global y-limits (physical daytime only)
    global_ylim = None
    if y_scale == "global":
        all_day = sdf.where(sza < 93)
        vmin_g, vmax_g = all_day.min().min(), all_day.max().max()
        if pd.notna(vmin_g) and pd.notna(vmax_g) and vmin_g < vmax_g:
            pad_g = (vmax_g - vmin_g) * 0.05
            global_ylim = (vmin_g - pad_g, vmax_g + pad_g)

    # 5. Navigation state and draw function
    state = {"idx": 0}

    def _update(i: int) -> None:
        # define viewing window limits...
        state["idx"] = max(0, min(i, n_total - window_size))
        idx_start = state["idx"]
        idx_end = min(idx_start + window_size - 1, n_total - 1)

        # Select timestamps: within window dates AND below max_sza
        window_dates = all_dates[idx_start : idx_end + 1]
        date_mask = np.isin(sdf.index.date, window_dates)
        sza_mask = (sza < max_sza).values
        win_mask = date_mask & sza_mask

        sdf_win = sdf.loc[win_mask]
        sza_win = sza.loc[win_mask]
        sdf_day = sdf_win.where(sza_win < 91)

        ax.cla()

        if len(sdf_win) > 0:
            real_nums = mpl.dates.date2num(sdf_win.index.to_pydatetime())
            mapper = _DiurnalMapper(real_nums, nominal_step_days=step_days)
            ax.set_xscale("diurnal", mapper=mapper)
            for col in columns:
                kw = {**kwargs, **per_col.get(col, {})}
                ax.plot(sdf_day.index, sdf_day[col], label=col, **kw)
            if len(columns) > 1:
                ax.legend()
            ax.autoscale_view()

        if global_ylim is not None:
            ax.set_ylim(*global_ylim)
        elif y_scale == "per_day" and not sdf_day.empty:
            vmin = sdf_day.min().min()
            vmax = sdf_day.max().max()
            if pd.notna(vmin) and pd.notna(vmax) and vmin < vmax:
                pad = (vmax - vmin) * 0.05
                ax.set_ylim(vmin - pad, vmax + pad)

        date_start = all_dates[idx_start]
        date_end = all_dates[idx_end]
        date_str = (
            str(date_start)
            if window_size == 1
            else f"{date_start} \u2013 {date_end}"
        )
        ax.set_title(
            f"{date_str}  [{idx_start + 1}/{n_total}]"
            "  \u2190\u2192 or scroll to navigate"
        )
        ax.get_figure().canvas.draw_idle()

    def _on_key(event) -> None:
        # Map the arrow keys to a signed day offset; ignore every other key.
        offsets = {"right": step, "left": -step}
        if step_ := offsets.get(event.key, 0):
            _update(state["idx"] + step_)

    def _on_scroll(event) -> None:
        _update(state["idx"] + (step if event.step < 0 else -step))

    # 6. Connect events and show first window
    fig = ax.get_figure()

    _non_interactive = {"agg", "cairo", "pdf", "pgf", "ps", "svg", "template"}
    if mpl.get_backend().lower() in _non_interactive:
        logger.warning(
            f"Backend '{mpl.get_backend()}' is non-interactive; "
            "keyboard/scroll navigation will not work."
        )

    fig.canvas.mpl_connect("key_press_event", _on_key)
    fig.canvas.mpl_connect("scroll_event", _on_scroll)

    _update(0)

    return fig

param

Accessors to retrieve and manipulate parameter metadata in solar series.

Classes:

  • ParameterAccessor

    Accessor for derived irradiance parameters used in QC workflows.

ParameterAccessor

ParameterAccessor(sdf_obj)

Accessor for derived irradiance parameters used in QC workflows.

Examples:

>>> sdf.param.KT
>>> sdf.param.Kn

Attributes:

  • K (SolarSeries) –

    Return diffuse fraction K derived from dif / ghi.

  • KT (SolarSeries) –

    Return the clearness index KT derived from ghi / eth.

  • Kn (SolarSeries) –

    Return normalized beam index Kn derived from dni * cosz / eth.

Source code in src/solarpandas/accessors/param.py
def __init__(self, sdf_obj):
    self._sdf = self._validate(sdf_obj)
K property

Return diffuse fraction K derived from dif / ghi.

Returns:

  • SolarSeries

    Daytime-clipped K values in the range [1e-3, 1.10].

KT property

Return the clearness index KT derived from ghi / eth.

Returns:

  • SolarSeries

    Daytime-clipped KT values in the range [1e-3, 1.35].

Kn property

Return normalized beam index Kn derived from dni * cosz / eth.

Returns:

  • SolarSeries

    Daytime-clipped Kn values in the range [1e-3, 1.10].
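
The three indices are plain ratios followed by a clip to the documented ranges. The sketch below evaluates them for a single daytime sample. It assumes `eth` is the extraterrestrial horizontal irradiance and `cosz` the cosine of the solar zenith angle, which this page does not spell out; the function name and sample values are illustrative.

```python
def clip(x: float, lo: float, hi: float) -> float:
    """Limit x to the closed interval [lo, hi]."""
    return max(lo, min(x, hi))


def irradiance_indices(ghi, dni, dif, eth, cosz):
    """Return (K, KT, Kn) for one daytime sample with ghi > 0 and eth > 0.

    Assumed meanings: eth = extraterrestrial horizontal irradiance,
    cosz = cosine of the solar zenith angle.
    """
    k = clip(dif / ghi, 1e-3, 1.10)          # diffuse fraction
    kt = clip(ghi / eth, 1e-3, 1.35)         # clearness index
    kn = clip(dni * cosz / eth, 1e-3, 1.10)  # normalized beam index
    return k, kt, kn


k, kt, kn = irradiance_indices(ghi=800.0, dni=700.0, dif=200.0, eth=1000.0, cosz=0.8)
print(k, kt, kn)
```

Nighttime samples are outside the scope of this sketch, since the ratios are undefined when `ghi` or `eth` is zero.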

pvirrad

Accessors to evaluate PV yield.

Classes:

  • PVAccessor

    Accessor for PV yield evaluation.

PVAccessor

PVAccessor(sdf_obj)

Accessor for PV yield evaluation.

Examples:

>>> sdf.pv.yield_dc(...)
>>> sdf.pv.yield_ac(...)
>>> sdf.pv.clipping_losses(...)
>>> sdf.pv.optimal_dc_to_ac_ratio(...)

Methods:

  • clipping_losses

    Calculate the clipping losses of a PV system assuming pvlib's PVWatts inverter model.

  • optimal_dc_to_ac_ratio

    Calculate the optimal DC/AC ratio of a PV system.

  • poa_irradiance

    Transposition of solar irradiance to the PV plane of array.

  • yield_ac

    Calculate the AC power yield of a PV system.

  • yield_dc

    Calculate the DC power yield of a PV system.

Source code in src/solarpandas/accessors/pvirrad.py
def __init__(self, sdf_obj):
    self._sdf: SolarDataFrame = self._validate(sdf_obj)
clipping_losses
clipping_losses(
    dc_to_ac_ratio: ndarray[tuple[int]]
    | float
    | None = None,
    time_series: bool = False,
    units: Literal["W", "fraction"] = "fraction",
    yield_dc_kwargs: dict | None = None,
    inverter_effic: float = 0.96,
    method: Literal["integral", "explicit"] = "integral",
    integral_bins: ndarray[tuple[int]] | int = 200,
) -> float | Series | SolarDataFrame

Calculate the clipping losses of a PV system assuming pvlib's PVWatts inverter model.
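
For orientation, the PVWatts inverter efficiency curve can be written in a few lines for a single operating point. This is a scalar sketch using the same coefficients as the source below, not the vectorised implementation; `pvwatts_ac_power` is a hypothetical name, and the floor at zero is added here because the curve turns negative at very small loads.

```python
def pvwatts_ac_power(pdc: float, pac0: float, eta_nom: float = 0.96,
                     eta_ref: float = 0.9637) -> float:
    """AC power [W] for a DC input [W] with the PVWatts efficiency curve.

    pac0 is the inverter AC rating; the matching DC input is pac0 / eta_nom.
    """
    if pdc <= 0:
        return 0.0
    pdc0 = pac0 / eta_nom
    zeta = pdc / pdc0  # DC loading relative to the DC rating
    eta = (eta_nom / eta_ref) * (-0.0162 * zeta - 0.0059 / zeta + 0.9858)
    # Clip at the AC rating; floor at zero for very small loads.
    return max(0.0, min(eta * pdc, pac0))


for pdc in (0.0, 500.0, 1000.0 / 0.96, 2000.0):
    print(round(pdc, 1), round(pvwatts_ac_power(pdc, pac0=1000.0), 1))
```

At full DC loading (`zeta == 1`) the bracket evaluates to `eta_ref`, so the efficiency equals `eta_nom` and the output reaches the AC rating. Any DC power above that point is clipped.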

Parameters:

  • dc_to_ac_ratio
    (ndarray[tuple[int]] | float | None, default: None ) –

    DC/AC ratio of the PV system. If None, a default range of 51 evenly spaced values from 1.0 to 2.0 (inclusive) is used.

  • time_series
    (bool, default: False ) –

    If True, returns the time series of clipping losses instead of the total value. If True, dc_to_ac_ratio must be a single value.

  • units
    (Literal['W', 'fraction'], default: 'fraction' ) –

    Units for the output clipping losses. Options are "fraction" for the fraction of DC power yield lost due to clipping, and "W" for the average DC power lost due to clipping.

  • yield_dc_kwargs
    (dict | None, default: None ) –

    Keyword arguments to be passed to the yield_dc method.

  • inverter_effic
    (float, default: 0.96 ) –

    Inverter efficiency.

  • method
    (Literal['integral', 'explicit'], default: 'integral' ) –

    Method to use for calculating clipping losses. Options are "integral" and "explicit".

  • integral_bins
    (ndarray[tuple[int]] | int, default: 200 ) –

    Number of bins, or array of bin edges, of the DC power histogram used to evaluate the integral in the integral method.

Returns:

  • float or Series or SolarDataFrame

    If time_series is True, a SolarDataFrame is returned with the time series of clipping losses, DC power and AC power. Otherwise, it returns clipping losses as a fraction of the total DC energy yield. If dc_to_ac_ratio is a single value, a float is returned. If dc_to_ac_ratio is an array-like, a pd.Series is returned with the DC/AC ratios as the index.

Notes

The explicit method is based on the approach of Micheli et al. (doi: 10.1016/j.renene.2024.120317):

\[ C_L = E_{DC} \eta_{inv} - E^{peak}_{AC} \]

where \(E_{DC}\) is the DC energy yield, \(\eta_{inv}\) is the inverter's nominal efficiency, \(E^{peak}_{AC} = \frac{E_{DC}}{\tau}\) is the AC energy yield at the inverter's peak power limit, and \(\tau\) is the DC/AC ratio.
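
A per-sample reading of the explicit method can be sketched as follows. It uses the simplified constant-efficiency model that appears as a commented-out alternative in the source below, not the full PVWatts curve: the clipped power at each time step is the part of `pdc * inverter_effic` above the AC limit `p_dc_peak / dc_to_ac_ratio`. The function name and sample values are illustrative only.

```python
def explicit_clipping_loss(pdc, p_dc_peak, dc_to_ac_ratio, inverter_effic=0.96):
    """Mean clipped power [W] over the samples, with a constant-efficiency inverter."""
    p_ac_peak = p_dc_peak / dc_to_ac_ratio  # AC limit of the inverter [W]
    clipped = [max(p * inverter_effic - p_ac_peak, 0.0) for p in pdc]
    return sum(clipped) / len(clipped)


pdc = [0.0, 200.0, 600.0, 1000.0]  # made-up DC power samples [W]
loss_125 = explicit_clipping_loss(pdc, p_dc_peak=1000.0, dc_to_ac_ratio=1.25)
loss_100 = explicit_clipping_loss(pdc, p_dc_peak=1000.0, dc_to_ac_ratio=1.0)
print(loss_125, loss_100)
```

With a DC/AC ratio of 1.0 the AC limit equals the DC peak, so no sample is clipped. Raising the ratio lowers the limit and the losses grow.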

The integral method evaluates clipping losses making explicit their relation with the probability density function (PDF) of the DC power output, and so also to that of the plane-of-array (POA) irradiance. Thus, it provides direct evidence of the impact of solar irradiance characteristics on clipping losses.

\[ C_L = \int_{P_{AC}^{peak}}^{\infty} (P_{DC} \eta_{inv} - P_{AC}^{peak}) f(P_{DC}) dP_{DC} \]
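
The integral can be approximated numerically by replacing the PDF with a normalised histogram of the DC power and summing loss times density times bin width. The sketch below does this with the standard library and a constant-efficiency inverter (an assumption made for brevity), then compares the result with a direct per-sample mean. All names and the uniform test data are illustrative.

```python
def integral_clipping_loss(pdc, p_dc_peak, dc_to_ac_ratio,
                           inverter_effic=0.96, bins=200):
    """Mean clipped power [W]: sum over bins of loss(center) * pdf * width."""
    p_ac_peak = p_dc_peak / dc_to_ac_ratio
    lo, hi = min(pdc), max(pdc)
    if hi == lo:
        raise ValueError("pdc must span a non-zero range")
    width = (hi - lo) / bins
    counts = [0] * bins
    for p in pdc:
        k = min(int((p - lo) / width), bins - 1)  # last edge is inclusive
        counts[k] += 1
    total = 0.0
    for k, count in enumerate(counts):
        center = lo + (k + 0.5) * width
        pdf = count / (len(pdc) * width)  # density [1/W]
        loss = max(center * inverter_effic - p_ac_peak, 0.0)
        total += loss * pdf * width
    return total


def sample_mean_loss(pdc, p_dc_peak, dc_to_ac_ratio, inverter_effic=0.96):
    """Direct per-sample mean of the clipped power, for comparison."""
    p_ac_peak = p_dc_peak / dc_to_ac_ratio
    return sum(max(p * inverter_effic - p_ac_peak, 0.0) for p in pdc) / len(pdc)


pdc = [float(p) for p in range(1001)]  # made-up uniform DC power, 0 to 1000 W
by_integral = integral_clipping_loss(pdc, 1000.0, 1.25)
by_samples = sample_mean_loss(pdc, 1000.0, 1.25)
print(round(by_integral, 2), round(by_samples, 2))
```

The two estimates differ only by the binning error, which shrinks as the number of bins grows. What the histogram form adds is that the contribution of each power level to the loss is visible bin by bin.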
Source code in src/solarpandas/accessors/pvirrad.py
def clipping_losses(
    self,
    dc_to_ac_ratio: np.ndarray[tuple[int]] | float | None = None,
    time_series: bool = False,
    units: Literal["W", "fraction"] = "fraction",
    yield_dc_kwargs: dict | None = None,
    inverter_effic: float = 0.96,
    method: Literal["integral", "explicit"] = "integral",
    integral_bins: np.ndarray[tuple[int]] | int = 200,
) -> float | pd.Series | SolarDataFrame:
    r"""Calculate the clipping losses of a PV system assuming pvlib's PVWatts inverter model.

    Parameters
    ----------
    dc_to_ac_ratio: float or array-like
        DC/AC ratio of the PV system. If None, a default range of 51 evenly spaced values
        from 1.0 to 2.0 (inclusive) is used.
    time_series: bool, default False
        If True, returns the time series of clipping losses instead of the total value. If True,
        `dc_to_ac_ratio` must be a single value.
    units: str, default "fraction"
        Units for the output clipping losses. Options are "fraction" for the fraction of DC
        power yield lost due to clipping, and "W" for the average DC power lost due to clipping.
    yield_dc_kwargs: dict, default None
        Keyword arguments to be passed to the `yield_dc` method.
    inverter_effic: float, default 0.96
        Inverter efficiency.
    method: str, default "integral"
        Method to use for calculating clipping losses. Options are "integral" and "explicit".
    integral_bins: int or array-like, default 200
        Number of bins, or array of bin edges, of the DC power histogram used to evaluate
        the integral in the integral method.

    Returns
    -------
    float or pd.Series or SolarDataFrame
        If `time_series` is True, a SolarDataFrame is returned with the time series of clipping
        losses, DC power and AC power. Otherwise, it returns clipping losses as a fraction of the
        total DC energy yield. If `dc_to_ac_ratio` is a single value, a float is returned. If
        `dc_to_ac_ratio` is an array-like, a pd.Series is returned with the DC/AC ratios as the index.

    Notes
    -----
    The `explicit` method is based on the Micheli et al. (doi:
    [10.1016/j.renene.2024.120317](https://doi.org/10.1016/j.renene.2024.120317)) approach:

    ```math
    C_L = E_{DC} \eta_{inv} - E^{peak}_{AC}
    ```

    where $E_{DC}$ is the DC energy yield, $\eta_{inv}$ is the inverter's nominal efficiency,
    $E^{peak}_{AC} = \frac{E_{DC}}{\tau}$ is the AC energy yield at the inverter's peak power limit,
    and $\tau$ is the DC/AC ratio.

    The `integral` method evaluates clipping losses making explicit their relation with the probability
    density function (PDF) of the DC power output, and so also to that of the plane-of-array (POA)
    irradiance. Thus, it provides direct evidence of the impact of solar irradiance characteristics
    on clipping losses.

    ```math
    C_L = \int_{P_{AC}^{peak}}^{\infty} (P_{DC} \eta_{inv} - P_{AC}^{peak}) f(P_{DC}) dP_{DC}
    ```
    """

    if dc_to_ac_ratio is None:
        dc_to_ac_ratio = np.linspace(1., 2., 51)

    if isinstance(dc_to_ac_ratio, (int, float)):
        dc_to_ac_ratio = np.array([dc_to_ac_ratio])

    # Copy so the caller's dict is not mutated, then force the options needed here.
    yield_dc_kwargs = {**(yield_dc_kwargs or {}), "units": "W", "full_output": False}
    pdc = self.yield_dc(**yield_dc_kwargs)
    p_dc_peak = float(pdc.custom_metadata["pvsystem"]["p_dc_peak"])

    max_sza = 180.  # deg
    sza = pdc.solpos.zenith
    diurnal = sza.lt(max_sza)

    def compute_clipping_losses_numpy(
        pdc: np.ndarray[tuple[int]]
    ) -> tuple[np.ndarray, np.ndarray]:
        """Compute the clipping losses for a given DC power output and DC/AC ratios.

        The AC power yield is evaluated using the full PVWatts inverter model in order to account
        for the non-linear behavior of the inverter at low DC power inputs, which is especially
        relevant for low DC/AC ratios and low irradiance conditions. This approach is equivalent to
        using `self.yield_ac`, but it is more efficient when AC power is computed for several DC/AC
        ratios at once, because `self.yield_ac` would require a loop that repeats the same
        calculations on every iteration.

        Parameters
        ----------
        pdc: np.ndarray
            DC power output of the PV system in Watts.

        Uses p_dc_peak, dc_to_ac_ratio, and inverter_effic from the enclosing scope.

        Returns
        -------
        - clipping losses: np.ndarray [Watts]
        - clipped AC power: np.ndarray [Watts]
        """
        # # approximate approach, as in Micheli et al.
        # p_ac_peak = p_dc_peak / dc_to_ac_ratio  # (n_ratios,)
        # pac_unclipped = pdc_centers*inverter_effic  # (n_times or n_bins,)
        # return np.clip(pac_unclipped[:, None] - p_ac_peak[None, :], 0., None)  # (n_times or n_bins, n_ratios)

        p_ac_peak = p_dc_peak / dc_to_ac_ratio  # (n_ratios,) [Watts]
        p_dc0 = p_ac_peak / inverter_effic  # (n_ratios,) [Watts]
        zeta = pdc[:, None] / p_dc0[None, :]  # (n_times or n_bins, n_ratios) [-]
        domain = zeta > 0
        eta = np.zeros_like(zeta, dtype=float)
        eta[domain] = ((inverter_effic/0.9637)
                       *(-0.0162*zeta[domain] - 0.0059/zeta[domain] + 0.9858))  # (n_times or n_bins, n_ratios) [-]
        pac_unclipped = pdc[:, None]*eta  # (n_times or n_bins, n_ratios) [Watts]
        losses = np.clip(pac_unclipped - p_ac_peak[None, :], 0., None)  # (n_times or n_bins, n_ratios) [Watts]
        return losses, np.minimum(pac_unclipped, p_ac_peak[None, :])  # (n_times or n_bins, n_ratios) [Watts]

    if time_series is True:
        logger.debug("Calculating clipping losses time series")
        losses, pac = compute_clipping_losses_numpy(pdc.to_numpy())
        if units == "fraction":
            logger.warning("`units='fraction'` ignored: clipping losses time series are provided in Watts.")
        return pdc.to_frame().assign(
            pac=pac,
            clipping_losses=np.squeeze(losses))

    if method == "explicit":
        logger.debug("Calculating clipping losses using the explicit method")
        losses, _ = compute_clipping_losses_numpy(pdc.to_numpy())  # (n_times, n_ratios) [Watts]
        total_losses = np.nanmean(losses[diurnal], axis=0)  # (n_ratios,) [Watts]

    elif method == "integral":
        logger.debug("Calculating clipping losses using the integral method")
        pdf, bin_edges = np.histogram(pdc.loc[diurnal].dropna(), bins=integral_bins, density=True)  # pdf: (n_bins,) [Watts-1]
        pdc_intervals = np.diff(bin_edges)  # bin intervals: (n_bins,) [Watts]
        pdc_centers = (bin_edges[1:] + bin_edges[:-1]) / 2  # bin centers: (n_bins,) [Watts]
        p_ac_peak = p_dc_peak / dc_to_ac_ratio  # (n_ratios,) [Watts]
        losses, _ = compute_clipping_losses_numpy(pdc_centers)  # (n_bins, n_ratios) [Watts]
        integrand = losses*pdf[:, None]*pdc_intervals[:, None]  # (n_bins, n_ratios) [Watts]
        integrand[pdc_centers[:, None] < p_ac_peak[None, :]] = np.nan  # restricts the integral to the clipping region
        total_losses = np.nansum(integrand, axis=0)  # (n_ratios,) [Watts]

    else:
        raise ValueError(f"unknown method {method=}")

    if units == "fraction":
        total_losses = total_losses / pdc.where(diurnal).mean()  # [-]

    if total_losses.size == 1:
        return total_losses.item()

    return pd.Series(
        data=total_losses,
        index=dc_to_ac_ratio,
        name="clipping_losses",
        dtype=float)
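
The "explicit" and "integral" branches in the source above should give nearly the same mean clipping loss. The sketch below compares them on synthetic data. It is only an illustration: the DC power sample is random rather than measured, and the helper `clipped_excess` is hypothetical and uses a constant-efficiency inverter instead of the full pvwatts model.

```python
import numpy as np

rng = np.random.default_rng(0)
pdc = rng.uniform(0.0, 1000.0, size=50_000)  # synthetic DC power [W]; NOT real data
p_ac_peak = 1000.0 / 1.25  # AC limit for 1000 Wp and a DC/AC ratio of 1.25 [W]
inverter_effic = 0.96


def clipped_excess(p):
    """Power above the AC limit [W], assuming a constant-efficiency inverter (simplification)."""
    return np.clip(p * inverter_effic - p_ac_peak, 0.0, None)


# explicit method: average the loss over every time step
explicit = clipped_excess(pdc).mean()

# integral method: weight the loss at each bin center by the probability mass of that bin
pdf, bin_edges = np.histogram(pdc, bins=200, density=True)
centers = (bin_edges[1:] + bin_edges[:-1]) / 2
integral = np.sum(clipped_excess(centers) * pdf * np.diff(bin_edges))

print(f"explicit={explicit:.2f} W, integral={integral:.2f} W")
```

The integral form evaluates the inverter model once per bin instead of once per time step, which matters when the time series is long.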
optimal_dc_to_ac_ratio
optimal_dc_to_ac_ratio(
    selling_price: float = 4e-05,
    inverter_cost: float = 0.35,
    inverter_payback: int = 10,
    clipping_losses_kwargs: dict | None = None,
) -> tuple[float, Series]

Calculate the optimal DC/AC ratio of a PV system.

The optimal DC/AC ratio is the one that maximizes the net savings of the PV system, taking into account the cost of the inverter, its payback period, and the selling price of the electricity produced by the PV system. The net savings are calculated as the difference between the inverter savings (the cost of the inverter capacity avoided by undersizing it, prorated over the payback period) and the revenue lost to clipping.

Parameters:

  • selling_price
    (float, default: 4e-05 ) –

    Selling price of the electricity produced by the PV system, in EUR/Wh. Default is 40e-6 EUR/Wh (i.e., 40 EUR/MWh).

  • inverter_cost
    (float, default: 0.35 ) –

    Cost of the inverter, in EUR/W. Default is 0.35 EUR/W, which is typical for a 350 kW+ string inverter used in commercial PV plants.

  • inverter_payback
    (int, default: 10 ) –

    Payback period for the inverter, in years. Default is 10 years.

  • clipping_losses_kwargs
    (dict | None, default: None ) –

    Additional kwargs to be passed to the clipping_losses method. See clipping_losses for details.

Returns:

  • float and Series

    The DC/AC ratio that maximizes the net savings and a pandas Series with the net savings (in EUR/year) for different DC/AC ratios.
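
The trade-off can be illustrated with numpy alone. The sketch below follows the net-savings arithmetic for a 1 Wp reference system. The clipping-loss curve is a made-up placeholder (in solarpandas it would come from clipping_losses), and the helper name `net_savings_per_year` is hypothetical.

```python
import numpy as np

HOURS_PER_YEAR = 8760.0


def net_savings_per_year(dc_to_ac_ratio, mean_clipping_w, selling_price=40e-6,
                         inverter_cost=0.35, inverter_payback=10, p_dc_peak=1.0):
    """Net savings [EUR/year] of undersizing the inverter (hypothetical helper)."""
    ratio = np.asarray(dc_to_ac_ratio, dtype=float)
    # inverter capacity avoided [W] times its cost, prorated over the payback period
    inverter_savings = (p_dc_peak - p_dc_peak / ratio) * inverter_cost / inverter_payback
    # revenue lost to clipping: mean clipped power [W] -> energy [Wh/year] -> EUR/year
    selling_loss = np.asarray(mean_clipping_w, dtype=float) * HOURS_PER_YEAR * selling_price
    return inverter_savings - selling_loss


ratios = np.linspace(1.0, 1.5, 6)
# placeholder clipping-loss curve [W per Wp]; NOT real data
mean_clipping_w = 0.1 * (ratios - 1.0) ** 2
savings = net_savings_per_year(ratios, mean_clipping_w)
best_ratio = ratios[np.argmax(savings)]
print(f"best ratio on this grid: {best_ratio:.1f}")
```

At a ratio of 1.0 nothing is saved and nothing is clipped, so the net savings are zero. As the ratio grows, the inverter savings flatten out while the clipping losses keep rising, which is what produces a maximum.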

Source code in src/solarpandas/accessors/pvirrad.py
def optimal_dc_to_ac_ratio(
    self,
    selling_price: float = 40e-6,  # EUR/Wh  (=40 EUR/MWh)
    inverter_cost: float = 0.35,  # EUR/W  (for a 350 kW+ string inverter, typical for commercial PV plants)
    inverter_payback: int = 10,  # years
    clipping_losses_kwargs: dict | None = None,
) -> tuple[float, pd.Series]:
    """Calculate the optimal DC/AC ratio of a PV system.

    The optimal DC/AC ratio is the one that maximizes the net savings of the PV system, taking into account
    the cost of the inverter, its payback period, and the selling price of the electricity produced by the
    PV system. The net savings are calculated as the difference between the inverter savings (the cost of the
    inverter capacity avoided by undersizing it, prorated over the payback period) and the revenue lost to clipping.

    Parameters
    ----------
    selling_price: float
        Selling price of the electricity produced by the PV system, in EUR/Wh. Default is 40e-6 EUR/Wh (i.e., 40 EUR/MWh).
    inverter_cost: float
        Cost of the inverter, in EUR/W. Default is 0.35 EUR/W, which is typical for a 350 kW+ string inverter used in commercial PV plants.
    inverter_payback: int
        Payback period for the inverter, in years. Default is 10 years.
    clipping_losses_kwargs: dict | None
        Additional kwargs to be passed to the `clipping_losses` method. See `clipping_losses` for details.

    Returns
    -------
    float and pd.Series
        The DC/AC ratio that maximizes the net savings and a pandas Series with the net savings (in EUR/year) for different DC/AC ratios.
    """

    # Approximate cost analysis, based on a Gemini search:
    #   - In absolute terms, the average installation cost of a PV plant is 0.70-0.90 USD/Wp.
    #     For a 1 MWp plant: 700-900 kUSD
    #   - In relative terms, the cost breaks down as follows:
    #     - PV modules: 35-45% (~320 kUSD)
    #     - Inverter: 8-12% (~80 kUSD)
    #     - Mounting structure and trackers: 10-15% (~90 kUSD)
    #     - Cabling, protections, transformer: 10-12% (~90 kUSD)
    #     - Installation and labor: 10-15% (~90 kUSD)
    #     - Other: 10-15% (~90 kUSD)
    #   - However, an independent search on the average price per watt of an inverter for a
    #     PV plant turned up considerably higher prices:
    #     - Central inverters: the cheapest option for 50 MW+ projects, with costs that can
    #       drop to 0.10-0.25 USD/W in very competitive markets, such as China, or stay close to
    #       0.35 USD/W in other regions.
    #     - String inverters: more expensive than central ones (sometimes even twice as much), but
    #       high-power units (350 kW+) for commercial and industrial applications are around 0.30-0.40 USD/W
    #     - With these figures, and assuming a DC/AC ratio of 1.3, the inverter cost for a 1 MWp plant
    #       would be ~230-300 kUSD.

    from scipy.optimize import minimize_scalar

    P_DC_PEAK = 1.  # just a reference value for the calculations [Wp]

    clipping_losses_kwargs = dict(clipping_losses_kwargs or {})  # copy, so the caller's dict is not mutated
    clipping_losses_kwargs.setdefault("dc_to_ac_ratio", np.linspace(1.0, 1.5, 100))
    clipping_losses_kwargs["yield_dc_kwargs"] = dict(clipping_losses_kwargs.get("yield_dc_kwargs") or {})
    clipping_losses_kwargs.update({"time_series": False, "units": "W"})
    clipping_losses_kwargs["yield_dc_kwargs"].update({"p_dc_peak": P_DC_PEAK})
    cliploss = self.clipping_losses(**clipping_losses_kwargs) * 8760  # (n_ratios,) [Wh/year]  (8760 h/year)

    dc_to_ac_ratio = cliploss.index
    inverter_savings = (P_DC_PEAK - P_DC_PEAK/dc_to_ac_ratio) * inverter_cost / inverter_payback  # (n_ratios,) [EUR/year]
    selling_loss = cliploss * selling_price  # (n_ratios,) [EUR/year]
    net_savings = inverter_savings - selling_loss  # (n_ratios,) [EUR/year]
    result = minimize_scalar(
        lambda x: -np.interp(x, dc_to_ac_ratio, net_savings),
        bounds=(dc_to_ac_ratio.min(), dc_to_ac_ratio.max()),
        method='bounded')
    net_savings = pd.Series(
        data=net_savings,
        index=dc_to_ac_ratio,
        name="net_savings",
        dtype=float)  # (n_ratios,) [EUR/year]
    return result.x, net_savings
poa_irradiance
poa_irradiance(
    tracking: Literal[
        "fixed", "fixed_optimal", "singleaxis", "dualaxis"
    ] = "singleaxis",
    aoi_losses: bool = True,
    tracking_kwargs: dict = None,
    transposition_kwargs: dict = None,
) -> SolarDataFrame

Transposition of solar irradiance to the PV plane of array.

It requires ghi, dni and dif columns. The calculations are delegated to the pvlib library.

Parameters:

  • tracking
    (Literal['fixed', 'fixed_optimal', 'singleaxis', 'dualaxis'], default: 'singleaxis' ) –

    type of tracking system to use for calculating the plane-of-array (POA) irradiance. Options are:

      - "fixed": fixed tilt system, with the tilt and azimuth angles specified in tracking_kwargs as poa_tilt and poa_azimuth, respectively. The tilt angle is the angle between the plane of the PV array and the horizontal plane, while the azimuth angle is the compass direction that the PV array faces (0° for north, 90° for east, 180° for south, and 270° for west).
      - "fixed_optimal": fixed tilt system with the optimal tilt angle calculated based on the latitude of the location. The optimal tilt angle is calculated using a simple empirical formula that provides a good approximation for many locations: poa_tilt = 0.87*latitude for latitudes between -25° and 25°, poa_tilt = 0.76*latitude + 3.1 for latitudes between -50° and 50°, and a fixed tilt of 40° for latitudes outside this range. The azimuth angle is set to 180° (south-facing) by default.
      - "singleaxis": single-axis tracking system, with the tracking parameters specified in tracking_kwargs as axis_tilt, axis_azimuth, max_angle, backtrack, and gcr. The single-axis tracking system rotates around a single axis to follow the sun's movement, which can increase the energy yield compared to a fixed tilt system. The axis_tilt is the angle of the rotation axis relative to the horizontal plane, while the axis_azimuth is the compass direction of the rotation axis. The max_angle is the maximum rotation angle of the tracker, while backtrack indicates whether to use backtracking to avoid shading between rows of panels. The gcr (ground coverage ratio) is the ratio of the area covered by the PV panels to the total ground area.
      - "dualaxis": dual-axis tracking system, which can rotate around two axes to follow the sun's movement more accurately. In this case, the plane-of-array (POA) tilt and azimuth angles are calculated based on the solar zenith and azimuth angles, resulting in a POA that is always perpendicular to the sun's rays. This type of tracking system can provide the highest energy yield, but it is also more complex and expensive than fixed or single-axis tracking systems. The dual-axis tracking system is particularly beneficial in locations with high solar variability or for applications that require maximizing energy yield, such as in concentrated solar power (CSP) systems or for certain types of PV installations where space is limited and maximizing energy production is critical.

  • aoi_losses
    (bool, default: True ) –

    whether to apply the Martin-Ruiz incidence angle modifier (IAM) correction to account for the reduction in effective irradiance on the PV modules at high angles of incidence. The IAM correction reduces the effective irradiance on the PV modules when the angle of incidence of the sunlight is large, which can occur during early morning, late afternoon, or in locations with high solar zenith angles. Applying the IAM correction can provide a more accurate estimation of the DC power output, especially for fixed tilt systems or for locations with high solar variability. The Martin-Ruiz model is a widely used empirical model for calculating the IAM and is based on measurements of the performance of PV modules at different angles of incidence.

  • tracking_kwargs
    (dict, default: None ) –

    keyword arguments for the tracking system, such as poa_tilt and poa_azimuth, which must be provided for fixed tilt systems (tracking='fixed'), or axis_tilt, axis_azimuth, max_angle, backtrack, and gcr for single-axis tracking systems (tracking='singleaxis'). The default values for the single-axis parameters are axis_tilt=0., axis_azimuth=180., max_angle=60., backtrack=True, and gcr=0.4, which correspond to a common single-axis tracking configuration with a horizontal rotation axis (sometimes referred to as HSAT, horizontal single-axis tracker) and a ground coverage ratio of 0.4, which is typical for commercial PV installations. These default values can be overridden by providing the desired values in tracking_kwargs. For dual-axis tracking systems (tracking='dualaxis'), no additional parameters are required as the POA tilt and azimuth are calculated based on the solar position.

  • transposition_kwargs
    (dict, default: None ) –

    keyword arguments to select the transposition model to be used to calculate the POA irradiance. It can affect the accuracy of the POA irradiance calculation, especially under certain sky conditions (e.g., cloudy vs. clear skies). The model is selected with the key model, whose possible values are isotropic (the default), klucher, haydavies, reindl, king, perez and perez-driesse. Some of the model choices accept additional parameters to be passed also in transposition_kwargs, such as, "perez_model" if the selected model is perez, or "dni_extra" if the selected model is one of haydavies, reindl, perez or perez-driesse.

Returns:

  • SolarDataFrame

    a new SolarDataFrame with the same index as the original but with the following columns:

      - tilt: plane-of-array tilt angle (degrees)
      - azimuth: plane-of-array azimuth angle (degrees)
      - aoi: angle of incidence of the sunlight on the plane of array (degrees)
      - direct: direct component of the plane-of-array irradiance (W m-2)
      - sky_diffuse: sky diffuse component of the plane-of-array irradiance (W m-2)
      - ground_diffuse: ground diffuse component of the plane-of-array irradiance (W m-2)
      - diffuse: total diffuse component of the plane-of-array irradiance (W m-2)
      - global: total plane-of-array irradiance (W m-2)

Notes

For more details see the pvlib documentation for the get_total_irradiance function, which is used to perform the transposition calculations, and the iam.martin_ruiz and iam.martin_ruiz_diffuse functions, which are used to apply the incidence angle modifier (IAM) correction if aoi_losses is set to True.

https://pvlib-python.readthedocs.io/en/stable/reference/generated/pvlib.irradiance.get_total_irradiance.html#pvlib-irradiance-get-total-irradiance
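
The latitude rule described for tracking='fixed_optimal' can be written as a small standalone function. This is a sketch of the documented rule only, not the library's own helper; the name `optimal_fixed_tilt` is hypothetical.

```python
def optimal_fixed_tilt(latitude: float) -> float:
    """Empirical tilt [deg] for a fixed array, following the documented latitude rule."""
    if -25.0 < latitude < 25.0:
        return 0.87 * latitude
    if -50.0 < latitude < 50.0:
        return 0.76 * latitude + 3.1
    return 40.0


for lat in (10.0, 37.0, 60.0):
    print(f"latitude={lat:5.1f} deg -> tilt={optimal_fixed_tilt(lat):5.2f} deg")
```

Taken literally, the rule returns a negative tilt for latitudes between -25° and 0°, so southern-hemisphere results deserve a quick sanity check.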

Source code in src/solarpandas/accessors/pvirrad.py
def poa_irradiance(
    self,
    tracking: Literal["fixed", "fixed_optimal", "singleaxis", "dualaxis"] = "singleaxis",
    aoi_losses: bool = True,
    tracking_kwargs: dict = None,
    transposition_kwargs: dict = None,
) -> SolarDataFrame:
    """Transposition of solar irradiance to the PV plane of array.

    It requires `ghi`, `dni` and `dif` columns. The calculations are delegated to the `pvlib` library.

    Parameters
    ----------
    tracking: str
        type of tracking system to use for calculating the plane-of-array (POA) irradiance. Options are:
        - "fixed": fixed tilt system, with the tilt and azimuth angles specified in `tracking_kwargs` as
            `poa_tilt` and `poa_azimuth`, respectively. The tilt angle is the angle between the plane of the
            PV array and the horizontal plane, while the azimuth angle is the compass direction that the PV
            array faces (0° for north, 90° for east, 180° for south, and 270° for west).
        - "fixed_optimal": fixed tilt system with the optimal tilt angle calculated based on the latitude
            of the location. The optimal tilt angle is calculated using a simple empirical formula that provides
            a good approximation for many locations: `poa_tilt = 0.87*latitude` for latitudes between -25° and
            25°, `poa_tilt = 0.76*latitude + 3.1` for latitudes between -50° and 50°, and a fixed tilt of 40°
            for latitudes outside this range. The azimuth angle is set to 180° (south-facing) by default.
        - "singleaxis": single-axis tracking system, with the tracking parameters specified in `tracking_kwargs`
            as `axis_tilt`, `axis_azimuth`, `max_angle`, `backtrack`, and `gcr`. The single-axis tracking system
            rotates around a single axis to follow the sun's movement, which can increase the energy yield
            compared to a fixed tilt system. The `axis_tilt` is the angle of the rotation axis relative to the
            horizontal plane, while the `axis_azimuth` is the compass direction of the rotation axis. The
            `max_angle` is the maximum rotation angle of the tracker, while `backtrack` indicates whether to use
            backtracking to avoid shading between rows of panels. The `gcr` (ground coverage ratio) is the ratio
            of the area covered by the PV panels to the total ground area.
        - "dualaxis": dual-axis tracking system, which can rotate around two axes to follow the sun's movement
            more accurately. In this case, the plane-of-array (POA) tilt and azimuth angles are calculated based
            on the solar zenith and azimuth angles, resulting in a POA that is always perpendicular to the sun's
            rays. This type of tracking system can provide the highest energy yield, but it is also more complex
            and expensive than fixed or single-axis tracking systems. The dual-axis tracking system is particularly
            beneficial in locations with high solar variability or for applications that require maximizing energy
            yield, such as in concentrated solar power (CSP) systems or for certain types of PV installations
            where space is limited and maximizing energy production is critical.
    aoi_losses: bool
        whether to apply the Martin-Ruiz incidence angle modifier (IAM) correction to account for the reduction
        in effective irradiance on the PV modules at high angles of incidence. The IAM correction reduces the
        effective irradiance on the PV modules when the angle of incidence of the sunlight is large, which can
        occur during early morning, late afternoon, or in locations with high solar zenith angles. Applying the
        IAM correction can provide a more accurate estimation of the DC power output, especially for fixed tilt
        systems or for locations with high solar variability. The Martin-Ruiz model is a widely used empirical
        model for calculating the IAM and is based on measurements of the performance of PV modules at different
        angles of incidence.
    tracking_kwargs: dict
        keyword arguments for the tracking system, such as `poa_tilt` and `poa_azimuth`, which must be provided
        for fixed tilt systems (`tracking='fixed'`), or `axis_tilt`, `axis_azimuth`, `max_angle`, `backtrack`,
        and `gcr` for single-axis tracking systems (`tracking='singleaxis'`).
        The default values for the single-axis parameters are `axis_tilt=0.`, `axis_azimuth=180.`, `max_angle=60.`,
        `backtrack=True`, and `gcr=0.4`, which correspond to a common single-axis tracking configuration with a
        horizontal rotation axis (sometimes referred to as HSAT --horizontal single-axis tracker--) and a ground
        coverage ratio of 0.4, which is typical for commercial PV installations. These default values can be
        overridden by providing the desired values in `tracking_kwargs`.
        For dual-axis tracking systems (`tracking='dualaxis'`), no additional parameters are required as the POA
        tilt and azimuth are calculated based on the solar position.
    transposition_kwargs: dict
        keyword arguments to select the transposition model to be used to calculate the POA irradiance. It can
        affect the accuracy of the POA irradiance calculation, especially under certain sky conditions (e.g., cloudy
        vs. clear skies). The model is selected with the key `model`, whose possible values are `isotropic` (the
        default), `klucher`, `haydavies`, `reindl`, `king`, `perez` and `perez-driesse`. Some of the model choices
        accept additional parameters to be passed also in transposition_kwargs, such as, "perez_model" if the selected
        model is `perez`, or "dni_extra" if the selected model is one of `haydavies`, `reindl`, `perez` or
        `perez-driesse`.

    Returns
    -------
    SolarDataFrame
        a new SolarDataFrame with the same index as the original but with the following columns:
        - `tilt`: plane-of-array tilt angle (degrees)
        - `azimuth`: plane-of-array azimuth angle (degrees)
        - `aoi`: angle of incidence of the sunlight on the plane of array (degrees)
        - `direct`: direct component of the plane-of-array irradiance (W m-2)
        - `sky_diffuse`: sky diffuse component of the plane-of-array irradiance (W m-2)
        - `ground_diffuse`: ground diffuse component of the plane-of-array irradiance (W m-2)
        - `diffuse`: total diffuse component of the plane-of-array irradiance (W m-2)
        - `global`: total plane-of-array irradiance (W m-2)

    Notes
    -----
    For more details see the `pvlib` documentation for the `get_total_irradiance` function, which is used to perform
    the transposition calculations, and the `iam.martin_ruiz` and `iam.martin_ruiz_diffuse` functions, which are used
    to apply the incidence angle modifier (IAM) correction if `aoi_losses` is set to True.

    https://pvlib-python.readthedocs.io/en/stable/reference/generated/pvlib.irradiance.get_total_irradiance.html#pvlib-irradiance-get-total-irradiance

    """

    if tracking not in ("fixed", "fixed_optimal", "singleaxis", "dualaxis"):
        raise AssertionError(f"unknown option {tracking=}")

    sdf = self._sdf.resolve_closure()

    tracking_kwargs = tracking_kwargs or {}
    tracking_kwargs.setdefault("axis_tilt", 0.)  # horizontal axis
    tracking_kwargs.setdefault("axis_azimuth", 180.)  # south-facing
    tracking_kwargs.setdefault("max_angle", 60.)  # a common maximum rotation
    tracking_kwargs.setdefault("backtrack", True)  # backtrack for a typical c-Si array
    tracking_kwargs.setdefault("gcr", 0.4)  # common ground coverage ratio

    transposition_kwargs = transposition_kwargs or {}
    transposition_kwargs.setdefault("model", "isotropic")  # the simplest transposition approach

    poa = pd.DataFrame(index=sdf.index, dtype=float)  # container for output results

    # t r a c k i n g: compute poa tilt, azimuth and angle of incidence (aoi) from the tracker configuration

    if tracking == "fixed":
        if "poa_tilt" not in tracking_kwargs:
            raise ValueError("`poa_tilt` required in `tracking_kwargs` for tracking='fixed'")
        if "poa_azimuth" not in tracking_kwargs:
            raise ValueError("`poa_azimuth` required in `tracking_kwargs` for tracking='fixed'")
        poa_tilt = tracking_kwargs["poa_tilt"]
        poa_azimuth = tracking_kwargs["poa_azimuth"]
        aoi = pvlib.irradiance.aoi(poa_tilt, poa_azimuth, sdf.solpos.zenith, sdf.solpos.azimuth)
        poa = poa.assign(tilt=poa_tilt, azimuth=poa_azimuth, aoi=aoi)

    if tracking == "fixed_optimal":
        poa_tilt = (0.87*sdf.latitude if -25 < sdf.latitude < 25
                    else (0.76*sdf.latitude+3.1 if -50 < sdf.latitude < 50 else 40.))
        poa_azimuth = 180.  # south-facing, as stated in the docstring (0° is north in the pvlib convention)
        aoi = pvlib.irradiance.aoi(poa_tilt, poa_azimuth, sdf.solpos.zenith, sdf.solpos.azimuth)
        poa = poa.assign(tilt=poa_tilt, azimuth=poa_azimuth, aoi=aoi)

    if tracking == "singleaxis":
        tracking_geometry = pvlib.tracking.singleaxis(sdf.solpos.zenith, sdf.solpos.azimuth, **tracking_kwargs)
        poa_tilt = tracking_geometry["surface_tilt"]
        poa_azimuth = tracking_geometry["surface_azimuth"]
        aoi = tracking_geometry["aoi"]
        poa = poa.assign(tilt=poa_tilt, azimuth=poa_azimuth, aoi=aoi)

    if tracking == "dualaxis":
        poa_tilt = sdf.solpos.zenith.clip(lower=0, upper=90)
        poa_azimuth = sdf.solpos.azimuth
        poa = poa.assign(tilt=poa_tilt, azimuth=poa_azimuth, aoi=0.)

    # t r a n s p o s i t i o n

    transposed_irradiance = pvlib.irradiance.get_total_irradiance(
        surface_tilt=poa.tilt,
        surface_azimuth=poa.azimuth,
        solar_zenith=sdf.solpos.zenith,
        solar_azimuth=sdf.solpos.azimuth,
        ghi=sdf.ghi,
        dni=sdf.dni,
        dhi=sdf.dif,
        **transposition_kwargs)

    poa = poa.assign(**transposed_irradiance.rename(columns=lambda name: name.removeprefix("poa_")))

    # i n c i d e n c e   a n g l e   m o d i f i e r   ( I A M )   c o r r e c t i o n

    if aoi_losses:
        iam_direct = pvlib.iam.martin_ruiz(poa.aoi)
        poa["direct"] = poa["direct"] * iam_direct
        iam_sky, iam_ground = pvlib.iam.martin_ruiz_diffuse(poa.tilt)
        poa["sky_diffuse"] = poa["sky_diffuse"] * iam_sky
        poa["ground_diffuse"] = poa["ground_diffuse"] * iam_ground

    # N.B. I have noticed that sky/ground diffuse provides negative values sometimes. This can be due to
    # inconsistencies in the transposition models and should have been screened in pvlib, but it is not.
    # I am simply clipping it here.
    poa["sky_diffuse"] = poa["sky_diffuse"].clip(lower=0)
    poa["ground_diffuse"] = poa["ground_diffuse"].clip(lower=0)
    poa["diffuse"] = poa["sky_diffuse"] + poa["ground_diffuse"]
    poa["direct"] = poa["direct"].clip(lower=0)
    poa["global"] = poa["diffuse"] + poa["direct"]

    sdf = sdf.replace_data(poa)
    sdf.custom_metadata.update({"tracking": tracking_kwargs, "transposition": transposition_kwargs})
    return sdf
yield_ac
yield_ac(
    dc_to_ac_ratio: float = 1.0,
    inverter_effic: float = 0.96,
    yield_dc_kwargs: dict | None = None,
    units: Literal["W", "Wh"] = "W",
    full_output: bool = False,
) -> SolarSeries | SolarDataFrame

Calculate the AC power yield of a PV system.

For simplicity, the pvwatts inverter is assumed (see Notes below). For a more precise simulation of specific inverters, consider the Sandia or ADR inverter models also included in pvlib; they are not supported here.

Parameters:

  • dc_to_ac_ratio
    (float, default: 1.0 ) –

    DC/AC ratio of the PV system, i.e., the ratio between the DC power capacity (the installed power of the PV modules) and the AC power capacity (the nominal power of the inverter). Hence, the size of the DC-to-AC inverter is defined with respect to the DC capacity using this ratio. It is an important design parameter in PV systems, as it determines how much of the DC power can be converted into AC power. A higher DC/AC ratio means that the inverter is "undersized" relative to the DC capacity, which can lead to clipping losses during periods of high irradiance. However, it can also reduce costs, as inverters are typically more expensive than PV modules. The optimal DC/AC ratio depends on various factors, including the cost of the inverter and modules, the expected irradiance levels, and the specific design goals of the PV system.

  • inverter_effic
    (float, default: 0.96 ) –

    nominal efficiency of the inverter (between 0 and 1)

  • yield_dc_kwargs
    (dict | None, default: None ) –

    Additional kwargs to be passed to the yield_dc method. See yield_dc for details.

  • units
    (Literal['W', 'Wh'], default: 'W' ) –

    Units for the output power. Options are "W" for Watts (instantaneous power) and "Wh" for Watt-hours (energy).

  • full_output
    (bool, default: False ) –

    Include the outputs from yield_dc. Default is False.

Returns:

  • SolarSeries or SolarDataFrame

    a new SolarDataFrame with the same index as the original but with the following columns:

      - pac: AC power output of the PV system in Watts
      - the output columns of yield_dc if full_output=True.

Notes

The AC power output is calculated by applying pvlib's pvwatts inverter model to the DC power output obtained from the yield_dc method. Besides the input DC power, the pvwatts inverter requires the DC power at which the inverter reaches its AC power limit and the nominal efficiency of the inverter. The former is calculated based on the installed DC capacity and the specified DC/AC ratio. It represents an upper threshold for the DC power input to the inverter. If the DC power input exceeds this threshold, the inverter will "clip" the output power to its maximum AC power capacity.

Typically, however, what is known at design time is the nominal DC power capacity of the modules, and the DC/AC ratio is used to determine the nominal AC power capacity of the inverter (i.e., the maximum AC power output it can deliver). For example, for a DC power capacity of 1000 Wp and a DC/AC ratio of 1.2, the nominal AC power capacity of the inverter would be 833.3 W. These 833.3 W represent the maximum AC power output the inverter can deliver, that is, greater values are "clipped". To achieve this behavior with the pvwatts inverter, the nominal AC power of the inverter still has to be divided by the inverter efficiency to obtain the DC input threshold that the model expects. This is the approach used here.

Why is dc_to_ac_ratio typically greater than one (i.e., the inverter is undersized relative to the DC capacity)? The main reason is cost. Inverters are typically more expensive than PV modules, and oversizing the inverter to match the DC capacity would increase the overall system cost. By undersizing the inverter, the system can be more cost-effective, even though it may result in some clipping losses (see clipping_losses) during periods of high irradiance. The optimal DC/AC ratio (see optimal_dc_to_ac_ratio) depends on various factors, including the cost of the inverter and modules, the expected irradiance levels, and the specific design goals of the PV system. Values of dc_to_ac_ratio between 1.2 and 1.4 are common in commercial PV systems, as they provide a good balance between cost and performance.
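
The 1000 Wp example can be worked through numerically. The sketch below mirrors the pvwatts efficiency curve as it appears in the clipping_losses source on this page. The function name `pvwatts_ac_power` is hypothetical, and the code illustrates the arithmetic rather than reproducing the actual body of yield_ac.

```python
import numpy as np


def pvwatts_ac_power(pdc, p_dc_peak=1000.0, dc_to_ac_ratio=1.2, inverter_effic=0.96):
    """AC power [W] from DC power [W] with a pvwatts-style inverter (illustrative sketch)."""
    pdc = np.asarray(pdc, dtype=float)
    p_ac_peak = p_dc_peak / dc_to_ac_ratio  # inverter AC limit: 833.3 W here
    p_dc0 = p_ac_peak / inverter_effic  # DC input at which the AC limit is reached: ~868.1 W
    zeta = pdc / p_dc0  # DC load fraction [-]
    eta = np.zeros_like(zeta)
    positive = zeta > 0  # the efficiency curve is undefined at zero input
    eta[positive] = (inverter_effic / 0.9637) * (
        -0.0162 * zeta[positive] - 0.0059 / zeta[positive] + 0.9858)
    return np.minimum(pdc * eta, p_ac_peak)  # clip at the AC limit


pdc = np.array([0.0, 100.0, 500.0, 1000.0])
pac = pvwatts_ac_power(pdc)
print(np.round(pac, 1))
```

With these inputs, the 1000 W sample is clipped to the 833.3 W AC limit, while the 500 W sample passes through at slightly below 500 W because of the conversion efficiency.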

Source code in src/solarpandas/accessors/pvirrad.py
def yield_ac(
    self,
    dc_to_ac_ratio: float = 1.0,
    inverter_effic: float = 0.96,
    yield_dc_kwargs: dict | None = None,
    units: Literal["W", "Wh"] = "W",
    full_output: bool = False
) -> SolarSeries | SolarDataFrame:

    """Calculate the AC power yield of a PV system.

    For simplicity, the `pvwatts` inverter is assumed (see Notes below). For a more precise simulation of
    specific inverters, consider the Sandia or ADR inverter models also included in `pvlib`; they are not
    supported here.

    Parameters
    ----------
    dc_to_ac_ratio: float
        DC/AC ratio of the PV system, i.e., the ratio between the DC power capacity (the installed power
        of the PV modules) and the AC power capacity (the nominal power of the inverter). Hence, the size
        of the DC-to-AC inverter is defined with respect to the DC capacity using this ratio. It is an
        important design parameter in PV systems, as it determines how much of the DC power can be
        converted into AC power. A higher DC/AC ratio means that the inverter is "undersized" relative
        to the DC capacity, which can lead to clipping losses during periods of high irradiance. However,
        it can also reduce costs, as inverters are typically more expensive than PV modules. The optimal
        DC/AC ratio depends on various factors, including the cost of the inverter and modules, the expected
        irradiance levels, and the specific design goals of the PV system.
    inverter_effic : float
        Nominal efficiency of the inverter (between 0 and 1).
    yield_dc_kwargs : dict | None
        Additional kwargs to be passed to the `yield_dc` method. See `yield_dc` for details. Any `units` entry
        is overridden with "W", because the inverter model needs the DC power in Watts.
    units : str
        Units for the output power. Options are "W" for Watts (instantaneous power) and "Wh" for Watt-hours (energy).
    full_output : bool
        Include the outputs from `yield_dc`. Default is False.

    Returns
    -------
    SolarSeries or SolarDataFrame
        a new SolarDataFrame with the same index as the original but with the following columns:
        - `pac`: AC power output of the PV system in Watts
        - the output columns of `yield_dc` if `full_output=True`.

    Notes
    -----
    The AC power output is calculated by applying the `pvlib`'s `pvwatts` inverter model to the DC power output
    obtained from the `yield_dc` method. Besides the input DC power, the `pvwatts` inverter requires the DC power
    at which the inverter reaches its AC power limit and the nominal efficiency of the inverter. The former is
    calculated based on the installed DC capacity and the specified DC/AC ratio. It represents an upper threshold
    for the DC power input to the inverter. If the DC power input exceeds this threshold, the inverter will "clip"
    the output power to its maximum AC power capacity.

    Typically, however, what is known at design time is the nominal DC power capacity of the modules and the DC/AC
    ratio is used to determine the nominal AC power capacity of the inverter (i.e., the maximum AC power output it
    can deliver). For example, for a DC power capacity of 1000 Wp and a DC/AC ratio of 1.2, the nominal AC power
    capacity of the inverter would be 833.3 W. These 833.3 W represent the maximum AC power output the inverter can
    deliver, that is, greater values are "clipped". To achieve this behavior with the `pvwatts` inverter, the nominal
    AC power of the inverter still has to be divided by the inverter efficiency. This is the approach used here.

    Why is `dc_to_ac_ratio` typically greater than one (i.e., the inverter is undersized relative to the DC capacity)?
    The main reason is cost. Inverters are typically more expensive than PV modules, and sizing the inverter to
    match the full DC capacity would increase the overall system cost. An undersized inverter makes the system more
    cost-effective, at the price of some clipping losses (see `clipping_losses`) during periods of high
    irradiance. The optimal DC/AC ratio (see `optimal_dc_ac_ratio`) depends on the relative cost of the inverter and
    the modules, the expected irradiance levels, and the design goals of the PV system. Values of `dc_to_ac_ratio`
    between 1.2 and 1.4 are common in commercial PV systems, as they provide a good balance between cost and
    performance.
    """

    # build a new dict so that the caller's `yield_dc_kwargs` is not mutated; DC power is always requested in Watts
    yield_dc_kwargs = {**(yield_dc_kwargs or {}), "units": "W"}
    pdc = self.yield_dc(**yield_dc_kwargs)  # in Watts
    p_dc_peak = float(pdc.custom_metadata["pvsystem"]["p_dc_peak"])

    if isinstance(pdc, pd.Series):
        pdc = pdc.to_frame()

    p_ac_peak = p_dc_peak / dc_to_ac_ratio  # inverter power limit, in Watts-peak
    pdc0 = p_ac_peak / inverter_effic  # input DC power at which the inverter outputs its maximum AC power, in Watts
    pac = pvlib.inverter.pvwatts(  # in Watts
        pdc=pdc["pdc"],  # DC power input to the inverter, W
        pdc0=pdc0,
        eta_inv_nom=inverter_effic)

    if units == "Wh":
        time_step_seconds = infer_time_step(self._sdf).total_seconds()
        time_step_hours = time_step_seconds / 3600.
        pdc["pdc"] = pdc["pdc"].mul(time_step_hours)
        pac = pac.mul(time_step_hours)

    pac = pdc.replace_data(pac).iloc[:, 0].rename("pac")
    pvsystem = pac.custom_metadata["pvsystem"]
    pvsystem = pvsystem | {
        "inverter": "pvwatts",
        "dc_to_ac_ratio": dc_to_ac_ratio,
        "inverter_effic": inverter_effic,
        "pac_units": units}
    pac.custom_metadata.update({"pvsystem": pvsystem})
    if full_output:
        return pac.to_frame().join(pdc)
    return pac
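
The clipping behaviour described in the Notes can be illustrated without pvlib. The sketch below is a scalar re-implementation of the PVWatts inverter equation as documented for `pvlib.inverter.pvwatts` (reference efficiency 0.9637). The function name `pvwatts_ac_sketch` is hypothetical, and the code is meant for illustration only, not as a replacement for the pvlib call used above.

```python
def pvwatts_ac_sketch(pdc: float, pdc0: float, eta_inv_nom: float = 0.96,
                      eta_inv_ref: float = 0.9637) -> float:
    """Scalar PVWatts inverter model: AC power (W) for a DC input `pdc` (W)."""
    if pdc <= 0.0:
        return 0.0
    pac0 = eta_inv_nom * pdc0  # maximum AC output of the inverter
    zeta = pdc / pdc0          # DC load fraction
    eta = eta_inv_nom / eta_inv_ref * (-0.0162 * zeta - 0.0059 / zeta + 0.9858)
    return max(0.0, min(eta * pdc, pac0))  # clip at the AC limit, never negative


# Same sizing rule as in yield_ac: 1000 Wp, DC/AC ratio 1.2, efficiency 0.96
p_dc_peak, dc_to_ac_ratio, inverter_effic = 1000.0, 1.2, 0.96
pdc0 = p_dc_peak / dc_to_ac_ratio / inverter_effic
for pdc in (100.0, 500.0, pdc0, 1000.0):
    print(f"pdc={pdc:7.1f} W -> pac={pvwatts_ac_sketch(pdc, pdc0, inverter_effic):7.1f} W")
```

Any DC input at or above `pdc0` yields the same AC output, `p_dc_peak / dc_to_ac_ratio`, which is the clipping limit.
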
yield_dc
yield_dc(
    p_dc_peak: float = 1.0,
    dc_model: Literal["huld", "pvwatts"] = "huld",
    dc_model_kwargs: dict | None = None,
    temperature: Series | str | float | None = None,
    wind_speed: Series | str | float | None = None,
    poa_irradiance_kwargs: dict | None = None,
    units: Literal["W", "Wh"] = "W",
    full_output: bool = False,
) -> SolarSeries | SolarDataFrame

Calculate the DC power yield of a PV system.

Parameters:

  • p_dc_peak
    (float, default: 1.0 ) –

    Installed peak capacity in Watts-peak. Default is 1 Wp.

  • dc_model
    (str, default: 'huld' ) –

    Model to convert plane-of-array (POA) irradiance and cell temperature into DC power. Options are:

    - "pvwatts": the PVWatts model, a simple empirical model based on the performance of a large number of PV systems. It is widely used for its simplicity and reasonable accuracy in many applications. It estimates the DC power output from the plane-of-array irradiance, the cell temperature, and the installed DC capacity, using a linear model with a temperature coefficient.
    - "huld": the Huld model, a more detailed empirical model that accounts for the non-linear effects of irradiance and temperature on the DC power output. It is based on the performance of a large number of PV systems and provides a more accurate estimate of the DC power output, especially under low irradiance and high temperature conditions. It requires more parameters than the PVWatts model, such as the cell type and the version of the model coefficients (e.g., "pvgis6" for the latest version). It is particularly useful for applications that require a more accurate estimate of the DC power output, such as performance ratio calculations or detailed energy yield assessments.

    Faiman's cell temperature model is used in both cases. See pvlib.

  • dc_model_kwargs
    (dict, default: None ) –

    Additional kwargs to be passed to the "dc_model". See pvlib.pvsystem.pvwatts_dc and pvlib.pvarray.huld for details. For dc_model="huld", default values cell_type="cSi" and k_version="pvgis6" are used.

  • temperature
    (pd.Series, str, float or None, default: None ) –

    Air temperature in deg C. If it is str, it should be a column name in the SolarDataFrame. If it is None, a default temperature value of 25 deg C is used.

  • wind_speed
    (pd.Series, str, float or None, default: None ) –

    Wind speed in m s-1. If it is str, it should be a column name in the SolarDataFrame. If it is None, a default wind speed value of 1 m s-1 is used.

  • poa_irradiance_kwargs
    (dict | None, default: None ) –

    Additional kwargs to be passed to the poa_irradiance method. See poa_irradiance for details.

  • units
    (str, default: 'W' ) –

    Units for the output power. Options are "W" for Watts (instantaneous power) and "Wh" for Watt-hours (energy). If "Wh" is selected, the output power is multiplied by the time step inferred from the index of the SolarDataFrame, so it represents the energy produced during each time step. Default is "W".

  • full_output
    (bool, default: False ) –

    Include the outputs from poa_irradiance. Default is False.

Returns:

  • SolarSeries or SolarDataFrame

    A new object with the same index as the original: a SolarSeries named pdc or, if full_output=True, a SolarDataFrame with the following columns:

    - pdc: DC power output of the PV system, in Watts (or Watt-hours if units="Wh")
    - the output columns of poa_irradiance
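
As an illustration of the units="Wh" option, the sketch below converts a short power series into energy per time step by multiplying by the step length in hours. The helper `power_to_energy` is hypothetical. It takes the step from the first two timestamps, which is a simplification of whatever `infer_time_step` does in solarpandas.

```python
from datetime import datetime, timedelta


def power_to_energy(timestamps, power_w):
    """Convert power per time step (W) into energy per time step (Wh).

    Assumes a regular time index; the step is taken from the first two timestamps.
    """
    step_hours = (timestamps[1] - timestamps[0]).total_seconds() / 3600.0
    return [p * step_hours for p in power_w]


# Four 15-minute samples around noon (illustrative values)
start = datetime(2024, 6, 21, 12, 0)
timestamps = [start + k * timedelta(minutes=15) for k in range(4)]
power_w = [800.0, 820.0, 790.0, 810.0]

energy_wh = power_to_energy(timestamps, power_w)
print(energy_wh)       # energy produced in each 15-minute step, Wh
print(sum(energy_wh))  # energy produced over the whole hour, Wh
```

Summing the "Wh" output over a period gives the energy yield for that period, which is not true of the "W" output unless the time step is one hour.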

Source code in src/solarpandas/accessors/pvirrad.py
def yield_dc(
    self,
    # options for PV modelling
    p_dc_peak: float = 1.,  # power capacity in Wp
    dc_model: Literal["huld", "pvwatts"] = "huld",
    dc_model_kwargs: dict | None = None,
    # additional atmospheric data
    temperature: pd.Series | str | float | None = None,
    wind_speed: pd.Series | str | float | None = None,  # m s-1
    # plane-of-array (POA) irradiance calculation options
    poa_irradiance_kwargs: dict | None = None,
    units: Literal["W", "Wh"] = "W",
    full_output: bool = False
) -> SolarSeries | SolarDataFrame:

    """Calculate the DC power yield of a PV system.

    Parameters
    ----------
    p_dc_peak : float
        Installed peak capacity in Watts-peak. Default is 1 Wp.
    dc_model : str
        Model to convert plane-of-array (POA) irradiance and cell temperature into DC power. Options are:
        - "pvwatts": the PVWatts model, which is a simple empirical model based on the performance of a
            large number of PV systems. It is widely used for its simplicity and reasonable accuracy for many
            applications. It estimates the DC power output based on the plane-of-array irradiance, the cell
            temperature, and the installed DC capacity, using a simple linear model with a temperature
            coefficient.
        - "huld": the Huld model, which is a more detailed empirical model that accounts for the non-linear
            effects of irradiance and temperature on the DC power output. It is based on the performance of a
            large number of PV systems and provides a more accurate estimation of the DC power output, especially
            under low irradiance and high temperature conditions. It requires more parameters than the PVWatts
            model, such as the cell type and the version of the model to use (e.g., "pvgis6" for the latest
            version). The Huld model is particularly useful for applications that require a more accurate
            estimation of the DC power output, such as performance ratio calculations or detailed energy yield
            assessments.
        The Faiman's cell temperature model is used. See `pvlib`.
    dc_model_kwargs : dict
        Additional kwargs to be passed to the "dc_model". See `pvlib.pvsystem.pvwatts_dc` and `pvlib.pvarray.huld`
        for details. For `dc_model="huld"`, default values `cell_type="cSi"` and `k_version="pvgis6"` are used.
    temperature : pd.Series, str, float or None
        Air temperature in deg C. If it is str, it should be a column name in the SolarDataFrame. If it is None,
        a default temperature value of 25 deg C is used.
    wind_speed : pd.Series, str, float or None
        Wind speed in m s-1. If it is str, it should be a column name in the SolarDataFrame. If it is None, a
        default wind speed value of 1 m s-1 is used.
    poa_irradiance_kwargs : dict | None
        Additional kwargs to be passed to the `poa_irradiance` method. See `poa_irradiance` for details.
    units : str
        Units for the output power. Options are "W" for Watts (instantaneous power) and "Wh" for Watt-hours (energy).
        If "Wh" is selected, the output power is multiplied by the time step inferred from the index of the
        SolarDataFrame, so it represents the energy produced during each time step. Default is "W".
    full_output : bool
        Include the outputs from `poa_irradiance`. Default is False.

    Returns
    -------
    SolarSeries or SolarDataFrame
        a new SolarDataFrame with the same index as the original but with the following columns:
        - `pdc`: DC power output of the PV system in Watts
        - the output columns of `poa_irradiance` if `full_output=True`.
    """

    DEFAULT_TEMPERATURE = 25.  # deg C
    DEFAULT_WIND_SPEED = 1.  # m s-1

    if temperature is None:
        temperature = DEFAULT_TEMPERATURE

    if isinstance(temperature, str):
        if temperature in self._sdf.columns:
            temperature = self._sdf[temperature]
        else:
            # log before overwriting `temperature`, so that the message reports the missing column name
            logger.warning(f"temperature column '{temperature}' not found in data. "
                           f"Set to the default value {DEFAULT_TEMPERATURE} deg C.")
            temperature = DEFAULT_TEMPERATURE

    if wind_speed is None:
        wind_speed = DEFAULT_WIND_SPEED

    if isinstance(wind_speed, str):
        if wind_speed in self._sdf.columns:
            wind_speed = self._sdf[wind_speed]
        else:
            # log before overwriting `wind_speed`, so that the message reports the missing column name
            logger.warning(f"wind_speed column '{wind_speed}' not found in data. "
                           f"Set to the default value {DEFAULT_WIND_SPEED} m s-1.")
            wind_speed = DEFAULT_WIND_SPEED

    poa_irradiance_kwargs = poa_irradiance_kwargs or {}
    poa = self.poa_irradiance(**poa_irradiance_kwargs)
    poa = poa.assign(temperature=temperature, wind_speed=wind_speed)

    # to detect potential RuntimeWarning: invalid value encountered in log
    if (illegal := poa["global"].le(-1e-6) & self._sdf.solpos.zenith.lt(90.)).any():
        logger.warning(f"Illegal data found.\ndata:\n{self._sdf.loc[illegal]}\npoa:\n{poa.loc[illegal]}")

    temp_cell = pvlib.temperature.faiman(
        poa_global=poa["global"],  # total incident irradiance, W m-2
        temp_air=poa["temperature"],  # ambient dry bulb temperature, degC
        wind_speed=poa["wind_speed"])  # wind speed, m s-1

    if dc_model == "pvwatts":
        pdc = pvlib.pvsystem.pvwatts_dc(
            effective_irradiance=poa["global"],
            temp_cell=temp_cell,
            pdc0=p_dc_peak,  # power of the modules at STC (i.e., 1000 W m-2 and 25 degC), W
            **(dc_model_kwargs or {}),
        ).clip(lower=0.)
    elif dc_model == "huld":
        # build a new dict so that the caller's `dc_model_kwargs` is not mutated
        dc_model_kwargs = {"cell_type": "cSi", "k_version": "pvgis6", **(dc_model_kwargs or {})}
        pdc = pvlib.pvarray.huld(
            effective_irradiance=poa["global"],  # irradiance to be converted, W m-2
            temp_mod=temp_cell,  # module back-surface temperature, degC
            pdc0=p_dc_peak,  # power of the modules at STC (i.e., 1000 W m-2 and 25 degC), W
            **dc_model_kwargs
        ).clip(lower=0)
    else:
        raise ValueError(f"Unknown dc_model '{dc_model}'. Options are 'huld' and 'pvwatts'.")

    if units == "Wh":
        time_step_seconds = infer_time_step(self._sdf).total_seconds()
        time_step_hours = time_step_seconds / 3600.
        pdc = pdc.mul(time_step_hours).rename("pdc")

    pdc = poa.replace_data(pdc).iloc[:, 0].rename("pdc")
    pvsystem = {
        "dc_model": dc_model,
        "dc_model_kwargs": dc_model_kwargs,
        "p_dc_peak": p_dc_peak,
        "pdc_units": units,
        "poa_irradiance_kwargs": poa_irradiance_kwargs}
    pdc.custom_metadata.update({"pvsystem": pvsystem})
    if full_output:
        return pdc.to_frame().join(poa)
    return pdc
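
The "pvwatts" branch above can be followed by hand with two short formulas. The sketch below is a scalar version of the Faiman cell temperature model followed by the PVWatts DC model. The coefficients u0 = 25.0 and u1 = 6.84 are the defaults documented for `pvlib.temperature.faiman`, while gamma_pdc = -0.004 per deg C is an assumed, typical crystalline-silicon value. In the pvlib function gamma_pdc has no default, so it would presumably be supplied through dc_model_kwargs. The function names are hypothetical.

```python
def faiman_cell_temperature(poa_global: float, temp_air: float, wind_speed: float,
                            u0: float = 25.0, u1: float = 6.84) -> float:
    """Faiman model: cell temperature (deg C) from irradiance, air temperature and wind."""
    return temp_air + poa_global / (u0 + u1 * wind_speed)


def pvwatts_dc_sketch(effective_irradiance: float, temp_cell: float, pdc0: float,
                      gamma_pdc: float, temp_ref: float = 25.0) -> float:
    """PVWatts DC model: linear in irradiance, with a linear temperature correction."""
    pdc = effective_irradiance / 1000.0 * pdc0 * (1.0 + gamma_pdc * (temp_cell - temp_ref))
    return max(pdc, 0.0)  # same lower clipping as in yield_dc


# 1000 W m-2 on a 1000 Wp array, with the yield_dc defaults (25 deg C air, 1 m s-1 wind)
temp_cell = faiman_cell_temperature(poa_global=1000.0, temp_air=25.0, wind_speed=1.0)
pdc = pvwatts_dc_sketch(effective_irradiance=1000.0, temp_cell=temp_cell,
                        pdc0=1000.0, gamma_pdc=-0.004)
print(f"cell temperature: {temp_cell:.1f} deg C, DC power: {pdc:.1f} W")
```

Under these assumptions the array delivers less than its nameplate power at 1000 W m-2, because the cells run well above the 25 deg C reference temperature.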

Quality Control

helpers

Shared helpers and data structures for quality-control modules.

Classes:

  • QCTest

    Callable container that binds a QC test function and optional plotter.

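Functions:

  • construct_flag_series

    Wrap a QC result array as a SolarSeries with metadata preserved.

  • construct_qcflag_array

    Build QC flag values from boolean failed and passed masks.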

QCTest dataclass

QCTest(
    name: str,
    _test_func: Callable[[SolarDataFrame], ndarray[int8]],
    _plot_func: Callable[
        [SolarDataFrame, ndarray[int8]], Axes
    ]
    | None = None,
)

Callable container that binds a QC test function and optional plotter.

Methods:

  • __call__

    Run the test and return QC flags as a SolarSeries.

  • plot

    Plot test diagnostics if a plotting function is available.

__call__
__call__(sdf: SolarDataFrame) -> SolarSeries

Run the test and return QC flags as a SolarSeries.

Source code in src/solarpandas/qcontrol/helpers.py
def __call__(self, sdf: SolarDataFrame) -> SolarSeries:
    """Run the test and return QC flags as a ``SolarSeries``."""
    return construct_flag_series(sdf, self.name, self._test_func(sdf))
plot
plot(sdf: SolarDataFrame) -> Axes | None

Plot test diagnostics if a plotting function is available.

Source code in src/solarpandas/qcontrol/helpers.py
def plot(self, sdf: SolarDataFrame) -> plt.Axes | None:
    """Plot test diagnostics if a plotting function is available."""
    if self._plot_func is None:
        logger.warning(f"No plot function defined for test '{self.name}'")
        return None
    test_result = self._test_func(sdf)
    return self._plot_func(sdf, test_result)
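
The callable-container pattern used by QCTest can be sketched with a plain dataclass. `ToyQCTest` below is a simplified stand-in and not the library class: it works on lists instead of a SolarDataFrame and returns a `(name, flags)` tuple instead of a SolarSeries. The test function `non_negative` is hypothetical.

```python
from dataclasses import dataclass
from typing import Callable, Optional, Sequence


@dataclass
class ToyQCTest:
    """Simplified stand-in for QCTest: a named test function plus an optional plotter."""
    name: str
    _test_func: Callable[[Sequence[float]], list]
    _plot_func: Optional[Callable] = None

    def __call__(self, data):
        # The real class wraps the flags in a SolarSeries; a tuple is used here instead
        return self.name, self._test_func(data)

    def plot(self, data):
        if self._plot_func is None:
            print(f"No plot function defined for test '{self.name}'")
            return None
        return self._plot_func(data, self._test_func(data))


def non_negative(values):
    """Hypothetical test: 1 where the value is >= 0, -1 otherwise."""
    return [1 if v >= 0 else -1 for v in values]


test = ToyQCTest("non_negative", non_negative)
name, flags = test([5.0, -3.0, 0.0])
print(name, flags)
```

Binding the name, the test function and the plotter in one object lets a registry of tests be iterated and applied uniformly to the same data.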

construct_flag_series

construct_flag_series(
    sdf: SolarDataFrame | SolarSeries,
    name: str,
    test_result: ndarray,
) -> SolarSeries

Wrap a QC result array as a SolarSeries with metadata preserved.

Source code in src/solarpandas/qcontrol/helpers.py
def construct_flag_series(
    sdf: SolarDataFrame | SolarSeries, name: str, test_result: np.ndarray
) -> SolarSeries:
    """Wrap a QC result array as a ``SolarSeries`` with metadata preserved."""
    return SolarSeries(
        data=test_result,
        index=sdf.index,
        latitude=sdf.latitude,
        longitude=sdf.longitude,
        elevation=sdf.elevation,  # keep the site elevation too, so that all site metadata is preserved
        custom_metadata=copy.deepcopy(sdf.custom_metadata),
        name=name,
        dtype="qcflag",
    )

construct_qcflag_array

construct_qcflag_array(
    failed: Series, passed: Series
) -> ndarray[int8]

Build QC flag values from boolean failed and passed masks.

Parameters:

  • failed
    (Series) –

    Boolean series; True where the test fails.

  • passed
    (Series) –

    Boolean series; True where the test passes.

Returns:

  • ndarray

    Int8 array with values -1 (failed), 0 (not verifiable) and 1 (passed).

Source code in src/solarpandas/qcontrol/helpers.py
def construct_qcflag_array(failed: pd.Series, passed: pd.Series) -> np.ndarray[np.int8]:
    """Build QC flag values from boolean failed and passed masks.

    Parameters
    ----------
    failed : pandas.Series
        Boolean series; ``True`` where the test fails.
    passed : pandas.Series
        Boolean series; ``True`` where the test passes.

    Returns
    -------
    numpy.ndarray
        Int8 array with values ``-1`` (failed), ``0`` (not verifiable) and
        ``1`` (passed).
    """
    n = len(failed)
    flag_array = np.full(n, QCFlagEnum.NOT_VERIFIABLE.value, dtype=np.int8)
    flag_array[failed] = QCFlagEnum.FAILED.value
    flag_array[passed] = QCFlagEnum.PASSED.value
    return flag_array
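
A small worked example shows how the two masks translate into flags. The sketch below repeats the logic with plain NumPy boolean arrays instead of pandas Series and with literal flag values instead of `QCFlagEnum`. The limits and data are illustrative.

```python
import numpy as np

FAILED, NOT_VERIFIABLE, PASSED = -1, 0, 1  # flag values as documented above


def qcflag_array(failed: np.ndarray, passed: np.ndarray) -> np.ndarray:
    """Build an int8 flag array from boolean `failed` and `passed` masks."""
    flags = np.full(len(failed), NOT_VERIFIABLE, dtype=np.int8)
    flags[failed] = FAILED
    flags[passed] = PASSED
    return flags


values = np.array([-10.0, 500.0, np.nan, 1500.0])
lower, upper = -4.0, 1000.0  # illustrative limits

valid = ~np.isnan(values)
failed = valid & ((values < lower) | (values > upper))
passed = valid & (values >= lower) & (values <= upper)

flags = qcflag_array(failed, passed)
print(flags)
```

Samples that are in neither mask, such as the missing value here, keep the "not verifiable" flag. Because `passed` is applied last, a sample marked in both masks would end up as passed, so the masks are expected to be mutually exclusive.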

qcrad

Registry of qcrad quality-control tests as reusable QCTest objects.

ppl

Physically possible limits (PPL) quality-control checks.

This module implements qcrad physically possible limit tests and plotting helpers for global, diffuse and direct irradiance components.

Functions:

  • plot_test

    Render a standard PPL diagnostic density plot for one irradiance column.

  • plot_test_dif

    Plot DIF PPL limits and flagged points against solar zenith angle.

  • plot_test_dni

    Plot DNI PPL limits and flagged points against solar zenith angle.

  • plot_test_ghi

    Plot GHI PPL limits and flagged points against solar zenith angle.

  • test_dif

    Evaluate physically-possible limits test for DIF.

  • test_dni

    Evaluate physically-possible limits test for DNI.

  • test_ghi

    Evaluate physically-possible limits test for GHI.

plot_test

plot_test(
    column: str, sdf: SolarDataFrame, **kwargs
) -> Axes

Render a standard PPL diagnostic density plot for one irradiance column.

Source code in src/solarpandas/qcontrol/ppl.py
def plot_test(column: str, sdf: SolarDataFrame, **kwargs) -> plt.Axes:
    """Render a standard PPL diagnostic density plot for one irradiance column."""

    plt.style.use("solarpandas-qc")
    if "rc" in kwargs:
        mpl.rcParams.update(kwargs["rc"])

    ax = kwargs.pop("ax", None)
    if ax is None:
        _, ax = plt.subplots(1, 1, figsize=(12, 8), layout="constrained")
    plt.sca(ax)  # make `ax` current, so that the `plt.*` calls below draw on it
    ax_box = ax.get_window_extent()

    title = f"{column.upper()} PPL Test Results"
    if "location" in sdf.custom_metadata:
        title += f" at {sdf.custom_metadata['location']}"
    if "station" in sdf.custom_metadata:
        title += f" ({sdf.custom_metadata['station']}"
        if "network" in sdf.custom_metadata:
            title += f", {sdf.custom_metadata['network']}"
        title += ")"
    else:
        if "network" in sdf.custom_metadata:
            title += f" ({sdf.custom_metadata['network']})"
    title += f" (lat={sdf.latitude:.4f}, lon={sdf.longitude:.4f}, alt={sdf.elevation:.0f} m)"

    cvs = ds.Canvas(plot_width=int(ax_box.width), plot_height=int(ax_box.height),
                    x_range=(sdf.solpos.zenith.min(), sdf.solpos.zenith.max()),
                    y_range=(-10, sdf[column].max()))

    plt.scatter("zenith", "max_value", data=sdf, label="Max Limit", color=MAX_VALUE_COLOR, s=2)
    plt.scatter("zenith", "min_value", data=sdf, label="Min Limit", color=MIN_VALUE_COLOR, s=2)
    agg = cvs.points(sdf, "zenith", column, ds.count()).pipe(lambda xa: xa.where(xa > 0))
    mesh = ax.pcolormesh(agg.zenith, agg[column], agg.values, cmap=DENSITY_CMAP, norm=plt.cm.colors.LogNorm())
    plt.colorbar(mesh, ax=ax, pad=0.02, label=f"{column.upper()} Counts Density (log scale)")
    plt.scatter("zenith", column, data=sdf.loc[sdf.test.flag.fails], label="Failed Points",
                color=FAILED_COLOR, s=5, zorder=1003)
    plt.xlabel("Solar Zenith Angle (deg)")
    plt.ylabel(f"{column.upper()} (W m$^{{-2}}$)")
    plt.title(title)
    plt.xlim(right=95)
    plt.legend()
    plt.grid()

    return plt.gca()

plot_test_dif

plot_test_dif(
    sdf: SolarDataFrame, test: SolarSeries, **kwargs
) -> Axes

Plot DIF PPL limits and flagged points against solar zenith angle.

Source code in src/solarpandas/qcontrol/ppl.py
def plot_test_dif(sdf: SolarDataFrame, test: SolarSeries, **kwargs) -> plt.Axes:
    """Plot DIF PPL limits and flagged points against solar zenith angle."""

    sdf_ = sdf.assign(
        zenith=sdf.solpos.zenith,
        min_value=-4.0,
        max_value=50 + 0.95 * sdf.solpos.etn * (sdf.solpos.cosz**1.2),
        test=test)

    return plot_test(column="dif", sdf=sdf_, **kwargs)

plot_test_dni

plot_test_dni(
    sdf: SolarDataFrame, test: SolarSeries, **kwargs
) -> Axes

Plot DNI PPL limits and flagged points against solar zenith angle.

Source code in src/solarpandas/qcontrol/ppl.py
def plot_test_dni(sdf: SolarDataFrame, test: SolarSeries, **kwargs) -> plt.Axes:
    """Plot DNI PPL limits and flagged points against solar zenith angle."""

    sdf_ = sdf.assign(
        zenith=sdf.solpos.zenith,
        min_value=-4.0,
        max_value=sdf.solpos.etn,
        test=test)

    kwargs.setdefault("rc", {"legend.loc": "lower left"})
    return plot_test(column="dni", sdf=sdf_, **kwargs)

plot_test_ghi

plot_test_ghi(
    sdf: SolarDataFrame, test: SolarSeries, **kwargs
) -> Axes

Plot GHI PPL limits and flagged points against solar zenith angle.

Source code in src/solarpandas/qcontrol/ppl.py
def plot_test_ghi(sdf: SolarDataFrame, test: SolarSeries, **kwargs) -> plt.Axes:
    """Plot GHI PPL limits and flagged points against solar zenith angle."""

    sdf_ = sdf.assign(
        zenith=sdf.solpos.zenith,
        min_value=-4.0,
        max_value=100 + 1.50 * sdf.solpos.etn * (sdf.solpos.cosz**1.2),
        test=test)

    return plot_test(column="ghi", sdf=sdf_, **kwargs)

test_dif

test_dif(sdf: SolarDataFrame) -> ndarray[int8]

Evaluate physically-possible limits test for DIF.

Source code in src/solarpandas/qcontrol/ppl.py
def test_dif(sdf: SolarDataFrame) -> np.ndarray[np.int8]:
    """Evaluate physically-possible limits test for DIF."""

    # check that I have what I need
    if "dif" not in sdf.columns:
        logger.warning("`dif` column not found in dataframe. Test not possible.")
        return np.full(len(sdf), QCFlagEnum.NOT_VERIFIABLE.value, dtype=np.int8)

    # compute whatever I need to apply the test
    dif = sdf["dif"]
    min_value = -4.0  # W m-2
    max_value = 50 + 0.95 * sdf.solpos.etn * (sdf.solpos.cosz**1.2)

    # compute where the test fails and where it passes
    notna = dif.notna()
    failed = notna & (dif.lt(min_value) | dif.gt(max_value))
    passed = notna & (dif.ge(min_value) & dif.le(max_value))

    return construct_qcflag_array(failed, passed)

test_dni

test_dni(sdf: SolarDataFrame) -> ndarray[int8]

Evaluate physically-possible limits test for DNI.

Source code in src/solarpandas/qcontrol/ppl.py
def test_dni(sdf: SolarDataFrame) -> np.ndarray[np.int8]:
    """Evaluate physically-possible limits test for DNI."""

    # check that I have what I need
    if "dni" not in sdf.columns:
        logger.warning("`dni` column not found in dataframe. Test not possible.")
        return np.full(len(sdf), QCFlagEnum.NOT_VERIFIABLE.value, dtype=np.int8)

    # compute whatever I need to apply the test
    dni = sdf["dni"]
    min_value = -4.0  # W m-2
    max_value = sdf.solpos.etn

    # compute where the test fails and where it passes
    notna = dni.notna()
    failed = notna & (dni.lt(min_value) | dni.gt(max_value))
    passed = notna & (dni.ge(min_value) & dni.le(max_value))

    return construct_qcflag_array(failed, passed)

test_ghi

test_ghi(sdf: SolarDataFrame) -> ndarray[int8]

Evaluate physically-possible limits test for GHI.

Source code in src/solarpandas/qcontrol/ppl.py
def test_ghi(sdf: SolarDataFrame) -> np.ndarray[np.int8]:
    """Evaluate physically-possible limits test for GHI."""

    # check that I have what I need
    if "ghi" not in sdf.columns:
        logger.warning("`ghi` column not found in dataframe. Test not possible.")
        return np.full(len(sdf), QCFlagEnum.NOT_VERIFIABLE.value, dtype=np.int8)

    # compute whatever I need to apply the test
    ghi = sdf["ghi"]
    etn = sdf.solpos.etn
    cosz = sdf.solpos.cosz
    min_value = -4.0  # W m-2, to allow for measurement noise when the sun is just below the horizon
    max_value = 100 + 1.50 * etn * (cosz**1.2)  # W m-2, empirical upper limit

    # compute where the test fails and where it passes
    notna = ghi.notna()
    failed = notna & (ghi.lt(min_value) | ghi.gt(max_value))
    passed = notna & (ghi.ge(min_value) & ghi.le(max_value))

    return construct_qcflag_array(failed, passed)
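
The following sketch applies the GHI physically-possible-limits check to a handful of samples with NumPy arrays. It assumes that `etn` is the extraterrestrial normal irradiance and that `cosz` is negative when the sun is below the horizon. Neither assumption is confirmed by this page. The function name `ppl_ghi_flags` is hypothetical.

```python
import numpy as np


def ppl_ghi_flags(ghi: np.ndarray, etn: np.ndarray, cosz: np.ndarray) -> np.ndarray:
    """Return -1 (failed), 0 (not verifiable) or 1 (passed) for each GHI sample."""
    min_value = -4.0  # W m-2
    with np.errstate(invalid="ignore"):  # a negative cosz raised to 1.2 gives NaN
        max_value = 100.0 + 1.50 * etn * cosz ** 1.2  # W m-2

    valid = ~np.isnan(ghi)
    flags = np.zeros(len(ghi), dtype=np.int8)
    flags[valid & ((ghi < min_value) | (ghi > max_value))] = -1
    flags[valid & (ghi >= min_value) & (ghi <= max_value)] = 1
    return flags


ghi = np.array([-10.0, 500.0, 1200.0, np.nan, 0.0])
etn = np.full(5, 1361.0)
cosz = np.array([0.5, 0.5, 0.5, 0.5, -0.1])  # last sample: sun below the horizon

flags = ppl_ghi_flags(ghi, etn, cosz)
print(flags)
```

In this sketch the last sample is flagged as not verifiable even though its value is within the lower limit. The upper limit is NaN for a negative `cosz`, so both the `failed` and the `passed` comparisons evaluate to False.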

erl

Extremely rare limits (ERL) quality-control checks.

This module implements qcrad extremely rare limit tests and plotting helpers for global, diffuse and direct irradiance components.

Functions:

  • plot_test

    Render a standard ERL diagnostic density plot for one irradiance column.

  • plot_test_dif

    Plot DIF ERL limits and flagged points against solar zenith angle.

  • plot_test_dni

    Plot DNI ERL limits and flagged points against solar zenith angle.

  • plot_test_ghi

    Plot GHI ERL limits and flagged points against solar zenith angle.

  • test_dif

    Evaluate extremely-rare-limits test for DIF.

  • test_dni

    Evaluate extremely-rare-limits test for DNI.

  • test_ghi

    Evaluate extremely-rare-limits test for GHI.

plot_test

plot_test(
    column: str, sdf: SolarDataFrame, **kwargs
) -> Axes

Render a standard ERL diagnostic density plot for one irradiance column.

Source code in src/solarpandas/qcontrol/erl.py
def plot_test(column: str, sdf: SolarDataFrame, **kwargs) -> plt.Axes:
    """Render a standard ERL diagnostic density plot for one irradiance column."""

    plt.style.use("solarpandas-qc")
    if "rc" in kwargs:
        mpl.rcParams.update(kwargs["rc"])

    ax = kwargs.pop("ax", None)
    if ax is None:
        _, ax = plt.subplots(1, 1, figsize=(12, 8), layout="constrained")
    plt.sca(ax)  # make `ax` current, so that the `plt.*` calls below draw on it
    ax_box = ax.get_window_extent()

    title = f"{column.upper()} ERL Test Results"
    if "location" in sdf.custom_metadata:
        title += f" at {sdf.custom_metadata['location']}"
    if "station" in sdf.custom_metadata:
        title += f" ({sdf.custom_metadata['station']}"
        if "network" in sdf.custom_metadata:
            title += f", {sdf.custom_metadata['network']}"
        title += ")"
    else:
        if "network" in sdf.custom_metadata:
            title += f" ({sdf.custom_metadata['network']})"
    title += f" (lat={sdf.latitude:.4f}, lon={sdf.longitude:.4f}, alt={sdf.elevation:.0f} m)"

    cvs = ds.Canvas(plot_width=int(ax_box.width), plot_height=int(ax_box.height),
                    x_range=(sdf.solpos.zenith.min(), sdf.solpos.zenith.max()),
                    y_range=(-10, sdf[column].max()))

    plt.scatter("zenith", "max_value", data=sdf, label="Max Limit", color=MAX_VALUE_COLOR, s=2)
    plt.scatter("zenith", "min_value", data=sdf, label="Min Limit", color=MIN_VALUE_COLOR, s=2)
    agg = cvs.points(sdf, "zenith", column, ds.count()).pipe(lambda xa: xa.where(xa > 0))
    mesh = ax.pcolormesh(agg.zenith, agg[column], agg.values, cmap=DENSITY_CMAP, norm=plt.cm.colors.LogNorm())
    plt.colorbar(mesh, ax=ax, pad=0.02, label=f"{column.upper()} Counts Density (log scale)")
    plt.scatter("zenith", column, data=sdf.loc[sdf.test.flag.fails], label="Failed Points",
                color=FAILED_COLOR, s=5, zorder=1003)
    plt.xlabel("Solar Zenith Angle (deg)")
    plt.ylabel(f"{column.upper()} (W m$^{{-2}}$)")
    plt.title(title)
    plt.xlim(right=95)
    plt.legend()
    plt.grid()

    return plt.gca()

plot_test_dif

plot_test_dif(
    sdf: SolarDataFrame, test: SolarSeries, **kwargs
) -> Axes

Plot DIF ERL limits and flagged points against solar zenith angle.

Source code in src/solarpandas/qcontrol/erl.py
def plot_test_dif(sdf: SolarDataFrame, test: SolarSeries, **kwargs) -> plt.Axes:
    """Plot DIF ERL limits and flagged points against solar zenith angle."""

    sdf_ = sdf.assign(
        zenith=sdf.solpos.zenith,
        min_value=-2.0,
        max_value=30 + 0.75 * sdf.solpos.etn * (sdf.solpos.cosz**1.2),
        test=test)

    return plot_test(column="dif", sdf=sdf_, **kwargs)

plot_test_dni

plot_test_dni(
    sdf: SolarDataFrame, test: SolarSeries, **kwargs
) -> Axes

Plot DNI ERL limits and flagged points against solar zenith angle.

Source code in src/solarpandas/qcontrol/erl.py
def plot_test_dni(sdf: SolarDataFrame, test: SolarSeries, **kwargs) -> plt.Axes:
    """Plot DNI ERL limits and flagged points against solar zenith angle."""

    sdf_ = sdf.assign(
        zenith=sdf.solpos.zenith,
        min_value=-2.0,
        max_value=10 + 0.95 * sdf.solpos.etn * (sdf.solpos.cosz**0.2),
        test=test)

    kwargs.setdefault("rc", {"legend.loc": "lower left"})
    return plot_test(column="dni", sdf=sdf_, **kwargs)

plot_test_ghi

plot_test_ghi(
    sdf: SolarDataFrame, test: SolarSeries, **kwargs
) -> Axes

Plot GHI ERL limits and flagged points against solar zenith angle.

Source code in src/solarpandas/qcontrol/erl.py
def plot_test_ghi(sdf: SolarDataFrame, test: SolarSeries, **kwargs) -> plt.Axes:
    """Plot GHI ERL limits and flagged points against solar zenith angle."""

    sdf_ = sdf.assign(
        zenith=sdf.solpos.zenith,
        min_value=-2.0,
        max_value=50 + 1.20 * sdf.solpos.etn * (sdf.solpos.cosz**1.2),
        test=test)

    return plot_test(column="ghi", sdf=sdf_, **kwargs)

test_dif

test_dif(sdf: SolarDataFrame) -> ndarray[int8]

Evaluate extremely-rare-limits test for DIF.

Source code in src/solarpandas/qcontrol/erl.py
def test_dif(sdf: SolarDataFrame) -> np.ndarray[np.int8]:
    """Evaluate extremely-rare-limits test for DIF."""

    # check that I have what I need
    if "dif" not in sdf.columns:
        logger.warning("`dif` column not found in dataframe. Test not possible.")
        return np.full(len(sdf), QCFlagEnum.NOT_VERIFIABLE.value, dtype=np.int8)

    # compute whatever I need to apply the test
    dif = sdf["dif"]
    min_value = -2.0  # W m-2
    max_value = 30 + 0.75 * sdf.solpos.etn * (sdf.solpos.cosz**1.2)

    # compute where the test fails and where it passes
    notna = dif.notna()
    failed = notna & (dif.lt(min_value) | dif.gt(max_value))
    passed = notna & (dif.ge(min_value) & dif.le(max_value))

    return construct_qcflag_array(failed, passed)

test_dni

test_dni(sdf: SolarDataFrame) -> ndarray[int8]

Evaluate extremely-rare-limits test for DNI.

Source code in src/solarpandas/qcontrol/erl.py
def test_dni(sdf: SolarDataFrame) -> np.ndarray[np.int8]:
    """Evaluate extremely-rare-limits test for DNI."""

    # check that I have what I need
    if "dni" not in sdf.columns:
        logger.warning("`dni` column not found in dataframe. Test not possible.")
        return np.full(len(sdf), QCFlagEnum.NOT_VERIFIABLE.value, dtype=np.int8)

    # compute whatever I need to apply the test
    dni = sdf["dni"]
    min_value = -2.0  # W m-2
    max_value = 10 + 0.95 * sdf.solpos.etn * (sdf.solpos.cosz**0.2)

    # compute where the test fails and where it passes
    notna = dni.notna()
    failed = notna & (dni.lt(min_value) | dni.gt(max_value))
    passed = notna & (dni.ge(min_value) & dni.le(max_value))

    return construct_qcflag_array(failed, passed)

test_ghi

test_ghi(sdf: SolarDataFrame) -> ndarray[int8]

Evaluate extremely-rare-limits test for GHI.

Source code in src/solarpandas/qcontrol/erl.py
def test_ghi(sdf: SolarDataFrame) -> np.ndarray[np.int8]:
    """Evaluate extremely-rare-limits test for GHI."""

    # check that I have what I need
    if "ghi" not in sdf.columns:
        logger.warning("`ghi` column not found in dataframe. Test not possible.")
        return np.full(len(sdf), QCFlagEnum.NOT_VERIFIABLE.value, dtype=np.int8)

    # compute whatever I need to apply the test
    ghi = sdf["ghi"]
    min_value = -2.0  # W m-2, to allow for measurement noise when the sun is just below the horizon
    max_value = 50 + 1.20 * sdf.solpos.etn * (sdf.solpos.cosz**1.2)  # W m-2, empirical upper limit

    # compute where the test fails and where it passes
    notna = ghi.notna()
    failed = notna & (ghi.lt(min_value) | ghi.gt(max_value))
    passed = notna & (ghi.ge(min_value) & ghi.le(max_value))

    return construct_qcflag_array(failed, passed)
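
The three extremely-rare-limits tests above (test_dif, test_dni and test_ghi) share one shape: a fixed lower limit of -2 W m-2 and an upper limit of the form offset + factor * etn * cosz**exponent. The sketch below restates that rule for a single sample in plain Python. The name `erl_flag`, the string labels, and the treatment of a negative `cosz` as not verifiable are assumptions made for illustration. The real functions return int8 codes built by construct_qcflag_array, whose values are not shown in this section.

```python
import math

# (offset, factor, exponent) copied from test_ghi, test_dif and test_dni
ERL_COEFFS = {
    "ghi": (50.0, 1.20, 1.2),
    "dif": (30.0, 0.75, 1.2),
    "dni": (10.0, 0.95, 0.2),
}
MIN_VALUE = -2.0  # W m-2, lower limit shared by the three tests


def erl_flag(component, value, etn, cosz):
    """Classify one sample as 'passed', 'failed' or 'not_verifiable'."""
    if value is None or math.isnan(value):
        return "not_verifiable"
    if cosz < 0.0:
        # assumption of this sketch: no limit is defined below the horizon
        return "not_verifiable"
    offset, factor, exponent = ERL_COEFFS[component]
    max_value = offset + factor * etn * cosz**exponent
    return "passed" if MIN_VALUE <= value <= max_value else "failed"


# illustrative inputs: etn = 1361 W m-2 and cosz = 0.5
flags = [
    erl_flag("ghi", 700.0, 1361.0, 0.5),
    erl_flag("ghi", 800.0, 1361.0, 0.5),
    erl_flag("dif", -3.0, 1361.0, 0.5),
    erl_flag("dni", float("nan"), 1361.0, 0.5),
]
```

With these inputs the GHI upper limit is roughly 761 W m-2, so the 700 W m-2 sample passes and the 800 W m-2 sample fails.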

Kspace

Quality-control tests based on K-space consistency relationships.

Functions:

  • plot_test

    Render a generic K-space QC diagnostic plot with density and flags.

  • plot_test_KT_erl

    Plot KT against zenith with ERL threshold and flagged points.

  • plot_test_K_erl

    Plot K against zenith with ERL threshold and flagged points.

  • plot_test_K_erl_clear

    Plot K versus KT for ERL clear-sky consistency diagnostics.

  • plot_test_Kn_erl

    Plot Kn versus KT for extremely-rare-limit diagnostics.

  • plot_test_Kn_ppl

    Plot Kn versus KT for physically-possible-limit diagnostics.

  • test_KT_erl

    Evaluate the extremely rare limits test for KT.

  • test_K_erl

    Evaluate the extremely rare limits test for K.

  • test_K_erl_clear

    Evaluate K clear-sky consistency test under ERL conditions.

  • test_Kn_erl

    Evaluate the extremely rare limits test for Kn.

  • test_Kn_ppl

    Evaluate the physically possible limits test for Kn.

plot_test

plot_test(
    x: str, y: str, sdf: SolarDataFrame, **kwargs
) -> Axes

Render a generic K-space QC diagnostic plot with density and flags.

Source code in src/solarpandas/qcontrol/Kspace.py
def plot_test(x: str, y: str, sdf: SolarDataFrame, **kwargs) -> plt.Axes:
    """Render a generic K-space QC diagnostic plot with density and flags."""

    plt.style.use("solarpandas-qc")
    if "rc" in kwargs:
        mpl.rcParams.update(kwargs["rc"])

    ax = kwargs.pop("ax", None)
    if ax is None:
        _, ax = plt.subplots(1, 1, figsize=(12, 8), layout="constrained")
    ax_box = ax.get_window_extent()

    title = f"{y} PPL Test Results"
    if "location" in sdf.custom_metadata:
        title += f" at {sdf.custom_metadata['location']}"
    if "station" in sdf.custom_metadata:
        title += f" ({sdf.custom_metadata['station']}"
        if "network" in sdf.custom_metadata:
            title += f", {sdf.custom_metadata['network']}"
        title += ")"
    else:
        if "network" in sdf.custom_metadata:
            title += f" ({sdf.custom_metadata['network']})"
    title += f" (lat={sdf.latitude:.4f}, lon={sdf.longitude:.4f}, alt={sdf.elevation:.0f} m)"

    _BOUNDS = {
        "KT": (-0.05, 1.4),
        "Kn": (-0.05, 1.15),
        "K": (-0.05, 1.15),
        "zenith": (0, 90),
    }

    cvs = ds.Canvas(plot_width=int(ax_box.width), plot_height=int(ax_box.height),
                    x_range=_BOUNDS[x], y_range=_BOUNDS[y])

    if "max_value_artist" in kwargs and kwargs["max_value_artist"] == "scatter":
        plt.scatter(x, "max_value", data=sdf.sort_values(x), label="Max. Limit",
                    color=MAX_VALUE_COLOR, s=2, zorder=1000)
    else:
        plt.plot(x, "max_value", data=sdf.sort_values(x), label="Max. Limit",
                 color=MAX_VALUE_COLOR, lw=2, zorder=1000)
    agg = cvs.points(sdf, x, y, ds.count()).pipe(lambda xa: xa.where(xa > 0))
    mesh = ax.pcolormesh(agg[x], agg[y], agg.values, cmap=DENSITY_CMAP,
                         norm=mpl.colors.LogNorm(), zorder=1001)
    plt.colorbar(mesh, ax=ax, pad=0.02, label=f"{y} Counts Density (log scale)")
    plt.scatter(x, y, data=sdf.loc[sdf.test.flag.fails], label="Failed Points",
                color=FAILED_COLOR, s=5, zorder=1003)
    plt.scatter(x, y, data=sdf.loc[sdf.not_verifiable], label="Not verifiable",
                color=NOT_VERIFIABLE_COLOR, s=3, zorder=1002)
    plt.xlabel(f"{x} (-)")
    plt.ylabel(f"{y} (-)")
    plt.title(title)
    plt.xlim(_BOUNDS[x])
    plt.ylim(_BOUNDS[y])
    leg = plt.legend()
    leg.set_zorder(1004)
    plt.grid()

    return plt.gca()
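
The title logic in plot_test can be read on its own: the location is appended first, then station and network share one parenthesis when both are present, and the coordinates always come last. The following sketch reproduces that branching as a pure function. `build_title` and the sample metadata values are hypothetical and are not part of solarpandas.

```python
def build_title(prefix, metadata, latitude, longitude, elevation):
    """Assemble a plot title from optional site metadata (illustrative helper)."""
    title = prefix
    if "location" in metadata:
        title += f" at {metadata['location']}"
    # station and network go inside a single parenthesis, in that order
    labels = [metadata[key] for key in ("station", "network") if key in metadata]
    if labels:
        title += f" ({', '.join(labels)})"
    title += f" (lat={latitude:.4f}, lon={longitude:.4f}, alt={elevation:.0f} m)"
    return title


# made-up site metadata, for illustration only
full = build_title(
    "KT PPL Test Results",
    {"location": "Example Town", "station": "STN", "network": "NET"},
    40.0, -3.5, 650.0,
)
bare = build_title("KT PPL Test Results", {}, 40.0, -3.5, 650.0)
```

Keeping the string assembly separate from the drawing code would also make it easy to reuse across the closure and tracker plots, which repeat the same block.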

plot_test_KT_erl

plot_test_KT_erl(
    sdf: SolarDataFrame, test: SolarSeries, **kwargs
) -> Axes

Plot KT against zenith with ERL threshold and flagged points.

Source code in src/solarpandas/qcontrol/Kspace.py
def plot_test_KT_erl(sdf: SolarDataFrame, test: SolarSeries, **kwargs) -> plt.Axes:
    """Plot KT against zenith with ERL threshold and flagged points."""

    KT = sdf.param.KT

    sdf_ = sdf.assign(
        zenith=sdf.solpos.zenith,
        KT=KT,
        max_value=np.full(len(KT), 1.35),
        not_verifiable=sdf["ghi"].le(50.) | KT.le(0.),
        test=test)

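    # forward caller options (e.g. ax) instead of silently dropping them
    kwargs.setdefault("rc", {"legend.loc": "lower left"})
    return plot_test(x="zenith", y="KT", sdf=sdf_, **kwargs)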

plot_test_K_erl

plot_test_K_erl(
    sdf: SolarDataFrame, test: SolarSeries, **kwargs
) -> Axes

Plot K against zenith with ERL threshold and flagged points.

Source code in src/solarpandas/qcontrol/Kspace.py
def plot_test_K_erl(sdf: SolarDataFrame, test: SolarSeries, **kwargs) -> plt.Axes:
    """Plot K against zenith with ERL threshold and flagged points."""

    K = sdf.param.K

    sdf_ = sdf.assign(
        zenith=sdf.solpos.zenith,
        K=K,
        max_value=np.where(sdf.solpos.zenith.lt(75.), 1.05, 1.10),
        not_verifiable=sdf["ghi"].le(50.) | K.le(0.),
        test=test)

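    # forward caller options (e.g. ax) instead of silently dropping them
    kwargs.setdefault("rc", {"legend.loc": "center left"})
    return plot_test(x="zenith", y="K", sdf=sdf_, **kwargs)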

plot_test_K_erl_clear

plot_test_K_erl_clear(
    sdf: SolarDataFrame, test: SolarSeries, **kwargs
) -> Axes

Plot K versus KT for ERL clear-sky consistency diagnostics.

Source code in src/solarpandas/qcontrol/Kspace.py
def plot_test_K_erl_clear(sdf: SolarDataFrame, test: SolarSeries, **kwargs) -> plt.Axes:
    """Plot K versus KT for ERL clear-sky consistency diagnostics."""

    K = sdf.param.K
    KT = sdf.param.KT

    sdf_ = sdf.assign(
        K=K,
        KT=KT,
        max_value=np.where(sdf.solpos.zenith.lt(85.) & KT.gt(0.6), 0.96, float("nan")),
        not_verifiable=sdf["ghi"].le(150.) | K.le(0.),
        test=test)

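    # forward caller options (e.g. ax) instead of silently dropping them
    kwargs.setdefault("rc", {"legend.loc": "lower left"})
    return plot_test(x="KT", y="K", sdf=sdf_, **kwargs)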

plot_test_Kn_erl

plot_test_Kn_erl(
    sdf: SolarDataFrame, test: SolarSeries, **kwargs
) -> Axes

Plot Kn versus KT for extremely-rare-limit diagnostics.

Source code in src/solarpandas/qcontrol/Kspace.py
def plot_test_Kn_erl(sdf: SolarDataFrame, test: SolarSeries, **kwargs) -> plt.Axes:
    """Plot Kn versus KT for extremely-rare-limit diagnostics."""

    KT = sdf.param.KT
    Kn = sdf.param.Kn
    max_value = (1100. + sdf.elevation*0.03) / sdf.solpos.etn
    not_verifiable = sdf["ghi"].le(50.) | Kn.le(0.)

    sdf_ = sdf.assign(
        KT=KT,
        Kn=Kn,
        max_value=max_value,
        not_verifiable=not_verifiable,
        test=test)

    kwargs.setdefault("max_value_artist", "scatter")
    return plot_test(x="KT", y="Kn", sdf=sdf_, **kwargs)

plot_test_Kn_ppl

plot_test_Kn_ppl(
    sdf: SolarDataFrame, test: SolarSeries, **kwargs
) -> Axes

Plot Kn versus KT for physically-possible-limit diagnostics.

Source code in src/solarpandas/qcontrol/Kspace.py
def plot_test_Kn_ppl(sdf: SolarDataFrame, test: SolarSeries, **kwargs) -> plt.Axes:
    """Plot Kn versus KT for physically-possible-limit diagnostics."""

    KT = sdf.param.KT
    Kn = sdf.param.Kn

    sdf_ = sdf.assign(
        KT=KT,
        Kn=Kn,
        max_value=KT,
        not_verifiable=sdf["ghi"].le(50.) | Kn.le(0.) | KT.le(0.),
        test=test)

    kwargs.setdefault("rc", {"legend.loc": "upper left"})
    return plot_test(x="KT", y="Kn", sdf=sdf_, **kwargs)

test_KT_erl

test_KT_erl(sdf: SolarDataFrame) -> ndarray[int8]

Evaluate the extremely rare limits test for KT.

Source code in src/solarpandas/qcontrol/Kspace.py
def test_KT_erl(sdf: SolarDataFrame) -> np.ndarray[np.int8]:
    """Evaluate the extremely rare limits test for KT."""

    # flagKt in Table 4 of Forstinger et al.

    # check that I have what I need
    if "ghi" not in sdf.columns:
        logger.warning("`ghi` column not found in dataframe. Test not possible.")
        return np.full(len(sdf), QCFlagEnum.NOT_VERIFIABLE.value, dtype=np.int8)

    # compute whatever I need to apply the test
    ghi = sdf["ghi"]
    KT = sdf.param.KT
    max_value = 1.35

    # compute where the test fails and where it passes
    notna = ghi.notna() & KT.notna()
    verifiable = notna & ghi.gt(50.) & KT.gt(0.)
    failed = verifiable & KT.ge(max_value)
    passed = verifiable & KT.lt(max_value)

    return construct_qcflag_array(failed, passed)

test_K_erl

test_K_erl(sdf: SolarDataFrame) -> ndarray[int8]

Evaluate the extremely rare limits test for K.

Source code in src/solarpandas/qcontrol/Kspace.py
def test_K_erl(sdf: SolarDataFrame) -> np.ndarray[np.int8]:
    """Evaluate the extremely rare limits test for K."""

    # flagKlowSZA and flagKhighSZA in Table 4 of Forstinger et al.

    # check that I have what I need
    if "ghi" not in sdf.columns:
        logger.warning("`ghi` column not found in dataframe. Test not possible.")
        return np.full(len(sdf), QCFlagEnum.NOT_VERIFIABLE.value, dtype=np.int8)

    if "dif" not in sdf.columns:
        logger.warning("`dif` column not found in dataframe. Test not possible.")
        return np.full(len(sdf), QCFlagEnum.NOT_VERIFIABLE.value, dtype=np.int8)

    # compute whatever I need to apply the test
    ghi = sdf["ghi"]
    dif = sdf["dif"]
    sza = sdf.solpos.zenith
    K = sdf.param.K
    max_value = ghi.replace_data(1.05).where(sza.lt(75.), 1.10)

    # compute where the test fails and where it passes
    notna = ghi.notna() & dif.notna() & K.notna()
    verifiable = notna & ghi.gt(50.) & K.gt(0.)
    failed = verifiable & K.ge(max_value)
    passed = verifiable & K.lt(max_value)

    return construct_qcflag_array(failed, passed)
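
The K test differs from the KT test in that its upper limit depends on the solar zenith angle: 1.05 below 75 degrees and 1.10 from 75 degrees upward. A minimal single-sample sketch of that rule follows. It assumes that `sdf.param.K` is the diffuse fraction DIF / GHI, which is the conventional definition but is not shown in this section. `k_erl_flag` and the string labels are illustrative names.

```python
def k_erl_flag(ghi, dif, sza):
    """Flag one sample with the zenith-dependent K limit of test_K_erl."""
    if ghi is None or dif is None:
        return "not_verifiable"
    if ghi <= 50.0:
        return "not_verifiable"  # below the 50 W m-2 gate used by the test
    k = dif / ghi  # assumed definition of K (diffuse fraction)
    if k <= 0.0:
        return "not_verifiable"
    max_value = 1.05 if sza < 75.0 else 1.10
    # the limit itself counts as a failure, matching K.ge(max_value)
    return "failed" if k >= max_value else "passed"


# the same K = 1.07 fails at high sun but passes at low sun
high_sun = k_erl_flag(ghi=100.0, dif=107.0, sza=60.0)
low_sun = k_erl_flag(ghi=100.0, dif=107.0, sza=80.0)
```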

test_K_erl_clear

test_K_erl_clear(sdf: SolarDataFrame) -> ndarray[int8]

Evaluate K clear-sky consistency test under ERL conditions.

Source code in src/solarpandas/qcontrol/Kspace.py
def test_K_erl_clear(sdf: SolarDataFrame) -> np.ndarray[np.int8]:
    """Evaluate K clear-sky consistency test under ERL conditions."""

    # flagKKt in Table 4 of Forstinger et al.

    # check that I have what I need
    if "ghi" not in sdf.columns:
        logger.warning("`ghi` column not found in dataframe. Test not possible.")
        return np.full(len(sdf), QCFlagEnum.NOT_VERIFIABLE.value, dtype=np.int8)

    if "dif" not in sdf.columns:
        logger.warning("`dif` column not found in dataframe. Test not possible.")
        return np.full(len(sdf), QCFlagEnum.NOT_VERIFIABLE.value, dtype=np.int8)

    # compute whatever I need to apply the test
    ghi = sdf["ghi"]
    dif = sdf["dif"]
    sza = sdf.solpos.zenith
    K = sdf.param.K
    KT = sdf.param.KT
    max_value = 0.96

    # compute where the test fails and where it passes
    notna = ghi.notna() & dif.notna() & K.notna()
    verifiable = notna & sza.lt(85.) & ghi.gt(150.) & K.gt(0.) & KT.gt(0.6)
    failed = verifiable & K.ge(max_value)
    passed = verifiable & K.lt(max_value)

    return construct_qcflag_array(failed, passed)

test_Kn_erl

test_Kn_erl(sdf: SolarDataFrame) -> ndarray[int8]

Evaluate the extremely rare limits test for Kn.

Source code in src/solarpandas/qcontrol/Kspace.py
def test_Kn_erl(sdf: SolarDataFrame) -> np.ndarray[np.int8]:
    """Evaluate the extremely rare limits test for Kn."""

    # flagKn in Table 4 of Forstinger et al.

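    # check that I have what I need (both columns are used below)
    if "ghi" not in sdf.columns:
        logger.warning("`ghi` column not found in dataframe. Test not possible.")
        return np.full(len(sdf), QCFlagEnum.NOT_VERIFIABLE.value, dtype=np.int8)

    if "dni" not in sdf.columns:
        logger.warning("`dni` column not found in dataframe. Test not possible.")
        return np.full(len(sdf), QCFlagEnum.NOT_VERIFIABLE.value, dtype=np.int8)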

    # compute whatever I need to apply the test
    ghi = sdf["ghi"]
    Kn = sdf.param.Kn
    max_value = (1100. + sdf.elevation*0.03) / sdf.solpos.etn

    # compute where the test fails and where it passes
    notna = ghi.notna() & Kn.notna()
    verifiable = notna & ghi.gt(50.) & Kn.gt(0.)
    failed = verifiable & Kn.ge(max_value)
    passed = verifiable & Kn.lt(max_value)

    return construct_qcflag_array(failed, passed)
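
The Kn limit in test_Kn_erl is the only threshold in this module that depends on site elevation. A short worked example may help to see its magnitude. It assumes an `etn` value of 1361 W m-2 purely for illustration, and `kn_erl_limit` is a hypothetical helper name.

```python
def kn_erl_limit(elevation_m, etn):
    """Upper Kn limit from test_Kn_erl: (1100 + 0.03 * elevation) / etn."""
    return (1100.0 + 0.03 * elevation_m) / etn


ETN_EXAMPLE = 1361.0  # W m-2, illustrative value only

sea_level = kn_erl_limit(0.0, ETN_EXAMPLE)     # about 0.808
mountain = kn_erl_limit(2000.0, ETN_EXAMPLE)   # about 0.852
```

If Kn is DNI divided by `etn`, as is conventional, the rule is equivalent to an absolute DNI ceiling of 1100 W m-2 at sea level that rises by 3 W m-2 per 100 m of elevation.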

test_Kn_ppl

test_Kn_ppl(sdf: SolarDataFrame) -> ndarray[int8]

Evaluate the physically possible limits test for Kn.

Source code in src/solarpandas/qcontrol/Kspace.py
def test_Kn_ppl(sdf: SolarDataFrame) -> np.ndarray[np.int8]:
    """Evaluate the physically possible limits test for Kn."""

    # flagKnKt in Table 4 of Forstinger et al.

    # check that I have what I need
    if "ghi" not in sdf.columns:
        logger.warning("`ghi` column not found in dataframe. Test not possible.")
        return np.full(len(sdf), QCFlagEnum.NOT_VERIFIABLE.value, dtype=np.int8)

    if "dni" not in sdf.columns:
        logger.warning("`dni` column not found in dataframe. Test not possible.")
        return np.full(len(sdf), QCFlagEnum.NOT_VERIFIABLE.value, dtype=np.int8)

    # compute whatever I need to apply the test
    ghi = sdf["ghi"]
    Kn = sdf.param.Kn
    KT = sdf.param.KT
    max_value = KT

    # compute where the test fails and where it passes
    notna = ghi.notna() & Kn.notna() & KT.notna()
    verifiable = notna & ghi.gt(50.) & Kn.gt(0.) & KT.gt(0.)
    failed = verifiable & Kn.ge(max_value)
    passed = verifiable & Kn.lt(max_value)

    return construct_qcflag_array(failed, passed)
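
The condition Kn < KT can be rewritten in terms of the measured components. The derivation below assumes the conventional definitions, KT = GHI / (etn * cosz) and Kn = DNI / etn. The `sdf.param` accessor is not shown in this section, so treat those definitions as an assumption.

```latex
\begin{aligned}
K_n < K_T
&\iff \frac{\mathrm{DNI}}{E_{tn}} < \frac{\mathrm{GHI}}{E_{tn}\,\cos\theta_z} \\
&\iff \mathrm{DNI}\,\cos\theta_z < \mathrm{GHI}
\qquad (E_{tn} > 0,\ \cos\theta_z > 0)
\end{aligned}
```

Under those definitions the test flags samples whose direct horizontal irradiance alone reaches or exceeds the measured global irradiance.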

closure

Quality-control checks based on radiative closure consistency.

Functions:

  • plot_test_closure

    Plot closure ratio diagnostics and flagged points versus zenith.

  • test_closure

    Evaluate radiative closure consistency between GHI, DNI and DIF.

plot_test_closure

plot_test_closure(
    sdf: SolarDataFrame, test: SolarSeries, **kwargs
) -> Axes

Plot closure ratio diagnostics and flagged points versus zenith.

Source code in src/solarpandas/qcontrol/closure.py
def plot_test_closure(sdf: SolarDataFrame, test: SolarSeries, **kwargs) -> plt.Axes:
    """Plot closure ratio diagnostics and flagged points versus zenith."""

    plt.style.use("solarpandas-qc")
    mpl.rcParams.update({"legend.loc": "lower left"})

    ax = kwargs.pop("ax", None)
    if ax is None:
        _, ax = plt.subplots(1, 1, figsize=(12, 8), layout="constrained")
    ax_box = ax.get_window_extent()

    title = "Closure Test Results"
    if "location" in sdf.custom_metadata:
        title += f" at {sdf.custom_metadata['location']}"
    if "station" in sdf.custom_metadata:
        title += f" ({sdf.custom_metadata['station']}"
        if "network" in sdf.custom_metadata:
            title += f", {sdf.custom_metadata['network']}"
        title += ")"
    else:
        if "network" in sdf.custom_metadata:
            title += f" ({sdf.custom_metadata['network']})"
    title += f" (lat={sdf.latitude:.4f}, lon={sdf.longitude:.4f}, alt={sdf.elevation:.0f} m)"

    cvs = ds.Canvas(plot_width=int(ax_box.width), plot_height=int(ax_box.height),
                    x_range=(0., 90.), y_range=(0.80, 1.20))

    ghi = sdf["ghi"]
    dni = sdf["dni"]
    dif = sdf["dif"]
    ghi_closure = dni * sdf.solpos.cosz + dif
    not_verifiable = ghi.le(50.)

    df = sdf.assign(
        zenith=sdf.solpos.sza,
        closure_ratio=(ghi / ghi_closure).where(sdf.solpos.sza.lt(87.), 1.),
        min_value=np.where(sdf.solpos.sza.le(75.), 0.92, 0.85),
        max_value=np.where(sdf.solpos.sza.le(75.), 1.08, 1.15))

    plt.plot("zenith", "max_value", data=df.sort_values("zenith"), label="Max. Limit", color=MAX_VALUE_COLOR, lw=2)
    plt.plot("zenith", "min_value", data=df.sort_values("zenith"), label="Min. Limit", color=MIN_VALUE_COLOR, lw=2)
    agg = cvs.points(df, "zenith", "closure_ratio", ds.count()).pipe(lambda xa: xa.where(xa > 0))
    mesh = ax.pcolormesh(agg["zenith"], agg["closure_ratio"], agg.values, cmap=DENSITY_CMAP, norm=mpl.colors.LogNorm())
    plt.colorbar(mesh, ax=ax, pad=0.02, label="Closure Ratio Counts Density (log scale)")
    plt.scatter("zenith", "closure_ratio", data=df.loc[test.flag.fails], label="Failed Points", color=FAILED_COLOR, s=5)
    plt.scatter("zenith", "closure_ratio", data=df.loc[not_verifiable], label="Not verifiable", color=NOT_VERIFIABLE_COLOR, s=5)
    plt.xlabel("Solar Zenith Angle (degrees)")
    plt.ylabel("Closure Ratio (GHI / closure(DNI, DIF))")
    plt.title(title)
    plt.xlim(0., 90.)
    plt.ylim(0.80, 1.20)
    plt.legend()
    plt.grid()

    return plt.gca()

test_closure

test_closure(sdf: SolarDataFrame) -> ndarray[int8]

Evaluate radiative closure consistency between GHI, DNI and DIF.

Source code in src/solarpandas/qcontrol/closure.py
def test_closure(sdf: SolarDataFrame) -> np.ndarray[np.int8]:
    """Evaluate radiative closure consistency between GHI, DNI and DIF."""

    # flag3lowSZA and flag3highSZA in Table 5 of Forstinger et al.

    # check that I have what I need
    if "ghi" not in sdf.columns:
        logger.warning("`ghi` column not found in dataframe. Test not possible.")
        return np.full(len(sdf), QCFlagEnum.NOT_VERIFIABLE.value, dtype=np.int8)

    if "dni" not in sdf.columns:
        logger.warning("`dni` column not found in dataframe. Test not possible.")
        return np.full(len(sdf), QCFlagEnum.NOT_VERIFIABLE.value, dtype=np.int8)

    if "dif" not in sdf.columns:
        logger.warning("`dif` column not found in dataframe. Test not possible.")
        return np.full(len(sdf), QCFlagEnum.NOT_VERIFIABLE.value, dtype=np.int8)

    # compute whatever I need to apply the test
    ghi = sdf["ghi"]
    dni = sdf["dni"]
    dif = sdf["dif"]
    ghi_closure = dni * sdf.solpos.cosz + dif
    closure_ratio = (ghi / ghi_closure).where(sdf.solpos.sza.lt(87.), 1.)
    min_value = np.where(sdf.solpos.sza.le(75.), 0.92, 0.85)
    max_value = np.where(sdf.solpos.sza.le(75.), 1.08, 1.15)

    # compute where the test fails and where it passes
    notna = ghi.notna() & dni.notna() & dif.notna()
    verifiable = notna & ghi.gt(50.)
    failed = verifiable & (closure_ratio.lt(min_value) | closure_ratio.gt(max_value))
    passed = verifiable & (closure_ratio.ge(min_value) & closure_ratio.le(max_value))

    return construct_qcflag_array(failed, passed)
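
The closure test compares measured GHI with the value rebuilt from DNI and DIF, using a tolerance band that widens above 75 degrees of zenith angle. The sketch below applies the same numbers to one sample. `closure_flag` and the string labels are illustrative, and deriving `cosz` directly from `sza` is a simplification of this sketch (the source reads it from `sdf.solpos.cosz`).

```python
import math


def closure_flag(ghi, dni, dif, sza):
    """Classify one sample with the closure-ratio limits of test_closure."""
    if ghi is None or dni is None or dif is None or ghi <= 50.0:
        return "not_verifiable"
    if sza < 87.0:
        expected = dni * math.cos(math.radians(sza)) + dif
        ratio = ghi / expected if expected != 0.0 else math.inf
    else:
        ratio = 1.0  # the source forces the ratio to 1 here, so the sample passes
    low, high = (0.92, 1.08) if sza <= 75.0 else (0.85, 1.15)
    return "passed" if low <= ratio <= high else "failed"


# a ratio of 1.10 is outside the band at 60 degrees but inside it at 80 degrees
tight_band = closure_flag(ghi=110.0, dni=0.0, dif=100.0, sza=60.0)
wide_band = closure_flag(ghi=110.0, dni=0.0, dif=100.0, sza=80.0)
```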

tracker

Quality-control checks related to tracker behavior and geometry.

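Functions:

  • plot_test_trackeroff

    Plot tracker-off diagnostics in KT-K space with flagged points.

  • test_trackeroff

    Evaluate tracker-off condition test based on CDA ratios.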

plot_test_trackeroff

plot_test_trackeroff(
    sdf: SolarDataFrame, test: SolarSeries, **kwargs
) -> Axes

Plot tracker-off diagnostics in KT-K space with flagged points.

Source code in src/solarpandas/qcontrol/tracker.py
def plot_test_trackeroff(sdf: SolarDataFrame, test: SolarSeries, **kwargs) -> plt.Axes:
    """Plot tracker-off diagnostics in KT-K space with flagged points."""

    plt.style.use("solarpandas-qc")
    mpl.rcParams.update({"legend.loc": "lower left"})

    ax = kwargs.pop("ax", None)
    if ax is None:
        _, ax = plt.subplots(1, 1, figsize=(12, 8), layout="constrained")
    ax_box = ax.get_window_extent()

    title = "Tracker-off Test Results"
    if "location" in sdf.custom_metadata:
        title += f" at {sdf.custom_metadata['location']}"
    if "station" in sdf.custom_metadata:
        title += f" ({sdf.custom_metadata['station']}"
        if "network" in sdf.custom_metadata:
            title += f", {sdf.custom_metadata['network']}"
        title += ")"
    else:
        if "network" in sdf.custom_metadata:
            title += f" ({sdf.custom_metadata['network']})"
    title += f" (lat={sdf.latitude:.4f}, lon={sdf.longitude:.4f}, alt={sdf.elevation:.0f} m)"

    cvs = ds.Canvas(plot_width=int(ax_box.width), plot_height=int(ax_box.height),
                    x_range=(-0.05, 1.40), y_range=(-0.05, 1.15))

    not_verifiable = sdf.solpos.sza.ge(85.)

    df = sdf.assign(
        KT=sdf.param.KT,
        K=sdf.param.K)

    agg = cvs.points(df, "KT", "K", ds.count()).pipe(lambda xa: xa.where(xa > 0))
    mesh = ax.pcolormesh(agg["KT"], agg["K"], agg.values, cmap=DENSITY_CMAP,
                         norm=mpl.colors.LogNorm(), zorder=1000)
    plt.colorbar(mesh, ax=ax, pad=0.02, label="K Counts Density (log scale)")
    plt.scatter("KT", "K", data=df.loc[test.flag.fails], label="Failed Points",
                color=FAILED_COLOR, s=3, zorder=1002)
    plt.scatter("KT", "K", data=df.loc[not_verifiable], label="Not verifiable",
                color=NOT_VERIFIABLE_COLOR, s=3, zorder=1001)
    plt.xlabel("KT (-)")
    plt.ylabel("K (-)")
    plt.title(title)
    plt.xlim(-0.05, 1.40)
    plt.ylim(-0.05, 1.15)
    leg = plt.legend()
    leg.set_zorder(1003)
    plt.grid()

    return plt.gca()

test_trackeroff

test_trackeroff(sdf: SolarDataFrame) -> ndarray[int8]

Evaluate tracker-off condition test based on CDA ratios.

Source code in src/solarpandas/qcontrol/tracker.py
def test_trackeroff(sdf: SolarDataFrame) -> np.ndarray[np.int8]:
    """Evaluate tracker-off condition test based on CDA ratios."""

    # flagTracker in Table 8 of Forstinger et al.

    # check that I have what I need
    if "ghi" not in sdf.columns:
        logger.warning("`ghi` column not found in dataframe. Test not possible.")
        return np.full(len(sdf), QCFlagEnum.NOT_VERIFIABLE.value, dtype=np.int8)

    if "dni" not in sdf.columns:
        logger.warning("`dni` column not found in dataframe. Test not possible.")
        return np.full(len(sdf), QCFlagEnum.NOT_VERIFIABLE.value, dtype=np.int8)

    # compute whatever I need to apply the test
    ghi = sdf["ghi"]
    ghic = sdf.cda.ghi
    ghi_ratio = (ghic - ghi) / (ghic + ghi)
    max_value = np.full(len(sdf), 0.20)

    dni = sdf["dni"]
    dnic = sdf.cda.dni
    dni_ratio = (dnic - dni) / (dnic + dni)
    min_value = np.full(len(sdf), 0.95)

    # compute where the test fails and where it passes
    notna = ghi.notna() & dni.notna()
    verifiable = notna & sdf.solpos.sza.lt(85.)
    failed = verifiable & dni_ratio.gt(min_value) & ghi_ratio.lt(max_value)
    passed = verifiable & (dni_ratio.le(min_value) | ghi_ratio.ge(max_value))

    return construct_qcflag_array(failed, passed)
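
The tracker-off rule combines two normalized differences of the form (reference - measured) / (reference + measured). A sample fails when DNI is almost entirely missing relative to its reference (ratio above 0.95) while GHI stays close to its own reference (ratio below 0.20). The sketch below shows that logic for one sample. The reference values stand in for `sdf.cda.ghi` and `sdf.cda.dni`, whose computation is not shown here, and the function names and labels are hypothetical.

```python
def relative_deficit(reference, measured):
    """(reference - measured) / (reference + measured), as in test_trackeroff."""
    return (reference - measured) / (reference + measured)


def trackeroff_flag(ghi, dni, ghi_ref, dni_ref, sza):
    """Classify one sample with the tracker-off thresholds."""
    if ghi is None or dni is None or sza >= 85.0:
        return "not_verifiable"
    dni_ratio = relative_deficit(dni_ref, dni)
    ghi_ratio = relative_deficit(ghi_ref, ghi)
    # DNI nearly absent while GHI is near its reference: tracker suspected off
    suspicious = dni_ratio > 0.95 and ghi_ratio < 0.20
    return "failed" if suspicious else "passed"


# illustrative values in W m-2
stalled = trackeroff_flag(ghi=800.0, dni=5.0, ghi_ref=850.0, dni_ref=900.0, sza=40.0)
overcast = trackeroff_flag(ghi=150.0, dni=5.0, ghi_ref=850.0, dni_ref=900.0, sza=40.0)
tracking = trackeroff_flag(ghi=800.0, dni=850.0, ghi_ref=850.0, dni_ref=900.0, sza=40.0)
```

The overcast case shows why both ratios are needed: low DNI alone is not suspicious when GHI is also far below its reference.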

timeshift

Quality-control tools to detect and analyze time-shift issues.

Functions:

  • check_timeshift

    Visual check for AM/PM asymmetry to detect possible timestamp shifts.

check_timeshift

check_timeshift(
    sdf: SolarDataFrame, column: str = "auto", **kwargs
) -> Figure

Visual check for AM/PM asymmetry to detect possible timestamp shifts.

Parameters:

  • sdf
    (SolarDataFrame) –

    Input data containing at least dni or ghi.

  • column
    (str, default: "auto" ) –

    Variable to analyze. "auto" prefers dni and falls back to ghi.

  • **kwargs
    (Any, default: {} ) –

    Optional plotting arguments. ax and rc are supported.

Returns:

  • Figure

    Figure with AM and PM density maps versus solar zenith angle.

Examples:

>>> from solarpandas.qcontrol import check_timeshift
>>> fig = check_timeshift(sdf, column="dni")
>>> fig is not None
True

Source code in src/solarpandas/qcontrol/timeshift.py
def check_timeshift(sdf: SolarDataFrame, column: str = "auto", **kwargs) -> plt.Figure:
    """Visual check for AM/PM asymmetry to detect possible timestamp shifts.

    Parameters
    ----------
    sdf : SolarDataFrame
        Input data containing at least ``dni`` or ``ghi``.
    column : str, default "auto"
        Variable to analyze. ``"auto"`` prefers ``dni`` and falls back to ``ghi``.
    **kwargs : Any
        Optional plotting arguments. ``ax`` and ``rc`` are supported.

    Returns
    -------
    matplotlib.figure.Figure
        Figure with AM and PM density maps versus solar zenith angle.

    Examples
    --------
    >>> from solarpandas.qcontrol import check_timeshift
    >>> fig = check_timeshift(sdf, column="dni")
    >>> fig is not None
    True
    """

    if column == "auto":
        if (column := "dni") not in sdf.columns:
            if (column := "ghi") not in sdf.columns:
                raise ValueError("`dni` and `ghi` not found in dataframe. Test not possible.")

    am = (sdf.solpos.tst.dt.hour < 12) & (sdf.solpos.sza < 90)
    pm = (sdf.solpos.tst.dt.hour >= 12) & (sdf.solpos.sza < 90)

    plt.style.use("solarpandas-qc")
    if "rc" in kwargs:
        mpl.rcParams.update(kwargs.pop("rc"))

    if (ax := kwargs.pop("ax", None)) is None:
        fig, ax = plt.subplots(1, 1, figsize=(12, 8))
        fig.subplots_adjust(left=0.08, right=0.94, top=0.95, bottom=0.08)

    pos = ax.get_position()  # original axes position
    ax.set_position([pos.x0, pos.y0, pos.width * 0.95, pos.height])  # shrink the axes width
    pos = ax.get_position()  # new axes position
    cb_h = pos.height * 0.48  # colorbar height (48% of the axes height)
    cb_w = pos.width * 0.025  # colorbar width (2.5% of the axes width)
    cax_am = plt.axes([pos.x1 + pos.width*0.01, pos.y1 - cb_h, cb_w, cb_h])  # axes for the AM colorbar
    cax_pm = plt.axes([pos.x1 + pos.width*0.01, pos.y0, cb_w, cb_h])  # axes for the PM colorbar

    ax_box = ax.get_window_extent()

    title = "Timeshift Check Results"
    network = sdf.custom_metadata.get("network", None)
    if network is not None and network.casefold() == "bsrn":
        station = sdf.custom_metadata.get("station", "unknown station")
        location = sdf.custom_metadata.get("location", "unknown location")
        acronym = sdf.custom_metadata.get("acronym", "unknown acronym")
        title += f" at {station}, {location} ({acronym.upper()}, BSRN)"
    title += f" (lat={sdf.latitude:.4f}, lon={sdf.longitude:.4f}, alt={sdf.elevation:.0f} m)"

    cvs = ds.Canvas(plot_width=int(ax_box.width), plot_height=int(ax_box.height),
                    x_range=(0., 90.)) #, y_range=(0., None))
    agg = (cvs.points(sdf.assign(sza=sdf.solpos.sza).loc[am], "sza", column, ds.count())
           .pipe(lambda xa: xa.where(xa > 0)))
    mesh = ax.pcolormesh(agg.sza, agg[column], agg.values,
                         cmap="Blues_r", norm=mpl.colors.LogNorm())
    plt.colorbar(mesh, cax=cax_am, label="AM Counts Density (log scale)")

    agg = (cvs.points(sdf.assign(sza=sdf.solpos.sza).loc[pm], "sza", column, ds.count())
           .pipe(lambda xa: xa.where(xa > 0)))
    mesh = ax.pcolormesh(agg.sza, agg[column], agg.values,
                         cmap="Oranges_r", norm=mpl.colors.LogNorm())
    plt.colorbar(mesh, cax=cax_pm, label="PM Counts Density (log scale)")

    ax.set_xlabel("Solar Zenith Angle (degrees)")
    ax.set_ylabel(f"{column.upper()} (W m$^{{-2}}$)")
    ax.set_title(title)
    ax.set_xlim(0., 90.)
    ax.set_ylim(0., None)
    ax.grid()

    return plt.gcf()
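
The core of check_timeshift is the split of daytime samples into a morning and an afternoon group, which are then drawn as two density maps over the same zenith axis. The following sketch isolates that split on plain tuples. `split_am_pm` is a hypothetical helper, the sample values are invented, and the hour is assumed to come from `sdf.solpos.tst` as in the source.

```python
def split_am_pm(samples):
    """Split (tst_hour, sza, value) tuples into AM and PM daytime values."""
    am = [value for hour, sza, value in samples if hour < 12 and sza < 90.0]
    pm = [value for hour, sza, value in samples if hour >= 12 and sza < 90.0]
    return am, pm


# invented samples: (hour of day, zenith angle in degrees, irradiance in W m-2)
samples = [
    (9, 60.0, 410.0),
    (11, 35.0, 780.0),
    (13, 35.0, 760.0),
    (15, 60.0, 380.0),
    (20, 95.0, 0.0),  # sun below the horizon, excluded from both groups
]
am_values, pm_values = split_am_pm(samples)
```

When timestamps are correct, values at the same zenith angle should look broadly similar in both groups. A systematic offset between them is the asymmetry the plot is meant to expose.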

BSRN Origin

core

Core public API to inspect, fetch, and load BSRN datasets.

Classes:

  • LogicalRecord

    Representation of one logical record block found in a BSRN file.

Functions:

  • clear_cache

    Clear cached data files for a given site, year and logical record.

  • data_availability

    Inspect the availability of BSRN data on the remote FTP server.

  • get_database_path

    Get the path to the local BSRN database directory.

  • load_data

    Load yearly BSRN data from cache or raw FTP files.

  • load_data_from_bsrn_files

    Load and parse monthly BSRN .dat.gz files from local FTP mirror.

  • load_metadata

    Load cached station metadata, optionally refreshing remote source.

  • parse_bsrn_file

    Parse selected logical records from a BSRN .dat.gz file.

LogicalRecord dataclass

LogicalRecord(
    signature: str,
    first_line: int,
    last_line: int,
    lines: list[str],
    parser: Callable | None = None,
)

Representation of one logical record block found in a BSRN file.

Methods:

  • parse

    Parse record lines with the associated logical-record parser.

parse

parse(**kwargs) -> dict[str, Any]

Parse record lines with the associated logical-record parser.

Source code in src/solarpandas/origin/bsrn/core.py
def parse(self, **kwargs) -> dict[str, Any]:
    """Parse record lines with the associated logical-record parser."""
    if self.parser is None:
        raise ValueError(f"no parser available for logical record {self.name}")
    logger.debug(f"parsing <blue>{self.name}</blue> with parser <blue>{self.parser.__name__}</blue>")
    return self.parser(self.lines, **kwargs)
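
The parse method is a thin dispatch: the record stores its raw lines together with an optional callable, and parse either delegates to that callable or raises. A self-contained sketch of the pattern is shown below. `RecordSketch` and `count_lines` are stand-ins invented for this example. They keep only part of the LogicalRecord fields and use `signature` in the error message because the `name` attribute is not shown in this section.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class RecordSketch:
    """Stand-in for LogicalRecord with only the fields needed here."""

    signature: str
    lines: list
    parser: Optional[Callable] = None

    def parse(self, **kwargs) -> dict:
        """Delegate to the attached parser, or fail if there is none."""
        if self.parser is None:
            raise ValueError(f"no parser available for logical record {self.signature}")
        return self.parser(self.lines, **kwargs)


def count_lines(lines, **kwargs) -> dict:
    """Toy parser: report how many lines the record holds."""
    return {"n_lines": len(lines)}


parsed = RecordSketch("LR0100", ["line 1", "line 2"], count_lines).parse()

try:
    RecordSketch("LR0100", []).parse()
    error_message = None
except ValueError as exc:
    error_message = str(exc)
```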

clear_cache

clear_cache(
    site: Site | None = None,
    year: Year | None = None,
    logical_record: DataLogicalRecordName | None = None,
) -> None

Clear cached data files for a given site, year and logical record.

Parameters:

  • site
    (Site or None, default: None ) –

    Three-letter station code. If None, all sites are included.

  • year
    (Year or None, default: None ) –

    Year to clear. If None, all years are included.

  • logical_record
    (DataLogicalRecordName or None, default: None ) –

    Logical record to clear. If None, all logical records are included.

Examples:

>>> from solarpandas.origin.bsrn import clear_cache
>>> clear_cache(site="car", year=2016, logical_record="LR0100")

Source code in src/solarpandas/origin/bsrn/core.py
def clear_cache(
    site: Site | None = None,
    year: Year | None = None,
    logical_record: DataLogicalRecordName | None = None
) -> None:
    """Clear cached data files for a given site, year and logical record.

    Parameters
    ----------
    site : Site or None, default None
        Three-letter station code. If ``None``, all sites are included.
    year : Year or None, default None
        Year to clear. If ``None``, all years are included.
    logical_record : DataLogicalRecordName or None, default None
        Logical record to clear. If ``None``, all logical records are included.

    Examples
    --------
    >>> from solarpandas.origin.bsrn import clear_cache
    >>> clear_cache(site="car", year=2016, logical_record="LR0100")
    """

    cache_path = get_database_path() / "cached"

    site_pattern = validate_type(site, Site) if site is not None else "*"
    sites_to_clear = list(cache_path.glob(site_pattern))
    logger.debug(f"Sites to clear: {[site.name for site in sites_to_clear]}")

    year = validate_type(year, Year)
    year_pattern = f"{year}" if year is not None else "*"

    logical_record = validate_type(logical_record, DataLogicalRecordName)
    lr_pattern = logical_record.lower() if logical_record is not None else "*"

    for site_dir in sites_to_clear:
        path = f"{site_dir.name}_{year_pattern}_{lr_pattern}.parquet"
        files_to_clear = list(site_dir.glob(path))
        logger.debug(f"Files to clear: {files_to_clear}")
        for file in files_to_clear:
            file.unlink()
            logger.info(f"Cleared cached file: {file}")
        if not list(site_dir.iterdir()):
            site_dir.rmdir()
            logger.info(f"Removed empty site directory: {site_dir}")
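
The file selection in clear_cache relies on glob patterns in which every omitted argument becomes a `*` wildcard. The sketch below reproduces only that selection step against a temporary directory, without deleting anything. `cached_files` is a hypothetical helper, the directory layout mirrors the `<site>/<site>_<year>_<record>.parquet` naming visible in the source, and the file names are invented.

```python
import tempfile
from pathlib import Path


def cached_files(cache_path, site=None, year=None, logical_record=None):
    """List cache files matching the optional site/year/record filters."""
    site_pattern = site if site is not None else "*"
    year_pattern = f"{year}" if year is not None else "*"
    lr_pattern = logical_record.lower() if logical_record is not None else "*"
    matches = []
    for site_dir in sorted(Path(cache_path).glob(site_pattern)):
        pattern = f"{site_dir.name}_{year_pattern}_{lr_pattern}.parquet"
        matches.extend(sorted(site_dir.glob(pattern)))
    return matches


with tempfile.TemporaryDirectory() as tmp:
    # build a tiny fake cache: one directory per site, empty parquet files
    for stem in ("car_2016_lr0100", "car_2017_lr0100", "pay_2016_lr0100"):
        site_dir = Path(tmp) / stem[:3]
        site_dir.mkdir(exist_ok=True)
        (site_dir / f"{stem}.parquet").touch()
    selected = [p.name for p in cached_files(tmp, site="car", year=2016)]
    by_record = [p.name for p in cached_files(tmp, logical_record="LR0100")]
```

Listing the matches first, as done here, is a cheap way to preview what a call with the same arguments would remove.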

data_availability

data_availability(
    update: Literal["auto"] | bool = "auto",
    as_year_table: bool = False,
    fill_char: str = "#",
    transposed: bool = False,
    year_table_output: str | Path | None = None,
) -> dict[str, list[str]] | str

Inspect the availability of BSRN data on the remote FTP server.

This function connects to the BSRN FTP server and retrieves a list of available data files for each site. The results are cached locally in a JSON file to avoid unnecessary FTP connections. The cache is updated if it is older than 7 days or if the update parameter is set to True.

Parameters:

  • update
    ("auto" or bool, default: "auto" ) –

    Whether to refresh the local availability cache. With "auto", the cache is refreshed when older than 7 days.

  • as_year_table
    (bool, default: False ) –

    If True, return a plain-text table with one row per site and one column per year (or transposed when transposed=True).

  • fill_char
    (str, default: "#" ) –

    Character used to mark years with available data in the annual table. Must be a single character.

  • transposed
    (bool, default: False ) –

    If True, the year table is transposed: each row is a year in ascending order and each column is a site. Site acronyms are shown vertically in a three-row header. Has no effect when as_year_table is False.

  • year_table_output
    (str or Path or None, default: None ) –

    Optional output path to persist the annual table as a text file. The table is generated when this argument is provided, even if as_year_table is False.

Returns:

  • dict[str, list[str]] or str

    Mapping from site acronym to list of available monthly files, or a yearly availability table when as_year_table is True.

Examples:

>>> from solarpandas.origin.bsrn import data_availability
>>> table = data_availability(update=False, as_year_table=True)
>>> isinstance(table, str)
True
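
To make the year-table idea concrete, here is a rough sketch of how such a table could be built from the availability mapping. It is not the library's `_availability_to_year_table`; `years_from_files` and `year_table` are hypothetical names, and the sketch assumes monthly file names of the form `<site><MM><YY>.dat.gz` and that two-digit years from 92 upward belong to the 1900s.

```python
def years_from_files(files):
    """Collect the four-digit years covered by a list of monthly file names."""
    years = set()
    for name in files:
        yy = int(name[5:7])  # assumed layout: 3-letter site, MM, YY
        years.add(1900 + yy if yy >= 92 else 2000 + yy)  # assumed century pivot
    return years


def year_table(availability, fill_char="#"):
    """Render one row per site and one column per year as plain text."""
    if len(fill_char) != 1:
        raise ValueError("fill_char must be a single character")
    per_site = {site: years_from_files(files) for site, files in availability.items()}
    all_years = sorted(set().union(*per_site.values()))
    rows = ["site " + " ".join(str(year) for year in all_years)]
    for site in sorted(per_site):
        cells = " ".join(
            fill_char * 4 if year in per_site[site] else " " * 4
            for year in all_years
        )
        rows.append(f"{site:<4} {cells}".rstrip())
    return "\n".join(rows)


# Hypothetical availability mapping, for illustration only.
availability = {
    "cab": ["cab0124.dat.gz", "cab0224.dat.gz"],
    "car": ["car0116.dat.gz", "car0199.dat.gz"],
}
table = year_table(availability)
print(table)
```

Each marked cell is four characters wide so that it lines up with the four-digit year in the header.
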
Source code in src/solarpandas/origin/bsrn/core.py
def data_availability(
    update: Literal["auto"] | bool = "auto",
    as_year_table: bool = False,
    fill_char: str = "#",
    transposed: bool = False,
    year_table_output: str | Path | None = None,
) -> dict[str, list[str]] | str:
    """Inspect the availability of BSRN data on the remote FTP server.

    This function connects to the BSRN FTP server and retrieves a list of
    available data files for each site. The results are cached locally in a
    JSON file to avoid unnecessary FTP connections. The cache is refreshed when
    it is missing, when `update` is `True`, or when `update` is `"auto"` and
    the cache is older than 7 days.

    Parameters
    ----------
    update : {"auto", bool}, default "auto"
        Whether to refresh the local availability cache. With ``"auto"``, the
        cache is refreshed when older than 7 days.
    as_year_table : bool, default False
        If ``True``, return a plain-text table with one row per site and one
        column per year (or transposed when ``transposed=True``).
    fill_char : str, default "#"
        Character used to mark years with available data in the annual table.
        Must be a single character.
    transposed : bool, default False
        If ``True``, the year table is transposed: each row is a year in
        ascending order and each column is a site. Site acronyms are shown
        vertically in a three-row header. Has no effect when
        ``as_year_table`` is ``False``.
    year_table_output : str or pathlib.Path or None, default None
        Optional output path to persist the annual table as a text file. The
        table is generated when this argument is provided, even if
        ``as_year_table`` is ``False``.

    Returns
    -------
    dict[str, list[str]] or str
        Mapping from site acronym to list of available monthly files, or a
        yearly availability table when ``as_year_table`` is ``True``.

    Examples
    --------
    >>> from solarpandas.origin.bsrn import data_availability
    >>> table = data_availability(update=False, as_year_table=True)
    >>> isinstance(table, str)
    True
    """
    availability_path = get_database_path() / "ftp" / "availability.json"
    file_age_days = helpers.get_file_age(availability_path)

    if update == "auto":
        update = False
        if file_age_days > 7:  # update if the file is older than 7 days
            logger.info(
                f"Availability file is {file_age_days:.1f} days old. Updating..."
            )
            update = True

    if update or not availability_path.exists():
        availability = helpers.inspect_data_availability(timeout=30)
        availability_path.parent.mkdir(parents=True, exist_ok=True)
        with availability_path.open("w") as f:
            json.dump(availability, f, indent=4)

    with availability_path.open("r") as f:
        availability = json.load(f)

    year_table = None
    if as_year_table or year_table_output is not None:
        year_table = _availability_to_year_table(availability, fill_char=fill_char, transposed=transposed)

    if year_table_output is not None:
        output_path = Path(year_table_output)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        output_path.write_text(year_table, encoding="utf-8")

    if as_year_table:
        return year_table
    return availability
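
The refresh policy above (refresh when forced, when the file is missing, or when it is older than 7 days in "auto" mode) can be sketched with the standard library alone. `file_age_days` is a hypothetical stand-in for `helpers.get_file_age`, whose behavior for missing files is not shown here, and `load_cached_json` is an illustrative name, not a solarpandas function.

```python
import json
import tempfile
import time
from pathlib import Path


def file_age_days(path):
    """Age of the file in days; infinity when missing (assumed behavior)."""
    if not path.exists():
        return float("inf")
    return (time.time() - path.stat().st_mtime) / 86400.0


def load_cached_json(path, fetch, update="auto", max_age_days=7.0):
    """Return the cached JSON content, calling fetch() only when needed."""
    if update == "auto":
        update = file_age_days(path) > max_age_days
    if update or not path.exists():
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(json.dumps(fetch(), indent=4), encoding="utf-8")
    return json.loads(path.read_text(encoding="utf-8"))


calls = []


def fake_fetch():
    """Pretend remote call; counts how often the 'server' is contacted."""
    calls.append(1)
    return {"cab": ["cab0124.dat.gz"]}


with tempfile.TemporaryDirectory() as tmp:
    cache = Path(tmp) / "ftp" / "availability.json"
    first = load_cached_json(cache, fake_fetch)                # missing: fetch
    second = load_cached_json(cache, fake_fetch)               # fresh: reuse
    third = load_cached_json(cache, fake_fetch, update=True)   # forced: fetch
```

Reading the file back after writing keeps the returned value identical whether or not a refresh happened, which is the same design choice the function above makes.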

get_database_path

get_database_path()

Get the path to the local BSRN database directory.

This function retrieves the path from the global configuration. If the path is not set, it returns the default path.

Returns:

  • Path

    Path to the local BSRN database directory.

Examples:

>>> from solarpandas.origin.bsrn import get_database_path
>>> get_database_path().name
'bsrn'
Source code in src/solarpandas/origin/bsrn/core.py
def get_database_path():
    """Get the path to the local BSRN database directory.

    This function retrieves the path from the global configuration. If the
    path is not set, it returns the default path.

    Returns
    -------
    pathlib.Path
        Path to the local BSRN database directory.

    Examples
    --------
    >>> from solarpandas.origin.bsrn import get_database_path
    >>> get_database_path().name
    'bsrn'
    """
    default_path = platformdirs.user_data_path(appname="solarpandas") / "bsrn"
    return get_option("bsrn.data_dir", default=default_path)

load_data

load_data(
    site: Site,
    years: Sequence[Year] | Year,
    logical_record: Literal[
        "LR0100", "LR0300", "LR0500"
    ] = "LR0100",
    group: Literal["essential", "avg", "all"] = "essential",
) -> SolarDataFrame | None

Load yearly BSRN data from cache or raw FTP files.

Parameters:

  • site
    (Site) –

    Three-letter BSRN station code.

  • years
    (Year or sequence of Year) –

    Year or list of years to retrieve.

  • logical_record
    ((LR0100, LR0300, LR0500), default: "LR0100" ) –

    Logical record to load and cache.

  • group
    ((essential, avg, all), default: "essential" ) –

    Variable group selection based on CF metadata tags.

Returns:

  • SolarDataFrame or None

    Combined dataframe with harmonized metadata, or None when no data could be retrieved.

Examples:

>>> from solarpandas.origin.bsrn import load_data
>>> sdf = load_data(site="car", years=2016)
>>> sdf is None or "ghi" in sdf.columns
True
Source code in src/solarpandas/origin/bsrn/core.py
def load_data(
    site: Site,
    years: Sequence[Year] | Year,
    logical_record: Literal["LR0100", "LR0300", "LR0500"] = "LR0100",
    group: Literal["essential", "avg", "all"] = "essential",
) -> SolarDataFrame | None:
    """Load yearly BSRN data from cache or raw FTP files.

    Parameters
    ----------
    site : Site
        Three-letter BSRN station code.
    years : Year or sequence of Year
        Year or list of years to retrieve.
    logical_record : {"LR0100", "LR0300", "LR0500"}, default "LR0100"
        Logical record to load and cache.
    group : {"essential", "avg", "all"}, default "essential"
        Variable group selection based on CF metadata tags.

    Returns
    -------
    SolarDataFrame or None
        Combined dataframe with harmonized metadata, or ``None`` when no data
        could be retrieved.

    Examples
    --------
    >>> from solarpandas.origin.bsrn import load_data
    >>> sdf = load_data(site="car", years=2016)
    >>> sdf is None or "ghi" in sdf.columns
    True
    """

    site = validate_type(site, Site)
    years = [validate_type(year, Year) for year in np.asarray(years, dtype=int).reshape(-1)]
    logical_record = validate_type(logical_record, DataLogicalRecordName)

    # For standard_names see: https://cfconventions.org/Data/cf-standard-names/current/build/cf-standard-name-table.html
    path = files("solarpandas").joinpath("origin/bsrn/cf-metadata.json")
    if path.is_file():
        cf_metadata = json.loads(path.read_text())
    else:
        logger.warning("CF metadata file not found. Cannot load metadata.")
        cf_metadata = {}  # keep the lookups below working without metadata

    def collect_variables_metadata(columns: list[str]) -> dict:
        var_metadata = {}
        for varname in columns:
            if varname in cf_metadata:
                values = {key: value for key, value in cf_metadata[varname].items() if not key.startswith("_")}
                var_metadata[values["short_name"]] = values | {"bsrn_name": varname}
            else:
                logger.warning(f"No CF metadata found for variable '{varname}'. Skipping metadata assignment for this variable.")
                var_metadata[varname] = {
                    "standard_name": "unknown",
                    "long_name": "unknown",
                    "short_name": "unknown",
                    "units": "unknown",
                    "bsrn_name": varname,
                }
        return var_metadata

    def filter_columns(columns: list[str], group: str) -> list[str]:
        logger.debug(f"Filtering columns for group '{group}'...")

        group_columns = []

        # 1. Take all group columns in cf-metadata
        if group.casefold() == "all":
            group_columns = [cf_metadata.get(column, {}).get("short_name", column) for column in columns]
        else:
            for vattrs in cf_metadata.values():
                if "_groups" not in vattrs:
                    continue
                if group in map(str.strip, vattrs["_groups"].split(",")):
                    group_columns.append(vattrs["short_name"])

            logger.debug(f"Group columns: {group_columns}")

        # 2. Take the intersection with the columns in the data
        return [col for col in columns if col in group_columns]

    db_path = get_database_path() / "cached" / site
    db_path.mkdir(parents=True, exist_ok=True)

    load_bsrn_files = functools.partial(
        load_data_from_bsrn_files,
        site=site,
        months=range(1, 13),
        filled=True,
        centered=True,
        include_metadata=False,
        extra_records=None if logical_record == "LR0100" else [logical_record])

    paths = []
    for year in years:
        if not (file_path := db_path / f"{site}_{year}_{logical_record.lower()}.parquet").exists():
            logger.info(f"cached file {file_path.name} not found. Loading data from BSRN files...")

            # 1. collect the data
            data = load_bsrn_files(years=year)
            if data is not None and logical_record != "LR0100":
                data = data[1][logical_record]  # logical_record is in [LR0300, LR0500]
            if data is None:
                logger.warning(f"no data retrieved for {site=}, {year=}, and {logical_record=}. Skipping...")
                continue

            # 2. collect variables metadata
            vmetadata = collect_variables_metadata(data.columns.tolist())
            data.custom_metadata["variables"] = vmetadata.copy()
            rename_map = {meta["bsrn_name"]: var for var, meta in vmetadata.items()}

            # 3. update custom_metadata and column names and save to parquet
            (
                data
                .rename(columns=rename_map)
                .rename_axis("time", axis=0)
                .astype(np.float32)
                .reset_index()
                .to_parquet(file_path)
            )
        paths.append(file_path)

    if not paths:
        logger.warning(f"no data available for {site=}, {years=}, and {logical_record=}. Returning None.")
        return None

    unfiltered_data = pd.concat([read_parquet(path) for path in sorted(paths)], axis=0).set_index("time")
    selected_columns = filter_columns(unfiltered_data.columns.tolist(), group=group)
    data = unfiltered_data.get(selected_columns)
    data.custom_metadata["variables"] = {var: meta for var, meta in data.custom_metadata["variables"].items()
                                         if var in selected_columns}
    return data
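
The group selection performed by `filter_columns` can be illustrated on a toy metadata table. This is a simplified sketch: `CF_METADATA` and its entries are invented for the example (the real `cf-metadata.json` is not reproduced here), and the function takes the metadata as an argument instead of closing over it.

```python
# Hypothetical metadata: keys are raw variable names, "_groups" is a
# comma-separated tag list, "short_name" is the column name after renaming.
CF_METADATA = {
    "SWD": {"short_name": "ghi", "_groups": "essential, avg"},
    "DIR": {"short_name": "dni", "_groups": "essential,avg"},
    "SWD_std": {"short_name": "ghi_std", "_groups": "avg"},
    "note": {"short_name": "note"},  # no "_groups" tag: never selected by group
}


def filter_columns(columns, group, cf_metadata):
    """Keep the columns whose metadata entry is tagged with the given group."""
    if group.casefold() == "all":
        return list(columns)
    wanted = {
        attrs["short_name"]
        for attrs in cf_metadata.values()
        if group in (tag.strip() for tag in attrs.get("_groups", "").split(","))
    }
    # Preserve the column order of the data, not of the metadata file
    return [col for col in columns if col in wanted]


columns = ["ghi", "dni", "ghi_std", "extra"]
print(filter_columns(columns, "essential", CF_METADATA))
print(filter_columns(columns, "avg", CF_METADATA))
print(filter_columns(columns, "all", CF_METADATA))
```

Stripping each tag makes the match insensitive to spaces after the commas, so `"essential, avg"` and `"essential,avg"` behave the same.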

load_data_from_bsrn_files

load_data_from_bsrn_files(
    site: Site,
    years: Sequence[Year] | Year,
    months: Sequence[Month] | Month = range(1, 13),
    filled: bool = True,
    centered: bool = True,
    include_metadata: Literal[False] = False,
    extra_records: None = None,
) -> None | SolarDataFrame
load_data_from_bsrn_files(
    site: Site,
    years: Sequence[Year] | Year,
    months: Sequence[Month] | Month = range(1, 13),
    filled: bool = True,
    centered: bool = True,
    include_metadata: Literal[True] = True,
    extra_records: None = None,
) -> None | tuple[SolarDataFrame, DataFrame]
load_data_from_bsrn_files(
    site: Site,
    years: Sequence[Year] | Year,
    months: Sequence[Month] | Month = range(1, 13),
    filled: bool = True,
    centered: bool = True,
    include_metadata: Literal[False] = False,
    extra_records: list[Literal["LR0300", "LR0500"]] = ...,
) -> (
    None | tuple[SolarDataFrame, dict[str, SolarDataFrame]]
)
load_data_from_bsrn_files(
    site: Site,
    years: Sequence[Year] | Year,
    months: Sequence[Month] | Month = range(1, 13),
    filled: bool = True,
    centered: bool = True,
    include_metadata: Literal[True] = True,
    extra_records: list[Literal["LR0300", "LR0500"]] = ...,
) -> (
    None
    | tuple[
        SolarDataFrame, DataFrame, dict[str, SolarDataFrame]
    ]
)
load_data_from_bsrn_files(
    site: Site,
    years: Sequence[Year] | Year,
    months: Sequence[Month] | Month = range(1, 13),
    filled: bool = True,
    centered: bool = True,
    include_metadata: bool = False,
    extra_records: list[Literal["LR0300", "LR0500"]]
    | None = None,
)

Load and parse monthly BSRN .dat.gz files from the local FTP mirror.

Parameters:

  • site
    (Site) –

    Three-letter station code.

  • years
    (Year or sequence of Year) –

    Years to load.

  • months
    (Month or sequence of Month, default: ``range(1, 13)`` ) –

    Months to load.

  • filled
    (bool, default: True ) –

    If True, reindex to dense 1-minute frequency.

  • centered
    (bool, default: True ) –

    If True, shift timestamps by 30 seconds to represent minute centers.

  • include_metadata
    (bool, default: False ) –

    Whether to include per-file metadata dataframe in output tuple.

  • extra_records
    (list[{LR0300, LR0500}] or None, default: None ) –

    Additional logical records to parse and return.

Returns:

  • SolarDataFrame or tuple or None

    Return type depends on include_metadata and extra_records according to the overload declarations above. None is returned when no data could be retrieved.
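
Because the result shape changes with the two flags, calling code may find it convenient to normalize it. The helper below is a hypothetical sketch (`unpack_result` is not part of solarpandas) that always yields a `(data, metadata, extra)` triple, using `None` for the parts that were not requested.

```python
from typing import Any, Optional, Tuple


def unpack_result(result: Any, include_metadata: bool,
                  extra_records: Optional[list]) -> Tuple[Any, Any, Any]:
    """Normalize the four possible return shapes to (data, metadata, extra)."""
    if result is None:  # nothing could be retrieved
        return None, None, None
    if not include_metadata and extra_records is None:
        return result, None, None  # bare dataframe
    if not include_metadata:
        data, extra = result  # (data, extra_data)
        return data, None, extra
    if extra_records is None:
        data, metadata = result  # (data, metadata)
        return data, metadata, None
    data, metadata, extra = result  # (data, metadata, extra_data)
    return data, metadata, extra


# Strings stand in for the dataframes in this illustration.
print(unpack_result("data", False, None))
print(unpack_result(("data", {"LR0300": "extra"}), False, ["LR0300"]))
print(unpack_result(("data", "meta"), True, None))
```

The same flags passed to the loader must be passed to the helper, since the tuple alone does not say which optional part it carries.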

Source code in src/solarpandas/origin/bsrn/core.py
def load_data_from_bsrn_files(
    site: Site,
    years: Sequence[Year] | Year,
    months: Sequence[Month] | Month = range(1, 13),
    filled: bool = True,
    centered: bool = True,
    include_metadata: bool = False,
    extra_records: list[Literal["LR0300", "LR0500"]] | None = None,
):
    """Load and parse monthly BSRN ``.dat.gz`` files from the local FTP mirror.

    Parameters
    ----------
    site : Site
        Three-letter station code.
    years : Year or sequence of Year
        Years to load.
    months : Month or sequence of Month, default ``range(1, 13)``
        Months to load.
    filled : bool, default True
        If ``True``, reindex to dense 1-minute frequency.
    centered : bool, default True
        If ``True``, shift timestamps by 30 seconds to represent minute centers.
    include_metadata : bool, default False
        Whether to include per-file metadata dataframe in output tuple.
    extra_records : list[{"LR0300", "LR0500"}] or None, default None
        Additional logical records to parse and return.

    Returns
    -------
    SolarDataFrame or tuple or None
        Return type depends on ``include_metadata`` and ``extra_records``
        according to the overload declarations above. ``None`` is returned
        when no data could be retrieved.
    """

    site = validate_type(site, Site)
    years = [validate_type(year, Year) for year in np.asarray(years, dtype=int).reshape(-1)]
    months = [validate_type(month, Month) for month in np.asarray(months, dtype=int).reshape(-1)]

    if extra_records is not None:
        for lr in extra_records:
            if lr not in ["LR0300", "LR0500"]:
                raise ValueError(f"invalid logical record name in extra_records: {lr}. "
                                 "Supported values are 'LR0300' and 'LR0500'.")

    parse_bsrn_file_with_extra_records = functools.partial(__parse_bsrn_file__, logical_records=extra_records)

    list_of_years_and_months = sorted(itertools.product(years, months), key=lambda x: (x[0], x[1]))

    logger.info(f"loading data for {len(list_of_years_and_months)} BSRN files...")

    tasks = [(site, year, month) for year, month in list_of_years_and_months]
    with mp.Pool(mp.cpu_count()) as workers:
        # starmap keeps the order of the tasks, so the output is ordered by year and month
        retrievals = workers.starmap(parse_bsrn_file_with_extra_records, tasks, chunksize=1)

    # remove empty retrievals (files not found or with no supported logical records)
    if not (retrievals := [retr for retr in retrievals if len(retr) > 0]):
        logger.warning(f"no data retrieved for {site=}, {years=}, and {months=}")
        return None

    #===================================================================================
    # PREPARE THE DATA AND METADATA TO BE RETURNED.
    #   DATA IS A SOLARDATAFRAME
    #   METADATA IS A PANDAS DATAFRAME
    # The site metadata attached to the data SolarDataFrame (latitude,
    # longitude and altitude) is gathered from `load_metadata`, which retrieves
    # it from PANGAEA. The metadata in the metadata dataframe (surface type,
    # topography type, horizon azimuth and elevation) is gathered from the
    # logical record LR0004 of each file. The former is expected to be
    # consistent across all files, while the latter may vary across files
    # (e.g., if the surface type or topography type changes during the period
    # of interest).
    #===================================================================================

    def clean_data_retrieval(retrieval: dict, lr: LogicalRecordName) -> pd.DataFrame | None:
        if (data := retrieval.get(lr)) is None:
            return None

        year = retrieval.get("year")
        month = retrieval.get("month")

        # set a DatetimeIndex with the time information in the logical record
        time_dict = {"year": year, "month": month, "day": data["day"], "hour": data["hour"], "minute": data["minute"]}
        times_utc = pd.to_datetime(pd.DataFrame(time_dict), utc=True)
        data = data.set_index(times_utc).drop(columns=["day", "hour", "minute"])

        # add missing timestamps with NaN values, if necessary
        start = pd.to_datetime(f"{year}-{month:02d}-01")
        end = start + pd.offsets.MonthBegin(1)
        dense_times = pd.date_range(start, end, freq="1min", inclusive="left", tz="UTC")
        return data.reindex(dense_times)

    data = pd.concat([clean_data_retrieval(retr, lr="LR0100") for retr in retrievals], axis=0)

    if extra_records is not None:
        extra_data = {}
        for lr in extra_records:
            logger.info(f"processing extra logical record {lr}...")
            clean_retrievals = [clean_data_retrieval(retr, lr=lr) for retr in retrievals]
            if not len(this_data := [df for df in clean_retrievals if df is not None]):
                logger.warning(f"no data retrieved for logical record {lr} in {site=}, {years=}, and {months=}")
                extra_data[lr] = None
            else:
                extra_data[lr] = pd.concat(this_data, axis=0)

    if centered:
        data = data.set_index(data.index + pd.to_timedelta("30s"))
        if extra_records is not None:
            for lr, df in extra_data.items():
                if df is not None:
                    extra_data[lr] = df.set_index(df.index + pd.to_timedelta("30s"))

    if filled:
        dense_times = pd.date_range(data.index.min(), data.index.max(), freq="1min", inclusive="both", tz="UTC")
        data = data.reindex(dense_times)
        if extra_records is not None:
            for lr, df in extra_data.items():
                if df is not None:
                    dense_times = pd.date_range(df.index.min(), df.index.max(), freq="1min", inclusive="both", tz="UTC")
                    extra_data[lr] = df.reindex(dense_times)

    variables = ("surface_type", "topography_type", "latitude", "longitude",
                 "altitude", "horizon_azimuth", "horizon_elevation")
    metadata = [{"year": retr["year"], "month": retr["month"]} |
                {key: retr["LR0004"][key] for key in variables}
                for retr in retrievals]
    metadata = pd.DataFrame.from_records(metadata)

    if metadata["latitude"].nunique() > 1:
        logger.warning("the retrieved data contains different latitude values "
                       f"({metadata['latitude'].unique()}). This is not expected.")

    if metadata["longitude"].nunique() > 1:
        logger.warning("the retrieved data contains different longitude values "
                       f"({metadata['longitude'].unique()}). This is not expected.")

    if metadata["altitude"].nunique() > 1:
        logger.warning("the retrieved data contains different altitude values "
                       f"({metadata['altitude'].unique()}). This is not expected.")

    allsite_metadata = load_metadata()
    site_metadata = allsite_metadata.get(site.casefold(), {})

    latitude = site_metadata.get("latitude", metadata["latitude"].iloc[-1] if "latitude" in metadata else None)
    if latitude is None:
        raise ValueError("latitude is missing.")

    longitude = site_metadata.get("longitude", metadata["longitude"].iloc[-1] if "longitude" in metadata else None)
    if longitude is None:
        raise ValueError("longitude is missing.")

    elevation = site_metadata.get("altitude", metadata["altitude"].iloc[-1] if "altitude" in metadata else None)
    if elevation is None:
        logger.warning("elevation is missing. Setting elevation to 0.")
        elevation = 0.0

    custom_metadata = {}
    custom_metadata["station"] = site.upper()
    custom_metadata["location"] = site_metadata.get("station", "unknown")
    if "location" in site_metadata:
        province_and_or_country = site_metadata["location"]
        custom_metadata["location"] = custom_metadata["location"] + f", {province_and_or_country}"
    custom_metadata["network"] = "BSRN"
    custom_metadata["source"] = "BSRN FTP server via solarpandas"
    custom_metadata["institution"] = "Jose A Ruiz-Arias (solarpandas dev) and BSRN data providers"
    custom_metadata["contact"] = "jararias@uma.es"

    custom_metadata["timestamp_alignment"] = "center" if centered else "start"
    custom_metadata["surface_type"] = metadata["surface_type"].iloc[-1] if "surface_type" in metadata else "unknown"
    custom_metadata["topography_type"] = metadata["topography_type"].iloc[-1] if "topography_type" in metadata else "unknown"
    custom_metadata["horizon_azimuth"] = metadata["horizon_azimuth"].iloc[-1] if "horizon_azimuth" in metadata else "unknown"
    custom_metadata["horizon_elevation"] = metadata["horizon_elevation"].iloc[-1] if "horizon_elevation" in metadata else "unknown"

    data = SolarDataFrame(
        data,
        latitude=latitude,
        longitude=longitude,
        elevation=elevation,
        custom_metadata=custom_metadata)

    if extra_records is not None:
        for lr, df in extra_data.items():
            if df is not None:
                extra_data[lr] = SolarDataFrame(
                    df,
                    latitude=latitude,
                    longitude=longitude,
                    elevation=elevation,
                    custom_metadata=custom_metadata)

    if not include_metadata:
        if extra_records is None:
            return data  # overload case 1
        return data, extra_data  # overload case 3

    if extra_records is None:
        return data, metadata  # overload case 2
    return data, metadata, extra_data  # overload case 4
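
The effect of the dense monthly grid and of `centered=True` on the timestamps can be sketched without pandas. This is a standard-library illustration of the idea, not the implementation above: `dense_minutes` and `reindex_dense` are hypothetical names, a plain dict stands in for the dataframe, and `None` stands in for NaN.

```python
from datetime import datetime, timedelta, timezone


def dense_minutes(year, month):
    """All 1-minute UTC timestamps of a month (left-closed, right-open)."""
    start = datetime(year, month, 1, tzinfo=timezone.utc)
    # First day of the following month, rolling over to January after December
    end = datetime(year + (month == 12), month % 12 + 1, 1, tzinfo=timezone.utc)
    n_minutes = int((end - start).total_seconds() // 60)
    return [start + timedelta(minutes=i) for i in range(n_minutes)]


def reindex_dense(observed, year, month, centered=True):
    """Map every minute of the month to its value, or None when missing."""
    shift = timedelta(seconds=30) if centered else timedelta(0)
    return {t + shift: observed.get(t) for t in dense_minutes(year, month)}


utc = timezone.utc
observed = {datetime(2016, 2, 1, 0, 1, tzinfo=utc): 512.0}  # made-up sample
dense = reindex_dense(observed, 2016, 2)

first_stamp = next(iter(dense))
print(len(dense), first_stamp.isoformat())
```

With `centered=True` each label moves from the start of its minute to the middle, so a value recorded for 00:01 is reported at 00:01:30.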

load_metadata

load_metadata(update: Literal['auto'] | bool = 'auto')

Load cached station metadata, optionally refreshing it from the remote source.

Parameters:

  • update
    ((auto, bool), default: "auto" ) –

    If True, force a refresh from PANGAEA. If "auto", refresh when the cache is older than 7 days.

Returns:

  • dict[str, Any]

    Station metadata dictionary keyed by site acronym.

Examples:

>>> from solarpandas.origin.bsrn import load_metadata
>>> meta = load_metadata(update=False)
>>> isinstance(meta, dict)
True
Source code in src/solarpandas/origin/bsrn/core.py
def load_metadata(update: Literal["auto"] | bool = "auto"):
    """Load cached station metadata, optionally refreshing it from the remote source.

    Parameters
    ----------
    update : {"auto", bool}, default "auto"
        If ``True``, force a refresh from PANGAEA. If ``"auto"``, refresh when
        the cache is older than 7 days.

    Returns
    -------
    dict[str, Any]
        Station metadata dictionary keyed by site acronym.

    Examples
    --------
    >>> from solarpandas.origin.bsrn import load_metadata
    >>> meta = load_metadata(update=False)
    >>> isinstance(meta, dict)
    True
    """

    metadata_path = get_database_path() / "ftp" / "metadata.json"
    file_age_days = helpers.get_file_age(metadata_path)

    if update == "auto":
        update = False
        if file_age_days > 7:  # update if the file is older than 7 days
            logger.info(f"Metadata file is {file_age_days:.1f} days old. Updating...")
            update = True

    if update or not metadata_path.exists():
        metadata = helpers.fetch_allsite_metadata_from_pangaea()
        metadata_path.parent.mkdir(parents=True, exist_ok=True)
        with metadata_path.open("w") as f:
            json.dump(metadata, f, indent=4)

    with metadata_path.open("r") as f:
        return json.load(f)

parse_bsrn_file

parse_bsrn_file(
    path: Path,
    check_remote_on_missing_file: bool = True,
    logical_records: LogicalRecordName
    | list[LogicalRecordName]
    | None = None,
    timeout: int = 30,
) -> dict[str, Any]

Parse selected logical records from a BSRN .dat.gz file.

Parameters:

  • path
    (Path) –

    Local path to monthly BSRN file.

  • check_remote_on_missing_file
    (bool, default: True ) –

    If True, attempt remote FTP download when file is missing locally.

  • logical_records
    (LogicalRecordName or list[LogicalRecordName] or None, default: None ) –

    Records to parse. When None, all supported records in file are parsed.

  • timeout
    (int, default: 30 ) –

    FTP timeout in seconds for remote retrieval.

Returns:

  • dict[str, Any]

    Parsed content keyed by logical record name (e.g. "LR0100").

Source code in src/solarpandas/origin/bsrn/core.py
def parse_bsrn_file(
    path: Path,
    check_remote_on_missing_file: bool = True,
    logical_records: LogicalRecordName | list[LogicalRecordName] | None = None,
    timeout: int = 30,
) -> dict[str, Any]:
    """Parse selected logical records from a BSRN ``.dat.gz`` file.

    Parameters
    ----------
    path : pathlib.Path
        Local path to monthly BSRN file.
    check_remote_on_missing_file : bool, default True
        If ``True``, attempt remote FTP download when file is missing locally.
    logical_records : LogicalRecordName or list[LogicalRecordName] or None
        Records to parse. When ``None``, all supported records in file are parsed.
    timeout : int, default 30
        FTP timeout in seconds for remote retrieval.

    Returns
    -------
    dict[str, Any]
        Parsed content keyed by logical record name (e.g. ``"LR0100"``).
    """

    if logical_records is not None:
        if isinstance(logical_records, str):
            logical_records = [logical_records]
        logical_records = [
            validate_type(lr, LogicalRecordName) for lr in logical_records
        ]

    if logical_records is None:
        logger.debug("the user has not specified a list of logical records to parse")
    else:
        logger.debug(
            f"the user has specified the following logical records to parse: {logical_records}"
        )

    logger.debug(f"the supported logical records are: {SUPPORTED_LOGICAL_RECORDS}")

    if check_remote_on_missing_file and not path.exists():
        path = helpers.fetch_site_data_from_ftp(
            path.name, path.parent, timeout=timeout
        )

    if not path.exists():
        logger.error(
            f"BSRN data file {path.name} not found in {path.parent}"
        )
        return {}

    logger.info(f"reading file <blue>{path.name}</blue> (@ {path.parent})")
    with gzip.open(path, "rb") as gz:
        txt_data = [line.rstrip().decode("utf-8") for line in gz.readlines()]

    # find all logical records in the data but keep only the supported ones (if specified)
    logical_records_in_data = LogicalRecord.find_in_data(txt_data)

    logger.debug(
        f"the following logical records are present in the data: {[lr.name for lr in logical_records_in_data]}"
    )

    supported_logical_records_in_data = [
        lr for lr in logical_records_in_data if lr.name in SUPPORTED_LOGICAL_RECORDS
    ]

    # if the user does not specify which logical records to parse, parse all the supported ones
    # that are present in the data.
    if logical_records is None:
        logical_records = supported_logical_records_in_data

    # otherwise, parse only the user specified logical records that are present in the data and
    # raise a warning to the user about the rest.
    else:
        logical_records_to_be_parsed = []
        for lr in logical_records:
            if lr in [lr_.name for lr_ in supported_logical_records_in_data]:
                logical_records_to_be_parsed.append(lr)
            else:
                if lr in SUPPORTED_LOGICAL_RECORDS:
                    logger.warning(f"the logical record {lr} is not in data. Ignoring it.")
                else:
                    logger.warning(f"the logical record {lr} is not supported. Ignoring it.")
        logical_records = [
            lr
            for lr in supported_logical_records_in_data
            if lr.name in logical_records_to_be_parsed
        ]

    logger.debug(
        f"the following logical records will be parsed: {[lr.name for lr in logical_records]}"
    )

    contents = {}

    for logical_record in sorted(logical_records, key=lambda lr: lr.name):  # sort logical records by their name (LRxxxx)
        if not logical_record.parser:
            logger.warning(
                f"unavailable parser for logical record with id {logical_record.name}"
            )
            continue

        lr_data = logical_record.parse(path=path)
        contents[logical_record.name] = lr_data

    return contents
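
The record-selection rules in the function above (parse everything supported when nothing is requested, otherwise keep only requested records that are both supported and present) can be sketched on plain strings. `select_records` is a hypothetical helper, and the `SUPPORTED` set is an assumption made for the example, since the contents of `SUPPORTED_LOGICAL_RECORDS` are not shown here.

```python
SUPPORTED = {"LR0004", "LR0100", "LR0300", "LR0500"}  # assumed for the demo


def select_records(requested, present, supported=SUPPORTED):
    """Return (records to parse, ignored records with the reason)."""
    available = [name for name in present if name in supported]
    if requested is None:
        return sorted(available), []
    selected, ignored = [], []
    for name in requested:
        if name in available:
            selected.append(name)
        elif name in supported:
            ignored.append((name, "not in data"))
        else:
            ignored.append((name, "not supported"))
    # Sort by name (LRxxxx) so that parsing order is deterministic
    return sorted(selected), ignored


present = ["LR0001", "LR0004", "LR0100"]  # made-up file content
print(select_records(None, present))
print(select_records(["LR0300", "LR0100", "LR9999"], present))
```

Returning the ignored records together with a reason keeps the two warning cases of the original (absent from the file versus unsupported) distinguishable.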

helpers

Helper utilities for parsing, downloading, and preparing BSRN resources.

Functions:

fetch_allsite_metadata_from_pangaea

fetch_allsite_metadata_from_pangaea()

Fetch and normalize BSRN station metadata from the PANGAEA catalog.

Returns:

  • dict[str, dict[str, Any]]

    Mapping keyed by station acronym (lowercase).

Source code in src/solarpandas/origin/bsrn/helpers.py
def fetch_allsite_metadata_from_pangaea():
    """Fetch and normalize BSRN station metadata from the PANGAEA catalog.

    Returns
    -------
    dict[str, dict[str, Any]]
        Mapping keyed by station acronym (lowercase).
    """
    # More tables in:
    #  https://dataportals.pangaea.de/bsrn/
    #  End of: https://bsrn.awi.de/data/data-retrieval-via-pangaea/

    url = "https://www.pangaea.de/ddi?request=bsrn/BSRNEvent&format=html&title=BSRN+Stations"
    logger.debug(f"fetching BSRN station metadata from {url}")
    table = pd.read_html(url)[0]

    column_mapping = {
        "Event, optional label": "station",
        "Event label": "acronym",
        "Location": "location",
        "Station info": "info",
        "Latitude": "latitude",
        "Longitude": "longitude",
        "Elevation": "altitude",
        "Date/Time start": "start",
        "Date/Time end": "end",
        "Comment": "comment",
        "URI of event": "uri",
    }

    table = table.rename(columns=column_mapping)
    table = table[list(column_mapping.values())]
    table["acronym"] = table["acronym"].str.strip().str.lower()
    table["latitude"] = table["latitude"].astype(float)
    table["longitude"] = table["longitude"].astype(float)
    table["altitude"] = table["altitude"].astype(float)
    table["start"] = pd.to_datetime(table["start"], errors="coerce")
    table["end"] = pd.to_datetime(table["end"], errors="coerce")

    records = json.loads(table.to_json(orient="records", date_format="iso"))
    return {record["acronym"]: {key: value for key, value in record.items() if key != "acronym"}
            for record in records}

fetch_site_data_from_ftp

fetch_site_data_from_ftp(
    remote_fn: str,
    local_path: str | Path,
    user: str | None = None,
    password: str | None = None,
    timeout: int | None = None,
) -> Path

Download one BSRN monthly file from the FTP server.

Parameters:

  • remote_fn
    (str) –

    Remote filename (for example cab0124.dat.gz).

  • local_path
    (str or Path) –

    Local directory where the file will be stored.

  • user
    (str or None, default: None ) –

    FTP user name. When omitted, it is read from ~/.netrc.

  • password
    (str or None, default: None ) –

    FTP password. When omitted, it is read from ~/.netrc.

  • timeout
    (int or None, default: None ) –

    FTP connection timeout in seconds.

Returns:

  • Path

    Path to the downloaded local file.
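
The credential fallback can be tried without touching a real server. The sketch below writes a throwaway netrc file and looks up one entry with the standard-library `netrc` module, which is what the source does against `~/.netrc`. The host name, login and password are placeholders, not the real BSRN server or credentials.

```python
import netrc
import tempfile
from pathlib import Path

SERVER = "ftp.example.org"  # hypothetical host, not the real BSRN server

with tempfile.TemporaryDirectory() as tmp:
    netrc_path = Path(tmp) / "netrc"
    netrc_path.write_text(f"machine {SERVER} login alice password s3cret\n")

    credentials = netrc.netrc(str(netrc_path))
    entry = credentials.authenticators(SERVER)
    missing = credentials.authenticators("other.example.org")

# authenticators() returns (login, account, password), or None for unknown hosts
user, password = entry[0], entry[2]
print(user, missing)
```

An unknown host yields None, which is the case the source turns into a ValueError.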

Source code in src/solarpandas/origin/bsrn/helpers.py
def fetch_site_data_from_ftp(
    remote_fn: str,
    local_path: str | Path,  # directory to store the downloaded file
    user: str | None = None,
    password: str | None = None,
    timeout: int | None = None
) -> Path:
    """Download one BSRN monthly file from the FTP server.

    Parameters
    ----------
    remote_fn : str
        Remote filename (for example ``cab0124.dat.gz``).
    local_path : str or pathlib.Path
        Local directory where the file will be stored.
    user, password : str or None, optional
        FTP credentials. When omitted, credentials are read from ``~/.netrc``.
    timeout : int or None, optional
        FTP connection timeout in seconds.

    Returns
    -------
    pathlib.Path
        Path to the downloaded local file.
    """

    site = validate_type(remote_fn[:3], Site)

    if isinstance(local_path, str):
        local_path = Path(local_path)

    if (server := get_option("bsrn.server", None)) is None:
        raise ValueError("BSRN server not specified in config file")

    if user is None or password is None:
        logger.debug("User or password not provided. Using credentials from netrc file")
        if (retrieval := netrc().authenticators(server)) is None:
            raise ValueError(f"credentials for server `{server}` not found in netrc file")
        user = user or retrieval[0]
        password = password or retrieval[2]

    try:
        # open the FTP connection and log in with the provided credentials
        with ftplib.FTP(server, timeout=timeout) as ftp:
            ftp.login(user, password)

            # check if there is a directory for this site on the server
            if site not in list(filter(lambda x: len(x) == 3, ftp.nlst())):
                raise ValueError(f"site `{site}` not found on BSRN server")
            ftp.cwd(site)  # change to site directory

            # check if the requested file is available for download
            if remote_fn not in list(filter(lambda x: x.endswith("dat.gz"), ftp.nlst())):
                raise ValueError(f"file `{remote_fn}` not found on BSRN server")

            # prepare the local path to download the file
            local_path.mkdir(parents=True, exist_ok=True)

            # download the file using FTP's RETR command
            logger.info(f"downloading file `{remote_fn}` from BSRN server")
            with (local_path / remote_fn).open("wb") as f:
                ftp.retrbinary(f"RETR {remote_fn}", f.write)
            logger.success(f"<blue>{remote_fn}</blue> added to {local_path}")

    except ftplib.all_errors as exc:
        # catch login, connection, and other FTP-related errors
        raise ValueError(f"login error: {exc.args[0]}") from exc

    except OSError as exc:
        # catch network-related errors (e.g., DNS failure, refused connection)
        raise ValueError(f"network error: {exc.strerror}") from exc

    return local_path / remote_fn

get_file_age

get_file_age(path: Path)

Return file age in days.

Parameters:

  • path
    (Path) –

    File path to inspect.

Returns:

  • float

    Age in days, or np.inf when the file does not exist.
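
A typical use is deciding whether a cached file is stale. The following is a minimal sketch with a standalone re-implementation, so it runs without solarpandas. The name `file_age_days`, the cache file name and the one-day threshold are assumptions made for the example.

```python
import os
import tempfile
import time
from datetime import datetime, timezone
from pathlib import Path

import numpy as np


def file_age_days(path: Path) -> float:
    """Age of ``path`` in days, based on its modification time (sketch)."""
    if not path.exists():
        return np.inf
    modified = datetime.fromtimestamp(path.stat().st_mtime, tz=timezone.utc)
    return (datetime.now(timezone.utc) - modified).total_seconds() / 86400


with tempfile.TemporaryDirectory() as tmp:
    cached = Path(tmp) / "stations.json"  # hypothetical cache file
    cached.write_text("{}")
    two_days_ago = time.time() - 2 * 86400
    os.utime(cached, (two_days_ago, two_days_ago))  # backdate the mtime

    age = file_age_days(cached)
    missing_age = file_age_days(Path(tmp) / "missing.json")

needs_refresh = age > 1  # e.g. refresh caches older than one day
print(round(age, 2), missing_age, needs_refresh)
```

Because a missing file reports an infinite age, the same `age > threshold` test covers both "stale" and "never downloaded".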

Source code in src/solarpandas/origin/bsrn/helpers.py
def get_file_age(path: Path):
    """Return file age in days.

    Parameters
    ----------
    path : pathlib.Path
        File path to inspect.

    Returns
    -------
    float
        Age in days, or ``np.inf`` when the file does not exist.
    """
    if not path.exists():
        return np.inf
    # age is measured from the last modification time (st_mtime)
    datetime_modified = datetime.fromtimestamp(path.stat().st_mtime, tz=timezone.utc)
    file_age = datetime.now(timezone.utc) - datetime_modified
    return file_age.total_seconds() / (24 * 3600)  # seconds to days

inspect_data_availability

inspect_data_availability(
    user: str | None = None,
    password: str | None = None,
    timeout: int | None = 30,
) -> dict[str, list[str]]

Inspect remote FTP availability for all BSRN station directories.

Returns:

  • dict[str, list[str]]

    Mapping from station code to list of available .dat.gz files.
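
The per-station filter is a regular expression over the directory listing. The sketch below reuses the expression from the source on an invented listing. It assumes, as the source's pattern does, that the server returns entries in the form `site/filename`.

```python
import re

# Invented listing in the "site/filename" form that the pattern expects
listing = [
    "abc/abc0224.dat.gz",
    "abc/abc0124.dat.gz",
    "abc/abc0124.dat",     # not gzipped, rejected
    "abc/readme.txt",      # rejected
    "abc/xyz0124.dat.gz",  # wrong station prefix, rejected
]

site = "abc"
regex = re.compile(r"^{0}/{0}\d{{4}}\.dat\.gz$".format(site))
files = sorted(filter(regex.match, listing))
print(files)
```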

Source code in src/solarpandas/origin/bsrn/helpers.py
def inspect_data_availability(
    user: str | None = None,
    password: str | None = None,
    timeout: int | None = 30,
) -> dict[str, list[str]]:
    """Inspect remote FTP availability for all BSRN station directories.

    Returns
    -------
    dict[str, list[str]]
        Mapping from station code to list of available ``.dat.gz`` files.
    """

    if (server := get_option("bsrn.server", None)) is None:
        raise ValueError("BSRN server not specified in config file")

    if user is None or password is None:
        logger.debug("User or password not provided. Using credentials from netrc file")
        if (retrieval := netrc().authenticators(server)) is None:
            raise ValueError(f"credentials for server `{server}` not found in netrc file")
        user = user or retrieval[0]
        password = password or retrieval[2]

    try:
        # open the FTP connection and log in with the provided credentials
        with ftplib.FTP(server, timeout=timeout) as ftp:
            ftp.login(user, password)

            sites = {}
            for site in sorted(filter(lambda x: len(x) == 3, ftp.nlst())):
                regex = re.compile(r"^{0}/{0}\d{{4}}\.dat\.gz$".format(site))
                files = sorted(list(filter(regex.match, ftp.nlst(site))))
                logger.info(f"<blue>{site=}</blue>: {len(files)} files available")
                sites[site] = files

    except ftplib.all_errors as exc:
        # catch login, connection, and other FTP-related errors
        raise ValueError(f"login error: {exc.args[0]}") from exc

    except OSError as exc:
        # catch network-related errors (e.g., DNS failure, refused connection)
        raise ValueError(f"network error: {exc.strerror}") from exc

    return sites

lr_parsers

Parsers for BSRN logical records and fixed-width monthly files.

Functions:

fortran_pattern_to_colspecs

fortran_pattern_to_colspecs(fortran_pattern: str)

Convert a compact Fortran-like pattern into fixed-width colspecs.

Parameters:

  • fortran_pattern
    (str) –

    Pattern expression using A, I, F and X tokens.
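
To make the pattern syntax concrete, here is a simplified standalone sketch of the expansion and the resulting half-open column boundaries. It is not the library function: the helper names are made up, it only handles a bare `X` (one skipped character), and it does not build the formatter list.

```python
import re


def expand_tokens(pattern: str) -> list[str]:
    """Expand group repeats (``2(X,I2)``) and simple repeats (``3X``)."""
    tokens: list[str] = []
    # split on commas that are not enclosed in parentheses
    for part in re.split(r",(?![^()]*\))", pattern.upper()):
        if match := re.fullmatch(r"(\d+)\((.+)\)", part):
            tokens += match.group(2).split(",") * int(match.group(1))
        else:
            tokens.append(part)
    expanded: list[str] = []
    for token in tokens:
        if match := re.fullmatch(r"(\d+)(\D.*)", token):
            expanded += [match.group(2)] * int(match.group(1))
        else:
            expanded.append(token)
    return expanded


def to_colspecs(pattern: str) -> list[tuple[int, int]]:
    """Return half-open (start, end) boundaries for A, I and F fields."""
    position, colspecs = 0, []
    for token in expand_tokens(pattern):
        kind, width = token[0], token[1:]
        if kind == "X":
            position += 1  # a bare X skips one character
        else:
            size = int(width.split(".")[0])  # F7.3 occupies 7 characters
            colspecs.append((position, position + size))
            position += size
    return colspecs


print(expand_tokens("2(X,I2),X,I4"))
print(to_colspecs("2(X,I2),X,I4"))
```

For `"2(X,I2),X,I4"` the tokens expand to `X,I2,X,I2,X,I4` and the boundaries are `[(1, 3), (4, 6), (7, 11)]`.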

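Returns:

  • tuple[list[tuple[int, int]], list[Callable]]

    Parsed column boundaries and per-column formatter callables.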

Source code in src/solarpandas/origin/bsrn/lr_parsers.py
def fortran_pattern_to_colspecs(fortran_pattern: str):
    """Convert a compact Fortran-like pattern into fixed-width colspecs.

    Parameters
    ----------
    fortran_pattern : str
        Pattern expression using ``A``, ``I``, ``F`` and ``X`` tokens.

    Returns
    -------
    tuple[list[tuple[int, int]], list[Callable]]
        Parsed column boundaries and per-column formatter callables.
    """

    def safe_split(pattern: str) -> list[str]:
        # split by comma, but ignore commas inside parentheses
        safe_pattern = re.sub(r',(?=[^()]*\))', ';', pattern) # temporarily replace commas inside parentheses with semicolons
        return [element.replace(";", ",") for element in safe_pattern.split(',')]

    def expand_compounded_multiplicity(fortran_pattern: str) -> str:
        def expand(pattern):
            # expand patterns like 3(X,I2) into X,I2,X,I2,X,I2
            if match := re.match(r"^(\d+)[(](.+)[)]$", pattern):
                multiplicity, pat = match.groups()
                return ",".join(pat.split(",") * int(multiplicity))
            return pattern
        return ",".join([expand(ele) for ele in safe_split(fortran_pattern)])

    def expand_single_multiplicity(fortran_pattern: str) -> str:
        def expand(pattern):
            # expand patterns like 3X into X,X,X
            if match := re.match(r"^(\d+)(.+)$", pattern):
                multiplicity, pat = match.groups()
                return ",".join([pat] * int(multiplicity))
            return pattern
        return ",".join([expand(ele) for ele in safe_split(fortran_pattern)])

    expanded_pattern = expand_compounded_multiplicity(fortran_pattern)
    expanded_pattern = expand_single_multiplicity(expanded_pattern)

    colspecs = []
    formatters = []
    current_pos = 0
    for element in expanded_pattern.upper().split(","):
        if not (match := re.match(r"^([AFIX])(\d*(?:\.\d+)?)$", element)):
            raise ValueError(f"Invalid fortran pattern: {element}")
        type_, width = match.groups()
        width = 0 if width == "" else width
        if type_ == "A":
            colspecs.append((current_pos, current_pos + int(width)))
            formatters.append(str.strip)
            current_pos += (0 if width == "" else int(width))
        elif type_ == "F":
            if not (match := re.match(r"^(\d+)\.(\d+)$", width)): # validate F width format
                raise ValueError(f"invalid fortran float number pattern: {element}")
            width, _ = match.groups()
            colspecs.append((current_pos, current_pos + int(width)))
            formatters.append(float)
            current_pos += (0 if width == "" else int(width))
        elif type_ == "I":
            colspecs.append((current_pos, current_pos + int(width)))
            formatters.append(int)
            current_pos += (0 if width == "" else int(width))
        elif type_ == "X":
            current_pos += (0 if width == "" else int(width)) + 1
        else:
            raise ValueError(f"unsupported fortran type: {type_}")
    return colspecs, formatters

parse

parse(
    txt: str,
    fortran_pattern: str | None = None,
    colspecs: list[tuple[int, int]] | None = None,
    formatter: Callable | list[Callable] | None = None,
    on_error: Literal["raise", "ignore"] = "raise",
    default: Any = "undefined",
) -> list[Any]

Parse one fixed-width text line into typed values.

Parameters:

  • txt
    (str) –

    Input line to parse.

  • fortran_pattern
    (str or None, default: None ) –

    Fortran-like pattern used to infer colspecs.

  • colspecs
    (list[tuple[int, int]] or None, default: None ) –

    Explicit fixed-width column boundaries.

  • formatter
    (Callable or list[Callable] or None, default: None ) –

    Value formatter(s) for extracted fields.

  • on_error
    ({"raise", "ignore"}, default: "raise" )

    Error strategy while parsing or formatting values.

  • default
    (Any, default: "undefined" ) –

    Fallback value used when on_error='ignore'.

Returns:

  • list[Any]

    Parsed and formatted values.
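
The effect of on_error and default is easiest to see on a short line. The sketch below is a reduced standalone version of the slicing and formatting loop, without logging or pattern handling. `parse_line` is a hypothetical name, and the slice `txt[start:end + 1]` mirrors the one in the source listing.

```python
from typing import Any, Callable


def parse_line(
    txt: str,
    colspecs: list[tuple[int, int]],
    formatters: list[Callable[[str], Any]],
    on_error: str = "raise",
    default: Any = "undefined",
) -> list[Any]:
    """Slice ``txt`` at ``colspecs`` and convert each field (sketch)."""
    values = []
    for (start, end), fmt in zip(colspecs, formatters):
        raw = txt[start:end + 1]  # mirrors the slice used in the source
        try:
            values.append(fmt(raw))
        except Exception as exc:
            if on_error == "raise":
                raise ValueError(f"cannot format {raw!r} with {fmt}") from exc
            values.append(default)  # keep going with the fallback value
    return values


colspecs = [(1, 3), (4, 6), (7, 11)]  # boundaries described by "2(X,I2),X,I4"
good = parse_line(" 12  3 2024", colspecs, [int, int, int])
bad = parse_line(" 12 xx 2024", colspecs, [int, int, int], on_error="ignore")
print(good, bad)
```

With `on_error="ignore"` only the failing field is replaced by the default; the remaining fields are still converted.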

Source code in src/solarpandas/origin/bsrn/lr_parsers.py
def parse(
    txt: str,
    fortran_pattern: str | None = None,
    colspecs: list[tuple[int, int]] | None = None,
    formatter: Callable | list[Callable] | None = None,
    on_error: Literal["raise", "ignore"] = "raise",
    default: Any = "undefined"
) -> list[Any]:
    """Parse one fixed-width text line into typed values.

    Parameters
    ----------
    txt : str
        Input line to parse.
    fortran_pattern : str or None, optional
        Fortran-like pattern used to infer ``colspecs``.
    colspecs : list[tuple[int, int]] or None, optional
        Explicit fixed-width column boundaries.
    formatter : Callable or list[Callable] or None, optional
        Value formatter(s) for extracted fields.
    on_error : {"raise", "ignore"}, default "raise"
        Error strategy while parsing or formatting values.
    default : Any, default "undefined"
        Fallback value used when ``on_error='ignore'``.

    Returns
    -------
    list[Any]
        Parsed and formatted values.
    """

    width = max(len("line being parsed"), len("fortran pattern"), len("colspecs"), len("formatter"))
    logger.debug(f"{'line being parsed':>{width}}: <green>{txt}</green>")

    if fortran_pattern is not None and colspecs is None:
        logger.debug(f"{'fortran pattern':>{width}}: {fortran_pattern}")
        colspecs, formatter_ = fortran_pattern_to_colspecs(fortran_pattern)
        if formatter is None:
            formatter = formatter_

    logger.debug(f"{'colspecs':>{width}}: {colspecs}")
    logger.debug(f"{'formatter':>{width}}: {formatter}")

    try:
        values = [txt[start:end+1] for start, end in colspecs]
    except Exception as e:
        msg = f"there was an error parsing line:\n{txt}\nwith colspecs: {colspecs}"
        if on_error == "raise":
            raise ValueError(msg) from e
        values = [default] * len(colspecs)
        logger.debug(f"      values: {values}")
        logger.warning(msg + ". Returning dummy values.")
        return values

    if formatter is not None:
        if callable(formatter):
            formatter = [formatter] * len(colspecs)
        formatted_values = []
        for k, (fmt, value) in enumerate(zip(formatter, values)):
            try:
                formatted_values.append(fmt(value))
            except Exception as e:
                msg = (f"there was an error applying the formatter to element {k} of line:\n"
                       f"{txt}\nwith colspecs: {colspecs}, formatter {fmt} and value `{value}`")
                if on_error == "raise":
                    raise ValueError(msg) from e
                formatted_values.append(default)
                logger.warning(msg + ". Returning dummy value for this element.")
        values = formatted_values
    logger.debug(f"      values: {values}")
    return values

parse_logical_record_0001

parse_logical_record_0001(
    lines: list[str], **kwargs
) -> dict[str, Any]

Parse LR0001 block with basic station and quantity metadata.

Source code in src/solarpandas/origin/bsrn/lr_parsers.py
def parse_logical_record_0001(lines: list[str], **kwargs) -> dict[str, Any]:
    """Parse LR0001 block with basic station and quantity metadata."""
    elements = {}
    ilines = iter(lines)

    names = ("station_id", "month", "year", "data_version")
    values = parse(next(ilines), fortran_pattern="2(X,I2),X,I4,X,I2")
    elements.update({name: value for name, value in zip(names, values)})

    # second and following data lines
    elements['quantity_measured'] = []
    for line in ilines:
        values = parse(line, fortran_pattern="8(X,I9)")
        elements['quantity_measured'].extend([tables.TableA3[qty_id] for qty_id in values if qty_id in tables.TableA3])

    return elements

parse_logical_record_0002

parse_logical_record_0002(
    lines: list[str], **kwargs
) -> dict[str, Any]

Parse LR0002 block with scientist and deputy contact information.

Source code in src/solarpandas/origin/bsrn/lr_parsers.py
def parse_logical_record_0002(lines: list[str], **kwargs) -> dict[str, Any]:
    """Parse LR0002 block with scientist and deputy contact information."""
    elements = {}
    ilines = iter(lines)

    values = parse(next(ilines), fortran_pattern="3(X,I2)")
    elements['scientist_changed_on'] = dict(zip(('day', 'hour', 'minute'), values))

    names = ("scientist_name", "scientist_telephone", "scientist_fax")
    values = parse(next(ilines), fortran_pattern="A38,X,A20,X,A20")
    elements.update({name: value for name, value in zip(names, values)})

    names = ("scientist_tcp/ip", "scientist_email")
    values = parse(next(ilines), fortran_pattern="A15,X,A50")
    elements.update({name: value for name, value in zip(names, values)})

    names = ("scientist_address",)
    values = parse(next(ilines), fortran_pattern="A80")
    elements.update({name: value for name, value in zip(names, values)})

    values = parse(next(ilines), fortran_pattern="3(X,I2)")
    elements['deputy_changed_on'] = dict(zip(('day', 'hour', 'minute'), values))

    names = ("deputy_name", "deputy_telephone", "deputy_fax")
    values = parse(next(ilines), fortran_pattern="A38,X,A20,X,A20")
    elements.update({name: value for name, value in zip(names, values)})

    names = ("deputy_tcp/ip", "deputy_email")
    values = parse(next(ilines), fortran_pattern="A15,X,A50")
    elements.update({name: value for name, value in zip(names, values)})

    names = ("deputy_address",)
    values = parse(next(ilines), fortran_pattern="A80")
    elements.update({name: value for name, value in zip(names, values)})

    return elements

parse_logical_record_0003

parse_logical_record_0003(
    lines: list[str], **kwargs
) -> dict[str, Any]

Parse LR0003 free-text message block.

Source code in src/solarpandas/origin/bsrn/lr_parsers.py
def parse_logical_record_0003(lines: list[str], **kwargs) -> dict[str, Any]:
    """Parse LR0003 free-text message block."""
    return {'message': parse(lines[0], fortran_pattern="A80")[0]}

parse_logical_record_0004

parse_logical_record_0004(
    lines: list[str], **kwargs
) -> dict[str, Any]

Parse LR0004 station site metadata and horizon profile.
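
Two post-processing steps in this parser are worth spelling out. The stored latitude and longitude are shifted by 90 and 180 degrees, and the horizon profile arrives as interleaved azimuth/elevation values from which -1 entries are dropped. The sketch below reproduces both steps on invented numbers.

```python
# Values as they might come out of the fixed-width fields (invented numbers)
stored_latitude, stored_longitude = 141.5, 184.75
horizon_fields = [0, 5, 90, 3, 180, 12, -1, -1]  # azimuth, elevation, ...

latitude = stored_latitude - 90.0     # the source subtracts 90 from the stored value
longitude = stored_longitude - 180.0  # and 180 from the stored longitude

# drop -1 entries from the flat list, then de-interleave the pairs
used = [value for value in horizon_fields if value != -1]
horizon_azimuth = used[0::2]
horizon_elevation = used[1::2]
print(latitude, longitude, horizon_azimuth, horizon_elevation)
```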

Source code in src/solarpandas/origin/bsrn/lr_parsers.py
def parse_logical_record_0004(lines: list[str], **kwargs) -> dict[str, Any]:
    """Parse LR0004 station site metadata and horizon profile."""
    elements = {}
    ilines = iter(lines)

    values = parse(next(ilines), fortran_pattern="3(X,I2)")
    elements['station_changed_on'] = dict(zip(('day', 'hour', 'minute'), values))

    values = parse(next(ilines), fortran_pattern="2(X,I2)")
    elements["surface_type"] = tables.TableA4.get(values[0], f"unknown surface type {values[0]}")
    elements["topography_type"] = tables.TableA5.get(values[1], f"unknown topography type {values[1]}")

    elements['station_address'] = parse(next(ilines), fortran_pattern="A80")[0]

    names = ("station_telephone", "station_fax")
    values = parse(next(ilines), fortran_pattern="A20,X,A20")
    elements.update({name: value for name, value in zip(names, values)})

    names = ("station_tcp/ip", "station_email")
    values = parse(next(ilines), fortran_pattern="A15,X,A50")
    elements.update({name: value for name, value in zip(names, values)})

    values = parse(next(ilines), fortran_pattern="2(X,F7.3),X,I4,X,A5")
    elements['latitude'] = values[0] - 90.
    elements['longitude'] = values[1] - 180.
    elements['altitude'] = values[2]
    elements['synop_id'] = values[3]

    values = parse(next(ilines), fortran_pattern="3(X,I2)")
    elements['horizon_changed_on'] = dict(zip(('day', 'hour', 'minute'), values))

    values = [parse(line, fortran_pattern="11(X,I3,X,I2)") for line in ilines]
    values = [e for e in functools.reduce(lambda a, b: a + b, values) if e != -1]
    elements['horizon_azimuth'] = values[0::2]
    elements['horizon_elevation'] = values[1::2]

    return elements

parse_logical_record_0005

parse_logical_record_0005(
    lines: list[str], **kwargs
) -> dict[str, Any]

Parse LR0005 radiosonde instrumentation metadata.

Source code in src/solarpandas/origin/bsrn/lr_parsers.py
def parse_logical_record_0005(lines: list[str], **kwargs) -> dict[str, Any]:
    """Parse LR0005 radiosonde instrumentation metadata."""
    elements = {}
    ilines = iter(lines)

    values = parse(next(ilines), fortran_pattern="3(X,I2),X,A1")
    elements["radiosonde_changed_on"] = dict(zip(('day', 'hour', 'minute'), values[:3]))
    elements["radiosonde_operating"] = values[3].casefold() == "y"

    names = ("radiosonde_manufacturer", "radiosonde_location", "radiosonde_distance_km",
             "radiosonde_hUTC_1st_launch", "radiosonde_hUTC_2nd_launch", "radiosonde_hUTC_3rd_launch",
             "radiosonde_hUTC_4th_launch", "radiosonde_id")
    values = parse(next(ilines), fortran_pattern="A30,X,A25,X,I3,4(X,I2),X,A5")
    elements.update({name: value for name, value in zip(names, values)})

    values = parse(next(ilines), fortran_pattern="A80")
    elements["radiosonde_remarks"] = values[0]

    return elements

parse_logical_record_0006

parse_logical_record_0006(
    lines: list[str], **kwargs
) -> dict[str, Any]

Parse LR0006 ozone instrumentation metadata.

Source code in src/solarpandas/origin/bsrn/lr_parsers.py
def parse_logical_record_0006(lines: list[str], **kwargs) -> dict[str, Any]:
    """Parse LR0006 ozone instrumentation metadata."""
    elements = {}
    ilines = iter(lines)

    values = parse(next(ilines), fortran_pattern="3(X,I2),X,A1")
    elements["ozone_changed_on"] = dict(zip(('day', 'hour', 'minute'), values[:3]))
    elements["ozone_operating"] = values[3].casefold() == "y"

    names = ("ozone_manufacturer", "ozone_location", "ozone_distance_km", "ozone_id")
    values = parse(next(ilines), fortran_pattern="A30,X,A25,X,I3,X,I5")
    elements.update({name: value for name, value in zip(names, values)})

    values = parse(next(ilines), fortran_pattern="A80")
    elements["ozone_remarks"] = values[0]

    return elements

parse_logical_record_0007

parse_logical_record_0007(
    lines: list[str], **kwargs
) -> dict[str, Any]

Parse LR0007 station history metadata block.

Source code in src/solarpandas/origin/bsrn/lr_parsers.py
def parse_logical_record_0007(lines: list[str], **kwargs) -> dict[str, Any]:
    """Parse LR0007 station history metadata block."""
    elements = {}
    ilines = iter(lines)

    values = parse(next(ilines), fortran_pattern="3(X,I2)")
    elements['station_history_changed_on'] = dict(zip(('day', 'hour', 'minute'), values))

    values = parse(next(ilines), fortran_pattern="A80")
    elements['station_history_cloud_amount'] = values[0]

    # method est. cloud base height (with instrument)
    values = parse(next(ilines), fortran_pattern="A80")
    elements['station_history_cloud_base_height'] = values[0]

    # method est. cloud liquid water content
    values = parse(next(ilines), fortran_pattern="A80")
    elements['station_history_cloud_liquid_water_content'] = values[0]

    # method est. cloud aerosol vertical distribution
    values = parse(next(ilines), fortran_pattern="A80")
    elements['station_history_aerosol_vertical_distribution'] = values[0]

    # method est. water vapor press
    values = parse(next(ilines), fortran_pattern="A80")
    elements['station_history_water_vapor_pressure'] = values[0]

    # 6 flags indicating if the SYNOP and/or the corresponding
    # quantities of the expanded programme are measured
    values = parse(next(ilines), fortran_pattern="6(X,A1)")
    elements['station_history_synop_flags'] = [flag.casefold() == "y" for flag in values]

    return elements

parse_logical_record_0008

parse_logical_record_0008(
    lines: list[str], **kwargs
) -> dict[str, Any]

Parse LR0008 radiation instrument calibration metadata.

Source code in src/solarpandas/origin/bsrn/lr_parsers.py
def parse_logical_record_0008(lines: list[str], **kwargs) -> dict[str, Any]:
    """Parse LR0008 radiation instrument calibration metadata."""
    elements = {}
    ilines = iter(lines)

    elements['instruments'] = []

    while True:
        try:
            line = next(ilines)
        except StopIteration:
            break

        instrument = {}
        values = parse(line, fortran_pattern="3(X,I2)")
        instrument['changed_on'] = dict(zip(('day', 'hour', 'minute'), values))

        names = ("manufacturer", "model", "serial_number", "purchase_date", "wrmc_id")
        values = parse(next(ilines), fortran_pattern="A30,X,A15,X,A18,X,A8,X,I5")
        instrument.update({name: value for name, value in zip(names, values)})

        values = parse(next(ilines), fortran_pattern="A80")
        instrument['remarks'] = values[0]

        names = ("pyrgeometer_body_compensation_code", "pyrgeometer_dome_compensation_code",
                 "wavelength_of_band_1", "bandwidth_of_band_1", "wavelength_of_band_2", "bandwidth_of_band_2",
                 "wavelength_of_band_3", "bandwidth_of_band_3", "max_xx_zenith_angle_direct_degrees",
                 "min_xx_spectral_instrument")
        values = parse(next(ilines), fortran_pattern="2(X,I2),6(X,F7.3),2(X,I2)")
        instrument.update({name: value for name, value in zip(names, values)})

        names = ("location_of_calibration", "person_doing_calibration")
        values = parse(next(ilines), fortran_pattern="A30,X,A40")
        instrument.update({name: value for name, value in zip(names, values)})

        names = ("start_of_calibration_period_of_band_1", "end_of_calibration_period_of_band_1",
                 "number_of_comparisons_of_band_1", "mean_calibration_coefficient_of_band_1",
                 "standard_error_of_calibration_coefficient_of_band_1")
        values = parse(next(ilines), fortran_pattern="2(A8,X),I2,2(X,F12.4)")
        instrument.update({name: value for name, value in zip(names, values)})

        names = ("start_of_calibration_period_of_band_2", "end_of_calibration_period_of_band_2",
                 "number_of_comparisons_of_band_2", "mean_calibration_coefficient_of_band_2",
                 "standard_error_of_calibration_coefficient_of_band_2")
        values = parse(next(ilines), fortran_pattern="2(A8,X),I2,2(X,F12.4)")
        instrument.update({name: value for name, value in zip(names, values)})

        names = ("start_of_calibration_period_of_band_3", "end_of_calibration_period_of_band_3",
                 "number_of_comparisons_of_band_3", "mean_calibration_coefficient_of_band_3",
                 "standard_error_of_calibration_coefficient_of_band_3")
        values = parse(next(ilines), fortran_pattern="2(A8,X),I2,2(X,F12.4)")
        instrument.update({name: value for name, value in zip(names, values)})

        remarks1 = parse(next(ilines), fortran_pattern="A80")[0]
        remarks2 = parse(next(ilines), fortran_pattern="A80")[0]
        # stored under its own key so the instrument remarks parsed above are not overwritten
        instrument['calibration_remarks'] = "\n".join([remarks1, remarks2])

        elements['instruments'].append(instrument)

    return elements

parse_logical_record_0009

parse_logical_record_0009(
    lines: list[str], **kwargs
) -> dict[str, Any]

Parse LR0009 quantity-to-instrument assignment metadata.

Source code in src/solarpandas/origin/bsrn/lr_parsers.py
def parse_logical_record_0009(lines: list[str], **kwargs) -> dict[str, Any]:
    """Parse LR0009 quantity-to-instrument assignment metadata."""
    elements = {}
    ilines = iter(lines)

    elements['quantities'] = []

    while True:
        try:
            line = next(ilines)
        except StopIteration:
            break

        quantity = {}
        values = parse(line, fortran_pattern="3(X,I2),X,I9,X,I5,X,I2")
        quantity['changed_on'] = dict(zip(('day', 'hour', 'minute'), values[:3]))
        quantity['quantity_measured'] = tables.TableA3.get(values[3], f"unknown quantity with id {values[3]}")
        quantity['instrument_id'] = values[4]
        quantity['spectral_band_id'] = values[5]
        elements['quantities'].append(quantity)

    return elements

parse_logical_record_0100

parse_logical_record_0100(
    lines: list[str], **kwargs
) -> DataFrame

Parse LR0100 block with basic radiation and meteorological measurements.
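
Each LR0100 record spans two text lines, so the source reads the odd and even lines separately with `pandas.read_fwf` and joins them column-wise. The sketch below shows that idea on a much shorter, invented layout of three fields on the first line and one on the second, including the -999 missing-value sentinel and the minute-of-day split. Unlike the source, it applies the 1..1440 shift unconditionally.

```python
from io import StringIO

import numpy as np
import pandas as pd

# Invented records: (day, minute of day, global, diffuse)
records = [(1, 1, 120, 80), (1, 2, -999, 85), (1, 1440, 130, 90)]
lines = []
for day, minute, ghi, dhi in records:
    lines.append(f" {day:2d} {minute:4d}   {ghi:4d}")  # layout X,I2,X,I4,3X,I4
    lines.append(f"   {dhi:4d}")                        # layout 3X,I4

line_1 = pd.read_fwf(
    StringIO("\n".join(lines[::2])),  # first line of every record
    colspecs=[(1, 3), (4, 8), (11, 15)],
    header=None,
    na_values=[-999, -99.9],
).set_axis(["day", "minute", "ghi"], axis=1)
line_2 = pd.read_fwf(
    StringIO("\n".join(lines[1::2])),  # second line of every record
    colspecs=[(3, 7)],
    header=None,
).set_axis(["dhi"], axis=1)
data = pd.concat([line_1, line_2], axis=1)

# minutes reported as 1..1440 are shifted to 0..1439, then split
shifted = data["minute"].to_numpy() - 1
data["hour"], data["minute"] = np.divmod(shifted, 60)
print(data)
```

The sentinel becomes NaN during reading, so no separate masking step is needed afterwards.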

Source code in src/solarpandas/origin/bsrn/lr_parsers.py
def parse_logical_record_0100(lines: list[str], **kwargs) -> pd.DataFrame:
    """Parse LR0100 block with basic radiation and meteorological measurements."""

    def warn(msg: str, day: int | None = None):
        header = ""
        if day is not None:
            header += f"<red>LR0100 @ day {day}</red>"
        if day is not None and "path" in kwargs:
            header += f" <red>in {kwargs['path'].name}</red>"
        logger.warning(f"{header}: {msg}" if header else msg)

    def check_day_consistency(df_day: pd.DataFrame) -> pd.DataFrame:

        day_number = df_day.name
        df = df_day.sort_values("minute")

        # normally, the dat.gz files start every day at minute 0 and end at minute 1439,
        # but in some cases (e.g., cam1008.dat.gz) they start at minute 1 and end at minute
        # 1440, which can break the logic if the data is read line by line. Hence, the data
        # is read in daily blocks of 1440 records
        if (df.iloc[0]["minute"] == 1) and (df.iloc[-1]["minute"] == 1440) and (len(df) == 1440):
            df["minute"] = df["minute"] - 1  # shift so that minute 1 corresponds to 00:00
            warn("minute values start at 1 and end at 1440. Adjusting to start at 0 and end at 1439. "
                 "This happens in some dat.gz files, e.g., cam1008.dat.gz", day_number)

        # drop records with minute values outside the range [0, 1440)
        if not (legal := df["minute"].between(0, 1440, inclusive="left")).all():
            warn(f"minute values are not between 0 and 1439. Skipping {(~legal).sum()} records with "
                 "invalid minute values.", day_number)
            df = df.loc[legal]

        # add hour[0, 23] and minute[0, 59] columns
        hour, minute = np.divmod(df["minute"], 60)
        df["hour"] = hour
        df["minute"] = minute
        df = df.get(["hour", "minute"] + df.columns.drop(["hour", "minute"]).tolist())

        # drop records with hour values outside the range [0, 24)
        if not (legal := df["hour"].between(0, 24, inclusive="left")).all():
            warn(f"hour values are not between 0 and 23. Skipping {(~legal).sum()} records with "
                 "invalid hour values.", day_number)
            df = df.loc[legal]

        # drop records with minute values outside the range [0, 60)
        if not (legal := df["minute"].between(0, 60, inclusive="left")).all():
            warn(f"minute values are not between 0 and 59. Skipping {(~legal).sum()} records with "
                 "invalid minute values.", day_number)
            df = df.loc[legal]

        return df

    COLUMNS_LINE_1 = (
        'day', 'minute', 'global_horizontal_avg', 'global_horizontal_std',
        'global_horizontal_min', 'global_horizontal_max', 'direct_normal_avg',
        'direct_normal_std', 'direct_normal_min', 'direct_normal_max')

    COLUMNS_LINE_2 = (
        'diffuse_horizontal_avg', 'diffuse_horizontal_std', 'diffuse_horizontal_min',
        'diffuse_horizontal_max', 'downward_longwave_avg', 'downward_longwave_std',
        'downward_longwave_min', 'downward_longwave_max', 'air_temperature',
        'relative_humidity', 'atmospheric_pressure')

    buffer = StringIO("\n".join(lines[::2]))
    colspecs, _ = fortran_pattern_to_colspecs("X,I2,X,I4,2(3X,I4,X,F5.1,X,I4,X,I4)")
    line_1 = (pd.read_fwf(buffer, colspecs=colspecs, header=None, na_values=[-999, -99.9])
              .set_axis(COLUMNS_LINE_1, axis=1))

    buffer = StringIO("\n".join(lines[1::2]))
    colspecs, _ = fortran_pattern_to_colspecs("8X,2(3X,I4,X,F5.1,X,I4,X,I4),4X,2(F5.1,X),I4")
    line_2 = (pd.read_fwf(buffer, colspecs=colspecs, header=None, na_values=[-999, -99.9])
              .set_axis(COLUMNS_LINE_2, axis=1))

    data = pd.concat([line_1, line_2], axis=1)
    data = data.groupby("day").apply(lambda df: check_day_consistency(df)).reset_index("day")
    data = data.loc[data["day"].between(1, 31, inclusive="both")]

    return data
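
The parser above reads each logical record from two consecutive physical lines: even-indexed lines carry the first half of the fields and odd-indexed lines the second half. A minimal sketch of that slicing step on plain strings, assuming made-up sample lines rather than real BSRN data:

```python
# Hypothetical sample: each logical record spans two physical lines.
lines = [
    "rec1 first half", "rec1 second half",
    "rec2 first half", "rec2 second half",
]

first_halves = lines[::2]    # physical lines 0, 2, 4, ...
second_halves = lines[1::2]  # physical lines 1, 3, 5, ...

# Pair the halves back up, one tuple per logical record.
records = list(zip(first_halves, second_halves))
```

In the parser, pd.concat(..., axis=1) plays the role of zip: it aligns the two halves row by row on their default integer index.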

parse_logical_record_0300

parse_logical_record_0300(
    lines: list[str], **kwargs
) -> DataFrame

Parse LR0300 block with reflected, upward and net radiation measurements.

Source code in src/solarpandas/origin/bsrn/lr_parsers.py
def parse_logical_record_0300(lines: list[str], **kwargs) -> pd.DataFrame:
    """Parse LR0300 block with reflected, upward and net radiation measurements."""

    def warn(msg: str, day: int | None = None):
        logger.warning(msg if day is None else f"<red>LR0300 @ day{day}</red>: {msg}")

    def check_day_consistency(df_day: pd.DataFrame) -> pd.DataFrame:

        day_number = df_day.name
        df = df_day.sort_values("minute")

        # normally, the dat.gz files start every day at minute 0 and end at minute 1439,
        # but in some cases (e.g., cam1008.dat.gz) they start at minute 1 and end at minute
        # 1440, which can break the logic if the data is read line by line. Hence, the data
        # is checked in daily blocks of 1440 records
        if (df.iloc[0]["minute"] == 1) and (df.iloc[-1]["minute"] == 1440) and (len(df) == 1440):
            df["minute"] = df["minute"] - 1  # shift so that minute 1 corresponds to 00:00
            warn("minute values start at 1 and end at 1440. Adjusting to start at 0 and end at 1439. "
                 "This happens in some dat.gz files, e.g., cam1008.dat.gz", day_number)

        # drop records with minute values outside the range [0, 1440)
        if not (legal := df["minute"].between(0, 1440, inclusive="left")).all():
            warn(f"minute values are not between 0 and 1439. Skipping {(~legal).sum()} records with "
                 "invalid minute values.", day_number)
            df = df.loc[legal]

        # add hour[0, 23] and minute[0, 59] columns
        hour, minute = np.divmod(df["minute"], 60)
        df["hour"] = hour
        df["minute"] = minute
        df = df.get(["hour", "minute"] + df.columns.drop(["hour", "minute"]).tolist())

        # drop records with hour values outside the range [0, 24)
        if not (legal := df["hour"].between(0, 24, inclusive="left")).all():
            warn(f"hour values are not between 0 and 23. Skipping {(~legal).sum()} records with "
                 "invalid hour values.", day_number)
            df = df.loc[legal]

        # drop records with minute values outside the range [0, 60)
        if not (legal := df["minute"].between(0, 60, inclusive="left")).all():
            warn(f"minute values are not between 0 and 59. Skipping {(~legal).sum()} records with "
                 "invalid minute values.", day_number)
            df = df.loc[legal]

        return df

    COLUMNS = (
        'day', 'minute',
        'upward_shortwave_reflected_avg', 'upward_shortwave_reflected_std',
        'upward_shortwave_reflected_min', "upward_shortwave_reflected_max",
        'upward_longwave_avg', 'upward_longwave_std', "upward_longwave_min",
        'upward_longwave_max', 'net_radiation_avg', "net_radiation_std",
        "net_radiation_min", "net_radiation_max")

    buffer = StringIO("\n".join(lines))
    colspecs, _ = fortran_pattern_to_colspecs("X,I2,X,I4,3(3X,I4,X,F5.1,X,I4,X,I4)")
    data = (pd.read_fwf(buffer, colspecs=colspecs, header=None, na_values=[-999, -99.9])
            .set_axis(COLUMNS, axis=1))

    data = data.groupby("day").apply(lambda df: check_day_consistency(df)).reset_index("day")
    data = data.loc[data["day"].between(1, 31, inclusive="both")]

    return data
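
The per-day consistency check used by these parsers can be sketched with the standard library alone. This is an illustration of the logic, not the library's code: the function name split_minute_of_day is hypothetical, and plain lists stand in for the pandas columns.

```python
def split_minute_of_day(minutes: list[int]) -> list[tuple[int, int]]:
    """Return (hour, minute) pairs for minute-of-day values."""
    ordered = sorted(minutes)
    # Some files count 1..1440 instead of 0..1439: shift a full day back by one.
    if len(ordered) == 1440 and ordered[0] == 1 and ordered[-1] == 1440:
        ordered = [m - 1 for m in ordered]
    # Keep only values in [0, 1440), then split each into hour and minute.
    return [divmod(m, 60) for m in ordered if 0 <= m < 1440]
```

Because the range filter runs before divmod, the resulting hours always fall in [0, 24) and the minutes in [0, 60).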

parse_logical_record_0500

parse_logical_record_0500(
    lines: list[str], **kwargs
) -> DataFrame

Parse LR0500 block with UV radiation measurements.

Source code in src/solarpandas/origin/bsrn/lr_parsers.py
def parse_logical_record_0500(lines: list[str], **kwargs) -> pd.DataFrame:
    """Parse LR0500 block with UV radiation measurements."""

    def warn(msg: str, day: int | None = None):
        logger.warning(msg if day is None else f"<red>LR0500 @ day{day}</red>: {msg}")

    def check_day_consistency(df_day: pd.DataFrame) -> pd.DataFrame:

        day_number = df_day.name
        df = df_day.sort_values("minute")

        # normally, the dat.gz files start every day at minute 0 and end at minute 1439,
        # but in some cases (e.g., cam1008.dat.gz) they start at minute 1 and end at minute
        # 1440, which can break the logic if the data is read line by line. Hence, the data
        # is checked in daily blocks of 1440 records
        if (df.iloc[0]["minute"] == 1) and (df.iloc[-1]["minute"] == 1440) and (len(df) == 1440):
            df["minute"] = df["minute"] - 1  # shift so that minute 1 corresponds to 00:00
            warn("minute values start at 1 and end at 1440. Adjusting to start at 0 and end at 1439. "
                 "This happens in some dat.gz files, e.g., cam1008.dat.gz", day_number)

        # drop records with minute values outside the range [0, 1440)
        if not (legal := df["minute"].between(0, 1440, inclusive="left")).all():
            warn(f"minute values are not between 0 and 1439. Skipping {(~legal).sum()} records with "
                 "invalid minute values.", day_number)
            df = df.loc[legal]

        # add hour[0, 23] and minute[0, 59] columns
        hour, minute = np.divmod(df["minute"], 60)
        df["hour"] = hour
        df["minute"] = minute
        df = df.get(["hour", "minute"] + df.columns.drop(["hour", "minute"]).tolist())

        # drop records with hour values outside the range [0, 24)
        if not (legal := df["hour"].between(0, 24, inclusive="left")).all():
            warn(f"hour values are not between 0 and 23. Skipping {(~legal).sum()} records with "
                 "invalid hour values.", day_number)
            df = df.loc[legal]

        # drop records with minute values outside the range [0, 60)
        if not (legal := df["minute"].between(0, 60, inclusive="left")).all():
            warn(f"minute values are not between 0 and 59. Skipping {(~legal).sum()} records with "
                 "invalid minute values.", day_number)
            df = df.loc[legal]

        return df

    COLUMNS_LINE_1 = (
        'day', 'minute',
        "uva_global_avg", "uva_global_std", "uva_global_min", "uva_global_max",
        "uvb_direct_avg", "uvb_direct_std", "uvb_direct_min", "uvb_direct_max")

    COLUMNS_LINE_2 = (
        'uvb_global_avg', 'uvb_global_std', 'uvb_global_min', 'uvb_global_max',
        'uvb_diffuse_avg', 'uvb_diffuse_std', 'uvb_diffuse_min', 'uvb_diffuse_max',
        'uvb_reflected_avg', 'uvb_reflected_std', 'uvb_reflected_min', 'uvb_reflected_max')

    buffer = StringIO("\n".join(lines[::2]))
    colspecs, _ = fortran_pattern_to_colspecs("X,I2,X,I4,8(X,F5.1)")
    line_1 = (pd.read_fwf(buffer, colspecs=colspecs, header=None, na_values=[-999, -99.9])
              .set_axis(COLUMNS_LINE_1, axis=1))

    buffer = StringIO("\n".join(lines[1::2]))
    colspecs, _ = fortran_pattern_to_colspecs("8X,12(X,F5.1)")
    line_2 = (pd.read_fwf(buffer, colspecs=colspecs, header=None, na_values=[-999, -99.9])
              .set_axis(COLUMNS_LINE_2, axis=1))

    data = pd.concat([line_1, line_2], axis=1)
    data = data.groupby("day").apply(lambda df: check_day_consistency(df)).reset_index("day")
    data = data.loc[data["day"].between(1, 31, inclusive="both")]

    return data
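
The parsers describe each fixed-width line with a Fortran-style pattern such as "X,I2,X,I4,8(X,F5.1)" and convert it to column spans for pd.read_fwf. The implementation of fortran_pattern_to_colspecs is not shown on this page, so the following is only a sketch of how such a conversion could work. The function name fortran_to_colspecs is hypothetical, it handles only the X, I and F descriptors, and unlike the library helper it returns a plain list rather than a tuple.

```python
import re


def fortran_to_colspecs(pattern: str) -> list[tuple[int, int]]:
    """Expand a Fortran-style pattern into half-open (start, end) column spans."""
    # Expand repeat groups such as 2(X,F5.1), innermost first.
    group = re.compile(r"(\d+)\(([^()]*)\)")
    while (m := group.search(pattern)) is not None:
        expanded = ",".join([m.group(2)] * int(m.group(1)))
        pattern = pattern[:m.start()] + expanded + pattern[m.end():]

    colspecs, pos = [], 0
    for token in pattern.split(","):
        token = token.strip().upper()
        if (m := re.fullmatch(r"(\d*)X", token)) is not None:
            # nX skips n columns (a bare X skips one).
            pos += int(m.group(1) or 1)
        elif (m := re.fullmatch(r"[IF](\d+)(?:\.\d+)?", token)) is not None:
            # Iw and Fw.d both occupy w columns.
            width = int(m.group(1))
            colspecs.append((pos, pos + width))
            pos += width
        else:
            raise ValueError(f"unsupported descriptor: {token!r}")
    return colspecs


spans = fortran_to_colspecs("X,I2,X,I4,2(X,F5.1)")
```

Under these assumptions, the LR0300 pattern "X,I2,X,I4,3(3X,I4,X,F5.1,X,I4,X,I4)" expands to 14 spans, one per entry in COLUMNS.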

utils

Low-level utility functions shared by BSRN readers and parsers.

Functions:

guess_time_resolution

guess_time_resolution(df_or_series)

Infer sampling step from a datetime index.

Parameters:

  • df_or_series
    (DataFrame or Series) –

    Object with datetime-like index.

Returns:

  • Timedelta or None

    Inferred time step, or None when no robust estimate is possible.

Notes

First attempts pandas.infer_freq; if it cannot infer a frequency, falls back to the minimum observed lag and validates that it can reconstruct the index.

Source code in src/solarpandas/origin/bsrn/utils.py
def guess_time_resolution(df_or_series):
    """Infer sampling step from a datetime index.

    Parameters
    ----------
    df_or_series : pandas.DataFrame or pandas.Series
        Object with datetime-like index.

    Returns
    -------
    pandas.Timedelta or None
        Inferred time step, or ``None`` when no robust estimate is possible.

    Notes
    -----
    First attempts :func:`pandas.infer_freq`; if unavailable, falls back to the
    minimum observed lag and validates that it can reconstruct the index.
    """

    index = df_or_series.index

    if (inferred_freq := pd.infer_freq(index)) is not None:
        return pd.to_timedelta(to_offset(inferred_freq))

    # for most cases, the following should work. If there are many missings
    # in the series, little can be done...
    step = pd.to_timedelta(np.diff(index.to_numpy()).min())
    reconstructed_index = pd.date_range(index[0], index[-1], freq=step)
    if index.isin(reconstructed_index).all():
        return step

    return None

time_interpolation

time_interpolation(
    data: DataFrame, new_index: DatetimeIndex
)

Interpolate data in time over a target index.

Parameters:

  • data
    (DataFrame) –

    Input data indexed by timestamps.

  • new_index
    (DatetimeIndex) –

    Target index to interpolate onto.

Returns:

  • DataFrame

    Interpolated dataframe aligned to new_index.

Source code in src/solarpandas/origin/bsrn/utils.py
def time_interpolation(data: pd.DataFrame, new_index: pd.DatetimeIndex):
    """Interpolate data in time over a target index.

    Parameters
    ----------
    data : pandas.DataFrame
        Input data indexed by timestamps.
    new_index : pandas.DatetimeIndex
        Target index to interpolate onto.

    Returns
    -------
    pandas.DataFrame
        Interpolated dataframe aligned to ``new_index``.
    """
    # time interpolation over the union of the original and target timestamps
    extended_index = data.index.append(new_index).sort_values()
    new_data = data.reindex(extended_index).interpolate(method='time', limit=1)
    # drop duplicated timestamps, keeping the first occurrence. This works on the
    # index itself, so it does not depend on the index name
    new_data = new_data.loc[~new_data.index.duplicated(keep='first')]
    new_data = new_data.reindex(new_index)
    return new_data
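
Time-weighted interpolation means that each target timestamp receives a value weighted by the time elapsed between its two neighbouring samples. The sketch below shows that idea with the standard library only. The function name interpolate_in_time is hypothetical, and the sketch does not reproduce the limit=1 behaviour of the pandas call used in the source.

```python
from bisect import bisect_left
from datetime import datetime


def interpolate_in_time(times, values, targets):
    """Linearly interpolate values (sampled at sorted times) onto targets."""
    result = []
    for t in targets:
        i = bisect_left(times, t)
        if i < len(times) and times[i] == t:
            result.append(values[i])  # exact hit: reuse the sample
        elif 0 < i < len(times):
            t0, t1 = times[i - 1], times[i]
            weight = (t - t0) / (t1 - t0)  # fraction of the interval elapsed
            result.append(values[i - 1] + weight * (values[i] - values[i - 1]))
        else:
            result.append(None)  # outside the sampled range: no extrapolation
    return result


times = [datetime(2024, 1, 1, 10, 0), datetime(2024, 1, 1, 10, 10)]
targets = [
    datetime(2024, 1, 1, 10, 5),
    datetime(2024, 1, 1, 10, 10),
    datetime(2024, 1, 1, 10, 20),
]
interpolated = interpolate_in_time(times, [0.0, 10.0], targets)
```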

types

Annotated type definitions used by the BSRN origin interface.

Types and Validation

annotated

Annotated domain types and validation entry points used by solarpandas.

Classes:

  • ValidaRange

    Validator for numeric ranges using inclusive and exclusive bounds.

  • ValidaRegex

    Validator for values constrained by a regular-expression pattern.

Functions:

  • validate_type

    Validate a value against an Annotated type alias.

ValidaRange dataclass

ValidaRange(
    le: float | None = None,
    lt: float | None = None,
    ge: float | None = None,
    gt: float | None = None,
)

Validator for numeric ranges using inclusive and exclusive bounds.

Methods:

  • validate

    Validate that a value can be cast to float and satisfies bounds.

validate
validate(value, annotated_type)

Validate that a value can be cast to float and satisfies bounds.

Source code in src/solarpandas/types/annotated.py
def validate(self, value, annotated_type):
    """Validate that a value can be cast to float and satisfies bounds."""
    try:
        value = float(value)
    except Exception:
        raise TypeError(f"{annotated_type.__name__} must be a number")
    if (self.le is not None) and (value > self.le):
        raise ValueError(f"{annotated_type.__name__} must be less than or equal to {self.le}")
    if (self.lt is not None) and (value >= self.lt):
        raise ValueError(f"{annotated_type.__name__} must be less than {self.lt}")
    if (self.ge is not None) and (value < self.ge):
        raise ValueError(f"{annotated_type.__name__} must be greater than or equal to {self.ge}")
    if (self.gt is not None) and (value <= self.gt):
        raise ValueError(f"{annotated_type.__name__} must be greater than {self.gt}")
    return value

ValidaRegex dataclass

ValidaRegex(pattern: str)

Validator for values constrained by a regular-expression pattern.

Methods:

  • validate

    Validate a string value against the configured regex pattern.

validate
validate(value, annotated_type)

Validate a string value against the configured regex pattern.

Source code in src/solarpandas/types/annotated.py
def validate(self, value, annotated_type):
    """Validate a string value against the configured regex pattern."""
    if not isinstance(value, str):
        raise TypeError(f"{annotated_type.__name__} must be a string")
    if not re.match(self.pattern, value):
        raise ValueError(f"{annotated_type.__name__} must match the regex pattern: {self.pattern}")
    return value

validate_type

validate_type(value, annotated_type)

Validate a value against an Annotated type alias.

Parameters:

  • value
    (Any) –

    Input value to validate.

  • annotated_type
    (Any) –

    Type alias defined as Annotated[base_type, Validator(...)].

Returns:

  • Any

    Validated value, or None when value is None.

Examples:

>>> from solarpandas.types import Latitude, validate_type
>>> validate_type(37.2, Latitude)
37.2

Source code in src/solarpandas/types/annotated.py
def validate_type(value, annotated_type):
    """Validate a value against an ``Annotated`` type alias.

    Parameters
    ----------
    value : Any
        Input value to validate.
    annotated_type : Any
        Type alias defined as ``Annotated[base_type, Validator(...)]``.

    Returns
    -------
    Any
        Validated value, or ``None`` when ``value`` is ``None``.

    Examples
    --------
    >>> from solarpandas.types import Latitude, validate_type
    >>> validate_type(37.2, Latitude)
    37.2
    """
    if value is not None:
        anntype_value = annotated_type.__value__
        if not hasattr(anntype_value, "__origin__") or get_origin(anntype_value) is not Annotated:
            raise TypeError(f"{annotated_type} is not an Annotated type")
        _, validator = get_args(anntype_value)
        return validator.validate(value, annotated_type)
    return None

qcflag

Pandas extension dtype and array implementation for quality-control flags.

Classes:

  • QCFlagArray

    ExtensionArray for QC flag values: -1, 0, 1 (or NA).

  • QCFlagDtype

    Dtype for QC flag arrays.

  • QCFlagEnum

    Enumeration of QC flag semantics used across the package.

QCFlagArray

QCFlagArray(values: ndarray, copy: bool = False)

Bases: ExtensionArray

ExtensionArray for QC flag values: -1, 0, 1 (or NA).

Internally stored as int8 with -128 as a sentinel for NA.

Properties

  • fails
    (ndarray[bool]) –

    True where the flag value is -1.

  • passes
    (ndarray[bool]) –

    True where the flag value is 1.

  • not_verifiable
    (ndarray[bool]) –

    True where the flag value is 0.

Examples:

>>> arr = QCFlagArray._from_sequence([-1, 0, 1, None])
>>> arr.fails.any()
True

Parameters:

  • values
    (ndarray of int8) –

    Raw storage array. Use _from_sequence for public construction.

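Attributes:

  • fails
    (ndarray) –

    Boolean mask: True where the flag is -1 (failed).

  • not_verifiable
    (ndarray) –

    Boolean mask: True where the flag is 0 (not verifiable).

  • passes
    (ndarray) –

    Boolean mask: True where the flag is 1 (passed).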

Source code in src/solarpandas/types/qcflag.py
def __init__(self, values: np.ndarray, copy: bool = False) -> None:
    """
    Parameters
    ----------
    values : np.ndarray of int8
        Raw storage array. Use _from_sequence for public construction.
    """
    if not isinstance(values, np.ndarray) or values.dtype != np.int8:
        raise TypeError("values must be a np.ndarray of dtype int8")
    na_mask = values == _NA_SENTINEL
    if (invalid := ~na_mask & ~np.isin(values, _VALID_VALUES)).any():
        raise ValueError(
            f"values must be -1, 0, 1 or NA; got {np.unique(values[invalid])}"
        )
    self._data = values.copy() if copy else values

fails property
fails: ndarray

Boolean mask: True where the flag is -1 (failed).

not_verifiable property
not_verifiable: ndarray

Boolean mask: True where the flag is 0 (not verifiable).

passes property
passes: ndarray

Boolean mask: True where the flag is 1 (passed).
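
The storage scheme (signed 8-bit values with -128 reserved for NA) can be illustrated without NumPy or pandas. The sketch below uses the standard-library array module as a stand-in for the int8 ndarray. The helper names encode_flags, fails_mask and na_mask are hypothetical and are not part of QCFlagArray.

```python
from array import array

NA_SENTINEL = -128      # smallest int8 value, reserved for missing flags
VALID_VALUES = (-1, 0, 1)


def encode_flags(flags):
    """Pack flags (-1, 0, 1 or None) into a signed-byte array."""
    packed = array("b")
    for flag in flags:
        if flag is None:
            packed.append(NA_SENTINEL)
        elif flag in VALID_VALUES:
            packed.append(flag)
        else:
            raise ValueError(f"flags must be -1, 0, 1 or None; got {flag!r}")
    return packed


def fails_mask(packed):
    """True where the flag is -1 (failed)."""
    return [value == -1 for value in packed]


def na_mask(packed):
    """True where the flag is missing."""
    return [value == NA_SENTINEL for value in packed]


packed = encode_flags([-1, 0, 1, None])
```

Because the sentinel lies outside the set of valid flags, a single comparison is enough to separate missing entries from real ones.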

QCFlagDtype

Bases: ExtensionDtype

Dtype for QC flag arrays.

Valid values: -1 (fail), 0 (not verifiable), 1 (passed). NA is represented internally as -128.

Examples:

>>> str(QCFlagDtype())
'QCFlagDtype()'

QCFlagEnum

Bases: IntEnum

Enumeration of QC flag semantics used across the package.

Values

  • FAILED
    (-1) –

    Measurement failed the corresponding QC test.

  • NOT_VERIFIABLE
    (0) –

    Measurement could not be verified by the test.

  • PASSED
    (1) –

    Measurement passed the corresponding QC test.

Examples:

>>> QCFlagEnum.FAILED.value
-1

Methods:

  • values

    Return all valid flag values as a list of ints.

values classmethod
values() -> list[int]

Return all valid flag values as a list of ints.

Source code in src/solarpandas/types/qcflag.py
@classmethod
def values(cls) -> list[int]:
    """Return all valid flag values as a list of ints."""
    return [e.value for e in cls]

validate

Validation helpers and annotated domain types used by solarpandas.

This module provides validator classes to validate strings and numeric values, plus convenience Annotated aliases for common geospatial and SoDA inputs. The helper validate_type runs the validator instance attached to an annotated alias declared with the type statement.

Type Aliases:

  • Elevation

    Surface elevation/altitude validator.

  • Latitude

    Geographic latitude coordinate validator.

  • Longitude

    Geographic longitude coordinate validator.

  • SodaStream

    Available data streams from the SoDA service.

  • SodaTimeStep

    Temporal resolution for SoDA API requests.

Classes:

  • ValidaChoices

    Validator for a fixed set of string choices with fuzzy matching.

  • ValidaRange

    Validator for numerical ranges.

  • ValidaRegex

    Validator for string patterns using regular expressions.

Functions:

  • validate_type

    Validate a value against an Annotated type definition.

Elevation

Elevation = float

Surface elevation/altitude validator.

Validates elevation in meters above sea level. Range: -450 m < elev < 8900 m, which covers everything from the Dead Sea (-430 m) to Mt. Everest (8849 m).

Examples:

>>> from solarpandas.validate import Elevation, validate_type
>>> validate_type(667, Elevation)
667.0

Latitude

Latitude = float

Geographic latitude coordinate validator.

Validates latitude values in decimal degrees. Range: -90° < lat < 90° (exclusive).

Examples:

>>> from solarpandas.validate import Latitude, validate_type
>>> validate_type(40.4168, Latitude)
40.4168

Longitude

Longitude = float

Geographic longitude coordinate validator.

Validates longitude values in decimal degrees. Range: -180° ≤ lon < 180°.

Examples:

>>> from solarpandas.validate import Longitude, validate_type
>>> validate_type(-3.7038, Longitude)
-3.7038

SodaStream

SodaStream = str

Available data streams from the SoDA service.

Notes

Allowed values are mcclear (McClear clear-sky model) and cams_radiation (CAMS all-sky service).

Examples:

>>> from solarpandas.validate import SodaStream, validate_type
>>> validate_type("mcclear", SodaStream)
'mcclear'

SodaTimeStep

SodaTimeStep = str

Temporal resolution for SoDA API requests.

Notes

Allowed values use ISO 8601 duration format: PT01M, PT15M, PT01H, PT01D and P01M.

Examples:

>>> from solarpandas.validate import SodaTimeStep, validate_type
>>> validate_type("PT01H", SodaTimeStep)
'PT01H'

ValidaChoices dataclass

ValidaChoices(
    choices: list[str],
    parser: Callable[[str], str] | None = None,
)

Validator for a fixed set of string choices with fuzzy matching.

This validator performs case-insensitive matching against a list of allowed values. If an exact match (ignoring case) is not found, it attempts fuzzy matching to correct potential typos. A warning is issued when fuzzy matching is used.

Parameters:

  • choices
    (list[str]) –

    Allowed canonical values.

  • parser
    (Callable[[str], str] or None, default: None ) –

    Optional callable applied after successful validation.

Notes

Matching is case-insensitive. If no direct case-insensitive match is found, fuzzy matching is attempted with a 0.4 similarity cutoff. Returned values are always canonical entries from choices.

Examples:

>>> validator = ValidaChoices(choices=["SPARTA", "BIRD"])
>>> validator.validate("sparta")
'SPARTA'

Methods:

  • validate

    Validate a string against allowed canonical choices.

validate
validate(value: str) -> str

Validate a string against allowed canonical choices.

Parameters:

  • value
    (str) –

    Value to validate.

Returns:

  • str

    Canonical value from choices.

Raises:

  • TypeError

    If value is not a string.

  • ValueError

    If no close match is found.

Source code in src/solarpandas/validate.py
def validate(self, value: str) -> str:
    """Validate a string against allowed canonical choices.

    Parameters
    ----------
    value : str
        Value to validate.

    Returns
    -------
    str
        Canonical value from ``choices``.

    Raises
    ------
    TypeError
        If ``value`` is not a string.
    ValueError
        If no close match is found.
    """
    if not isinstance(value, str):
        raise TypeError(f"{value} must be a string")
    case_safe_map = {choice.casefold(): choice for choice in self.choices}
    if value.casefold() not in case_safe_map:
        if not (matches := get_close_matches(value.casefold(), case_safe_map, n=1, cutoff=0.4)):
            raise ValueError(f"{value} is not among the allowed choices: {self.choices}")
        best_choice = case_safe_map[matches[0]]
        warnings.warn(f"{value} does not match the allowed choices. I took the closest one: {best_choice}")
        value = best_choice
    else:
        # Return the canonical value from choices, not the user input
        value = case_safe_map[value.casefold()]
    if self.parser is not None:
        return self.parser(value)
    return value
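
The effect of the 0.4 cutoff can be explored directly with difflib, which is where get_close_matches comes from. A minimal sketch follows; the helper name closest_choice is hypothetical, and it returns None instead of raising or warning.

```python
from difflib import get_close_matches


def closest_choice(value: str, choices: list[str], cutoff: float = 0.4):
    """Return the canonical choice matching value, or None if nothing is close."""
    by_folded = {choice.casefold(): choice for choice in choices}
    folded = value.casefold()
    if folded in by_folded:
        return by_folded[folded]  # exact match, ignoring case
    matches = get_close_matches(folded, list(by_folded), n=1, cutoff=cutoff)
    return by_folded[matches[0]] if matches else None


exact = closest_choice("sparta", ["SPARTA", "BIRD"])
typo = closest_choice("sparda", ["SPARTA", "BIRD"])
unrelated = closest_choice("xyz", ["SPARTA", "BIRD"])
```

The 0.4 cutoff is lower than difflib's default of 0.6, so it accepts looser matches. That is convenient for typos, but it also means the warning issued on a fuzzy match is worth reading.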

ValidaRange dataclass

ValidaRange(
    le: float | None = None,
    lt: float | None = None,
    ge: float | None = None,
    gt: float | None = None,
    parser: Callable[[float], float] | None = None,
)

Validator for numerical ranges.

Validates that numeric values fall within specified boundaries using inclusive (ge/le) or exclusive (gt/lt) comparisons. Multiple constraints can be combined to define precise ranges.

Parameters:

  • le
    (float or None, default: None ) –

    Inclusive upper bound.

  • lt
    (float or None, default: None ) –

    Exclusive upper bound.

  • ge
    (float or None, default: None ) –

    Inclusive lower bound.

  • gt
    (float or None, default: None ) –

    Exclusive lower bound.

  • parser
    (Callable[[float], float] or None, default: None ) –

    Optional callable applied after successful validation.

Notes

String inputs are converted to float before validation.

Examples:

>>> percentage = ValidaRange(ge=0, le=100)
>>> percentage.validate("75.5")
75.5

Methods:

  • validate

    Validate that a number falls within configured bounds.

validate
validate(value: float | int | str) -> float

Validate that a number falls within configured bounds.

Parameters:

  • value
    (float or int or str) –

    Numeric value to validate. Strings are converted to float.

Returns:

  • float

    Value converted to float, or the parser's output when a parser is configured.

Raises:

  • TypeError

    If value cannot be converted to float.

  • ValueError

    If value violates any configured constraint.

Source code in src/solarpandas/validate.py
def validate(self, value: float | int | str) -> float:
    """Validate that a number falls within configured bounds.

    Parameters
    ----------
    value : float or int or str
        Numeric value to validate. Strings are converted to float.

    Returns
    -------
    float
        Value converted to float, or the parser's output when a parser is configured.

    Raises
    ------
    TypeError
        If ``value`` cannot be converted to float.
    ValueError
        If ``value`` violates any configured constraint.
    """
    try:
        value = float(value)
    except Exception:
        raise TypeError(f"{value} must be a number")
    if (self.le is not None) and (value > self.le):
        raise ValueError(f"{value} must be less than or equal to {self.le}")
    if (self.lt is not None) and (value >= self.lt):
        raise ValueError(f"{value} must be less than {self.lt}")
    if (self.ge is not None) and (value < self.ge):
        raise ValueError(f"{value} must be greater than or equal to {self.ge}")
    if (self.gt is not None) and (value <= self.gt):
        raise ValueError(f"{value} must be greater than {self.gt}")
    if self.parser is not None:
        return self.parser(value)
    return value

ValidaRegex dataclass

ValidaRegex(
    pattern: str, parser: Callable[[str], str] | None = None
)

Validator for string patterns using regular expressions.

This validator checks if a string matches a specified regex pattern. Optionally, a parser function can transform the validated string before returning it.

Parameters:

  • pattern
    (str) –

    Regex pattern to match. Use raw strings (for example r"...") for patterns containing backslashes.

  • parser
    (Callable[[str], str] or None, default: None ) –

    Optional callable applied after successful validation.

Examples:

>>> validator = ValidaRegex(pattern=r"^[a-z]{3}$", parser=str.upper)
>>> validator.validate("abc")
'ABC'

Methods:

  • validate

    Validate a string against the configured regex pattern.

validate
validate(value: str) -> str

Validate a string against the configured regex pattern.

Parameters:

  • value
    (str) –

    Value to validate.

Returns:

  • str

    Original or parsed string when validation succeeds.

Raises:

  • TypeError

    If value is not a string.

  • ValueError

    If value does not match pattern.

Source code in src/solarpandas/validate.py
def validate(self, value: str) -> str:
    """Validate a string against the configured regex pattern.

    Parameters
    ----------
    value : str
        Value to validate.

    Returns
    -------
    str
        Original or parsed string when validation succeeds.

    Raises
    ------
    TypeError
        If ``value`` is not a string.
    ValueError
        If ``value`` does not match ``pattern``.
    """
    if not isinstance(value, str):
        raise TypeError(f"{value} must be a string")
    if not re.match(self.pattern, value):
        raise ValueError(f"{value} must match the regex pattern: {self.pattern}")
    if self.parser is not None:
        return self.parser(value)
    return value
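
Because validate relies on re.match, the pattern is anchored only at the start of the string. A pattern without a trailing $ therefore also accepts inputs with extra characters after the match, as this short standard-library sketch shows:

```python
import re

pattern = r"[A-Z]{3}"

# re.match anchors at the start only, so trailing characters are tolerated.
loose = re.match(pattern, "ABCD") is not None

# An explicit end anchor rejects the extra character.
anchored = re.match(pattern + r"$", "ABCD") is not None

# re.fullmatch requires the whole string to match the pattern.
full = re.fullmatch(pattern, "ABCD") is not None
```

Here loose is True while anchored and full are False, which is why patterns passed to ValidaRegex usually end with $.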

validate_type

validate_type(value: Any, annotated_type: Any) -> Any

Validate a value against an Annotated type definition.

This function is the main entry point for type validation. It extracts the validator from an Annotated type alias and executes its validate method.

This enables declarative type validation using Python's type hints system.

Parameters:

  • value
    (Any) –

    Value to validate.

  • annotated_type
    (Any) –

    Alias defined as Annotated[base_type, Validator(...)] using the type statement.

Returns:

  • Any

    Validated value, possibly transformed by the validator. If value is None, None is returned.

Raises:

  • TypeError

    If annotated_type is not a valid Annotated alias.

  • ValueError

    If validator checks fail.

Examples:

>>> validate_type(40.4, Latitude)
40.4
>>> validate_type("PT01H", SodaTimeStep)
'PT01H'

See Also

Latitude Longitude Elevation SodaTimeStep

Notes

The function expects aliases created with type (PEP 695), for example type Latitude = Annotated[float, ValidaRange(...)].

Source code in src/solarpandas/validate.py
def validate_type(value: Any, annotated_type: Any) -> Any:
    """Validate a value against an ``Annotated`` type definition.

    This function is the main entry point for type validation. It extracts the
    validator from an `Annotated` type alias and executes its `validate` method.

    This enables declarative type validation using Python's type hints system.

    Parameters
    ----------
    value : Any
        Value to validate.
    annotated_type : Any
        Alias defined as ``Annotated[base_type, Validator(...)]`` using the
        ``type`` statement.

    Returns
    -------
    Any
        Validated value, possibly transformed by the validator. If ``value`` is
        ``None``, ``None`` is returned.

    Raises
    ------
    TypeError
        If ``annotated_type`` is not a valid ``Annotated`` alias.
    ValueError
        If validator checks fail.

    Examples
    --------
    >>> validate_type(40.4, Latitude)
    40.4
    >>> validate_type("PT01H", SodaTimeStep)
    'PT01H'

    See Also
    --------
    Latitude
    Longitude
    Elevation
    SodaTimeStep

    Notes
    -----
    The function expects aliases created with ``type`` (PEP 695), for example
    ``type Latitude = Annotated[float, ValidaRange(...)]``.
    """
    if value is not None:
        anntype_value = annotated_type.__value__
        if not hasattr(anntype_value, "__origin__") or get_origin(anntype_value) is not Annotated:
            raise TypeError(f"{annotated_type} is not an Annotated type")
        _, validator = get_args(anntype_value)
        return validator.validate(value)
    return None
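
The mechanism behind validate_type (a validator stored as Annotated metadata and recovered with get_origin and get_args) can be sketched in a self-contained form. Everything named here is a hypothetical stand-in: Between replaces ValidaRange, LatitudeLike replaces Latitude, and check replaces validate_type. The alias is created by plain assignment rather than the type statement, so the sketch also runs on Python versions older than 3.12 and skips the __value__ lookup.

```python
from dataclasses import dataclass
from typing import Annotated, get_args, get_origin


@dataclass(frozen=True)
class Between:
    """Hypothetical validator: value must satisfy gt < value < lt."""
    gt: float
    lt: float

    def validate(self, value):
        value = float(value)
        if not (self.gt < value < self.lt):
            raise ValueError(f"{value} must be between {self.gt} and {self.lt}")
        return value


# The validator instance travels with the alias as Annotated metadata.
LatitudeLike = Annotated[float, Between(gt=-90, lt=90)]


def check(value, alias):
    """Run the validator attached to an Annotated alias; None passes through."""
    if value is None:
        return None
    if get_origin(alias) is not Annotated:
        raise TypeError(f"{alias} is not an Annotated type")
    _, validator = get_args(alias)
    return validator.validate(value)


latitude = check("40.4", LatitudeLike)
```

Keeping the constraint next to the type means one alias can serve both as a type hint in signatures and as a runtime check.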

Utilities

config

Configuration loading and runtime option access for solarpandas.

This module initializes the user configuration file on first use, loads TOML content into an in-memory dictionary, and exposes helpers to query or override options during the current Python session.

Notes

The persistent config file is stored in the platform-specific user config directory and is named config.toml.

Examples:

>>> from solarpandas.config import get_config_path, get_option, set_option
>>> config_path = get_config_path()
>>> email = get_option("crs_soda.user_email")
>>> set_option("solar-position.algorithm", "spa")

Functions:

  • get_config_path

    Return the path of the user configuration file.

  • get_option

    Retrieve the value of a specific configuration option.

  • load_config

    Load configuration from a TOML file and optionally overwrite global state.

  • reset_config_file

    Reset configuration to defaults by deleting and recreating the file.

  • save_config

    Persist the in-memory configuration to a TOML file.

  • set_option

    Temporarily update a global option for the current session.

  • show_config

    Print all current global options to the console.

get_config_path

get_config_path() -> Path

Return the path of the user configuration file.

Returns:

  • Path

    Absolute path to config.toml in the platform-specific user configuration directory.

Source code in src/solarpandas/config.py
def get_config_path() -> Path:
    """Return the path of the user configuration file.

    Returns
    -------
    pathlib.Path
        Absolute path to ``config.toml`` in the platform-specific user
        configuration directory.
    """
    path = platformdirs.user_config_path(appname="solarpandas", ensure_exists=True)
    return path / "config.toml"

get_option

get_option(name: str, default: Any = None) -> Any

Retrieve the value of a specific configuration option.

Options are organized in tables (sections) within the TOML file. This function uses dot notation to access nested values.

Parameters:

  • name
    (str) –

    Option path in <table>.<option> format (for example, "solar-position.algorithm").

  • default
    (Any, default: None ) –

    Value returned when the table or option is missing.

Returns:

  • Any

    Option value, or default when missing. Options named data_dir are returned as pathlib.Path objects.

Examples:

>>> from solarpandas.config import get_option
>>> algorithm = get_option("solar-position.algorithm")
>>> server = get_option("bsrn.server", default="ftp.bsrn.awi.de")
>>> data_dir = get_option("bsrn.data_dir")  # returns a Path or None

Source code in src/solarpandas/config.py
def get_option(name: str, default: Any = None) -> Any:
    """Retrieve the value of a specific configuration option.

    Options are organized in tables (sections) within the TOML file.
    This function uses dot notation to access nested values.

    Parameters
    ----------
    name : str
        Option path in ``<table>.<option>`` format
        (for example, ``"solar-position.algorithm"``).
    default : Any, default None
        Value returned when the table or option is missing.

    Returns
    -------
    Any
        Option value, or ``default`` when missing. Options named ``data_dir``
        are returned as :class:`pathlib.Path`.

    Examples
    --------
    >>> from solarpandas.config import get_option
    >>> algorithm = get_option("solar-position.algorithm")
    >>> server = get_option("bsrn.server", default="ftp.bsrn.awi.de")
    >>> data_dir = get_option("bsrn.data_dir")  # returns a Path or None
    """
    table_name, option_name = name.split(".")
    if (table := _GLOBAL_CONFIG.get(table_name, None)) is None:
        logger.warning(f"missing table `{table_name}`")
        return default
    if (value := table.get(option_name, None)) is None:
        return default
    if option_name == "data_dir":
        return Path(value)
    return value

load_config

load_config(
    path: Path | None = None, overwrite: bool = True
) -> dict[str, Any]

Load configuration from a TOML file and optionally overwrite global state.

Parameters:

  • path
    (Path or None, default: None ) –

    Optional path to a TOML file. If None, the default config path is used.

  • overwrite
    (bool, default: True ) –

    If True, replace _GLOBAL_CONFIG entirely. If False, only missing top-level tables are inserted (shallow merge).

Returns:

  • dict[str, Any]

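    The global configuration after loading: the content of the file when overwrite is True, otherwise the existing configuration with any missing top-level tables added.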
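
The difference between the two overwrite modes can be illustrated with plain dictionaries. The sketch below is only a stand-in for the merge rule described above: merge_config is a hypothetical helper, not part of solarpandas, and the table and option names are made up for illustration.

```python
from typing import Any


def merge_config(current: dict[str, Any], loaded: dict[str, Any], overwrite: bool = True) -> dict[str, Any]:
    """Hypothetical helper mimicking the documented merge rule."""
    if overwrite:
        # the loaded tables replace the current configuration entirely
        return dict(loaded)
    merged = dict(current)
    for table, options in loaded.items():
        # only tables missing from the current configuration are inserted
        merged.setdefault(table, options)
    return merged


# made-up configuration values, for illustration only
current = {"bsrn": {"server": "ftp.example.org"}}
loaded = {"bsrn": {"server": "ftp.other.org", "port": 21}, "plot": {"dpi": 150}}

replaced = merge_config(current, loaded, overwrite=True)
shallow = merge_config(current, loaded, overwrite=False)
print(replaced["bsrn"])  # taken from the loaded file
print(shallow["bsrn"])   # existing table kept as a whole
print(shallow["plot"])   # missing table inserted
```

Because the merge is shallow, an option that exists only in the file (here the made-up "port") is not added to a table that is already present in memory.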

Source code in src/solarpandas/config.py
def load_config(path: Path | None = None, overwrite: bool = True) -> dict[str, Any]:
    """Load configuration from a TOML file and optionally overwrite global state.

    Parameters
    ----------
    path : pathlib.Path or None, default None
        Optional path to a TOML file. If ``None``, the default config path is used.
    overwrite : bool, default True
        If ``True``, replace ``_GLOBAL_CONFIG`` entirely. If ``False``, only
        missing top-level tables are inserted (shallow merge).

    Returns
    -------
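    dict[str, Any]
        The global configuration after loading: the content of the file when
        ``overwrite`` is ``True``, otherwise the existing configuration with
        any missing top-level tables added.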
    """
    cfg_path = get_config_path() if path is None else Path(path)
    if not cfg_path.exists():
        raise FileNotFoundError(f"config file not found: {cfg_path}")

    with cfg_path.open(mode="rb") as f:
        loaded = tomlkit.load(f)

    global _GLOBAL_CONFIG
    if overwrite:
        _GLOBAL_CONFIG = loaded
    else:
        for k, v in loaded.items():
            _GLOBAL_CONFIG.setdefault(k, v)

    logger.success(f"config loaded from <blue>{cfg_path}</blue>")
    return _GLOBAL_CONFIG

reset_config_file

reset_config_file()

Reset configuration to defaults by deleting and recreating the file.

Notes

This operation updates the in-memory global configuration immediately.

Source code in src/solarpandas/config.py
def reset_config_file():
    """Reset configuration to defaults by deleting and recreating the file.

    Notes
    -----
    This operation updates the in-memory global configuration immediately.
    """
    global _GLOBAL_CONFIG
    if get_config_path().exists():
        get_config_path().unlink()
        logger.success(f"config file {get_config_path()} deleted")
    _GLOBAL_CONFIG = _read_config_options()

save_config

save_config(path: Path | None = None) -> None

Persist the in-memory configuration to a TOML file.

Parameters:

  • path
    (Path or None, default: None ) –

    Optional output path. When None, the default location from get_config_path is used.

Notes

Path instances in values are converted to POSIX strings before writing.
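
The path conversion mentioned in the note can be sketched with the standard library alone. This is an illustration under assumptions, not the library's code path: to_serializable is a hypothetical name, the configuration values are invented, and pure path classes are used so the snippet behaves the same on every platform.

```python
from pathlib import PurePath, PurePosixPath, PureWindowsPath
from typing import Any


def to_serializable(obj: Any) -> Any:
    """Recursively replace path objects with POSIX-style strings."""
    if isinstance(obj, PurePath):
        return obj.as_posix()
    if isinstance(obj, dict):
        return {key: to_serializable(value) for key, value in obj.items()}
    if isinstance(obj, list):
        return [to_serializable(value) for value in obj]
    # any other value (str, int, bool, ...) is written unchanged
    return obj


# invented configuration mixing paths and plain values
config = {
    "bsrn": {"data_dir": PureWindowsPath(r"C:\data\bsrn"), "port": 21},
    "cache": {"dirs": [PurePosixPath("/tmp/a"), PurePosixPath("/tmp/b")]},
}
plain = to_serializable(config)
print(plain)
```

Writing forward slashes keeps the TOML file free of backslash escapes, which is one plausible reason for storing paths in POSIX form.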

Source code in src/solarpandas/config.py
def save_config(path: Path | None = None) -> None:
    """Persist the in-memory configuration to a TOML file.

    Parameters
    ----------
    path : pathlib.Path or None, default None
        Optional output path. When ``None``, the default location from
        :func:`get_config_path` is used.

    Notes
    -----
    Path instances in values are converted to POSIX strings before writing.
    """
    target = get_config_path() if path is None else Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)

    def _serialize(obj: Any):
        if isinstance(obj, Path):
            return obj.as_posix()
        if isinstance(obj, dict):
            return {k: _serialize(v) for k, v in obj.items()}
        if isinstance(obj, list):
            return [_serialize(v) for v in obj]
        return obj

    serializable = _serialize(_GLOBAL_CONFIG)
    with target.open(mode="w", encoding="utf-8") as f:
        f.write(tomlkit.dumps(serializable))
    logger.success(f"config saved at <blue>{target}</blue>")

set_option

set_option(name: str, value: Any) -> None

Temporarily update a global option for the current session.

Modifies configuration values in memory only. Changes are lost when the Python session ends. To make persistent changes, call save_config or edit the config.toml file directly. If the table does not exist, a warning is logged and nothing is changed.

Parameters:

  • name
    (str) –

    Option path in <table>.<option> format.

  • value
    (Any) –

    New value. For data_dir options, Path is converted to string.

Examples:

>>> from solarpandas.config import set_option, get_option
>>> set_option("solar-position.algorithm", "nrel")
>>> get_option("solar-position.algorithm")
'nrel'
>>> from pathlib import Path
>>> set_option("bsrn.data_dir", Path("/tmp/bsrn-cache"))
Notes

Changes are session-local. Call save_config to persist them.

Source code in src/solarpandas/config.py
def set_option(name: str, value: Any) -> None:
    """Temporarily update a global option for the current session.

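    Modifies configuration values in memory only. Changes are lost when
    the Python session ends. To make persistent changes, call
    :func:`save_config` or edit the config.toml file directly. If the
    table does not exist, a warning is logged and nothing is changed.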

    Parameters
    ----------
    name : str
        Option path in ``<table>.<option>`` format.
    value : Any
        New value. For ``data_dir`` options, ``Path`` is converted to string.

    Examples
    --------
    >>> from solarpandas.config import set_option, get_option
    >>> set_option("solar-position.algorithm", "nrel")
    >>> get_option("solar-position.algorithm")
    'nrel'
    >>> from pathlib import Path
    >>> set_option("bsrn.data_dir", Path("/tmp/bsrn-cache"))

    Notes
    -----
    Changes are session-local. Call :func:`save_config` to persist them.
    """

    table_name, option_name = name.split(".")
    if _GLOBAL_CONFIG.get(table_name, None) is None:
        logger.warning(f"missing table `{table_name}`")
        return None
    if option_name == "data_dir" and isinstance(value, Path):
        value = value.as_posix()
    _GLOBAL_CONFIG[table_name][option_name] = value

show_config

show_config() -> None

Print all current global options to the console.

Notes

Uses pprint.pprint for compact, formatted output.

Source code in src/solarpandas/config.py
def show_config() -> None:
    """Print all current global options to the console.

    Notes
    -----
    Uses :func:`pprint.pprint` for compact, formatted output.
    """
    from pprint import pprint

    return pprint(_GLOBAL_CONFIG, indent=2, width=20)

helpers

General helper functions shared across the solarpandas package.

Functions:

  • infer_time_step

    Infer the sampling time step from a datetime-like index.

  • normalize

    Reindex data to have complete first and last calendar days.

infer_time_step

infer_time_step(
    df_or_s: DataFrame | Series,
) -> Timedelta | None

Infer the sampling time step from a datetime-like index.

Parameters:

  • df_or_s
    (DataFrame or Series) –

    Object with a monotonic datetime-like index.

Returns:

  • Timedelta or None

    Inferred step. Returns None if the time step cannot be inferred and no valid time differences are available.

Notes

The function first tries the index freq attribute and pandas.infer_freq. If that fails, it falls back to the smallest observed lag in index.diff().
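
The fallback rule (shortest gap between consecutive timestamps) can be sketched without pandas. The snippet below is a standard-library stand-in for illustration only; smallest_lag is a hypothetical name, and the library itself works on the pandas index as described in the note.

```python
from datetime import datetime, timedelta
from typing import Optional


def smallest_lag(timestamps: list[datetime]) -> Optional[timedelta]:
    """Return the shortest gap between consecutive timestamps, or None."""
    lags = [later - earlier for earlier, later in zip(timestamps, timestamps[1:])]
    # fewer than two timestamps leave no difference to inspect
    return min(lags) if lags else None


# a 10-minute record with one missing row (00:20)
stamps = [
    datetime(2024, 1, 1, 0, 0),
    datetime(2024, 1, 1, 0, 10),
    datetime(2024, 1, 1, 0, 30),
    datetime(2024, 1, 1, 0, 40),
]
step = smallest_lag(stamps)
print(step)
print(smallest_lag(stamps[:1]))
```

Taking the minimum rather than the most frequent lag means a single gap in the record does not inflate the inferred step, at the cost of being sensitive to one unusually close pair of rows.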

Source code in src/solarpandas/helpers.py
def infer_time_step(df_or_s: pd.DataFrame | pd.Series) -> pd.Timedelta | None:
    """Infer the sampling time step from a datetime-like index.

    Parameters
    ----------
    df_or_s : pandas.DataFrame or pandas.Series
        Object with a monotonic datetime-like index.

    Returns
    -------
    pandas.Timedelta or None
        Inferred step. Returns ``None`` if the time step cannot be inferred and
        no valid time differences are available.

    Notes
    -----
    The function first tries the index ``freq`` attribute and
    :func:`pandas.infer_freq`. If that fails, it falls back to the smallest
    observed lag in ``index.diff()``.
    """
    if (freq := (df_or_s.index.freq or pd.infer_freq(df_or_s.index))) is None:
        logger.warning("Could not infer the index frequency using `pd.infer_freq`")
        time_step = df_or_s.index.diff().unique().drop(pd.NaT, errors="ignore")
        if len(time_step) == 0:
            logger.warning("Could not infer the index time step from the shortest lag between consecutive rows")
            logger.error("No valid time steps found.")
            return None
        return time_step.min()
    return pd.to_timedelta(pd.tseries.frequencies.to_offset(freq))

normalize

normalize(
    df_or_s: DataFrame | Series, **kwargs
) -> DataFrame | Series

Reindex data to have complete first and last calendar days.

Parameters:

  • df_or_s
    (DataFrame or Series) –

    Input object indexed by timestamps.

  • **kwargs
    (Any, default: {} ) –

    Extra keyword arguments forwarded to DataFrame.reindex or Series.reindex (for example method='nearest' or fill_value=0).

Returns:

  • DataFrame or Series

    Reindexed object covering complete days: from the first sampling slot of the first day (midnight plus the sampling phase offset) up to, but not including, midnight of the day after the last timestamp.
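
The alignment arithmetic behind the new index can be sketched with the standard library. This is an illustration under assumptions rather than the library's implementation: complete_day_index is a hypothetical name, and the time step is passed explicitly instead of being inferred.

```python
from datetime import datetime, timedelta


def complete_day_index(first: datetime, last: datetime, step: timedelta) -> list[datetime]:
    """Build timestamps covering whole days while keeping the sampling phase."""
    midnight_start = first.replace(hour=0, minute=0, second=0, microsecond=0)
    # offset of the samples relative to midnight, e.g. 5 min for 00:05, 00:20, ...
    lag = (first - midnight_start) % step
    current = midnight_start + lag
    # exclusive end: midnight of the day after the last timestamp
    midnight_end = last.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)
    index = []
    while current < midnight_end:
        index.append(current)
        current += step
    return index


index = complete_day_index(
    first=datetime(2024, 1, 1, 12, 5),
    last=datetime(2024, 1, 2, 9, 35),
    step=timedelta(minutes=15),
)
print(index[0], index[-1], len(index))
```

With a 15-minute step and data starting at 12:05, the index starts at 00:05 rather than 00:00, so the original timestamps remain members of the new index.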

Examples:

>>> import pandas as pd
>>> from solarpandas.helpers import normalize
>>> index = pd.to_datetime(["2024-01-01 12:00", "2024-01-01 13:00", "2024-01-01 14:00"])
>>> s = pd.Series([1, 2, 3], index=index)
>>> out = normalize(s)
>>> out.index.min().hour
0
Source code in src/solarpandas/helpers.py
def normalize(df_or_s: pd.DataFrame | pd.Series, **kwargs) -> pd.DataFrame | pd.Series:
    """Reindex data to have complete first and last calendar days.

    Parameters
    ----------
    df_or_s : pandas.DataFrame or pandas.Series
        Input object indexed by timestamps.
    **kwargs : Any
        Extra keyword arguments forwarded to ``DataFrame.reindex`` or
        ``Series.reindex`` (for example ``method='nearest'`` or
        ``fill_value=0``).

    Returns
    -------
    pandas.DataFrame or pandas.Series
        Reindexed object spanning complete days from the first day start to the
        day after the last timestamp (left-inclusive).

    Examples
    --------
    >>> import pandas as pd
    >>> from solarpandas.helpers import normalize
    >>> index = pd.to_datetime(["2024-01-01 12:00", "2024-01-01 13:00", "2024-01-01 14:00"])
    >>> s = pd.Series([1, 2, 3], index=index)
    >>> out = normalize(s)
    >>> out.index.min().hour
    0
    """

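    # determine the dataframe or series index frequency and time step
    time_step = infer_time_step(df_or_s)
    if time_step is None:
        raise ValueError("cannot normalize: the index time step could not be inferred")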

    # determine the start of the new index so that the first day is complete
    start = df_or_s.index.min()
    midnight_start = start.floor("D")
    lag = (start - midnight_start) % time_step
    new_start = midnight_start + lag

    # determine the timestamp ending for pd.date_range
    end = df_or_s.index.max()
    midnight_end = end.floor("D") + pd.Timedelta(days=1)

    new_index = pd.date_range(start=new_start, end=midnight_end, freq=time_step, inclusive="left")
    return df_or_s.reindex(new_index, **kwargs)

logtools

Logging format helpers used by solarpandas logging setup.

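Functions:

  • disable_logger

    Disable logging for the solarpandas namespace.

  • enable_logger

    Configure and enable package logging.

  • filtrar_logs

    Create a filter function to suppress noisy namespaces.

  • get_message_format

    Build a loguru format callable with level-aware styling.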

disable_logger

disable_logger()

Disable logging for the solarpandas namespace.

Source code in src/solarpandas/logtools.py
def disable_logger():
    """Disable logging for the solarpandas namespace."""
    logger.disable("solarpandas")

enable_logger

enable_logger(
    name: str | None = None,
    with_mp: bool = False,
    filtros: list[str] | None = None,
    **kwargs,
)

Configure and enable package logging.

Parameters:

  • name
    (str or None, default: None ) –

    Logger namespace to enable explicitly. If None, enables __main__.

  • with_mp
    (bool, default: False ) –

    Enable multi-process-friendly logging: messages are enqueued and the process name is added to each line.

  • filtros
    (list[str] or None, default: None ) –

    Module name prefixes passed to filtrar_logs. Records from matching modules are shown only at ERROR level or above.

  • **kwargs
    (Any, default: {} ) –

    Extra keyword arguments forwarded to logger.add.

Notes

Existing handlers are removed before installing the new one.

Examples:

>>> from solarpandas.logtools import enable_logger
>>> enable_logger("solarpandas", level="INFO")
Source code in src/solarpandas/logtools.py
def enable_logger(name: str | None = None, with_mp: bool = False, filtros: list[str] | None = None, **kwargs):
    """Configure and enable package logging.

    Parameters
    ----------
    name : str or None, default None
        Logger namespace to enable explicitly. If ``None``, enables ``__main__``.
    with_mp : bool, default False
        Enable multi-process-friendly logging options.
    filtros : list[str] or None, default None
        Prefixes to filter via :func:`filtrar_logs`.
    **kwargs : Any
        Extra keyword arguments forwarded to ``logger.add``.

    Notes
    -----
    Existing handlers are removed before installing the new one.

    Examples
    --------
    >>> from solarpandas.logtools import enable_logger
    >>> enable_logger("solarpandas", level="INFO")
    """
    global logger
    logger.remove()  # Remove the default handler.
    default_kwargs = dict(
        sink=sys.stderr,
        level="INFO",
        format=get_message_format(with_mp=with_mp),
        colorize=True,
        enqueue=with_mp,
        filter=filtrar_logs(filtros or []),
    )
    logger.add(**(default_kwargs | (kwargs or {})))
    logger = logger.opt(colors=True)
    logger.enable(name or "__main__")
    logger.enable("solarpandas")

filtrar_logs

filtrar_logs(filtros: list[str] | None = None)

Create a filter function to suppress noisy namespaces.

Parameters:

  • filtros
    (list[str] or None, default: None ) –

    Module name prefixes to filter. Matching records are only kept when severity is ERROR or higher.

Returns:

  • Callable[[dict], bool]

    Predicate suitable for logger.add(filter=...).
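
The filtering rule can be tried out without loguru by feeding dictionary records to a predicate of the same shape. The sketch below is a stand-in under assumptions: make_prefix_filter and the sample records are hypothetical, and the numeric severities (DEBUG 10, INFO 20, ERROR 40) are loguru's default level numbers.

```python
from collections import namedtuple

# minimal stand-in for the level object found in a loguru record
Level = namedtuple("Level", ["name", "no"])
ERROR_NO = 40  # default numeric severity of ERROR in loguru


def make_prefix_filter(prefixes):
    """Hypothetical stand-in: noisy prefixes pass only at ERROR or above."""
    def keep(record):
        if any(record["name"].startswith(prefix) for prefix in prefixes):
            return record["level"].no >= ERROR_NO
        # records from any other module always pass
        return True
    return keep


keep = make_prefix_filter(["urllib3", "matplotlib"])
records = [
    {"name": "urllib3.connectionpool", "level": Level("DEBUG", 10)},
    {"name": "urllib3.connectionpool", "level": Level("ERROR", 40)},
    {"name": "solarpandas.base", "level": Level("INFO", 20)},
]
kept = [keep(record) for record in records]
print(kept)
```

Only the DEBUG record from the filtered prefix is dropped; errors from noisy modules still reach the sink.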

Source code in src/solarpandas/logtools.py
def filtrar_logs(filtros: list[str] | None = None):
    """Create a filter function to suppress noisy namespaces.

    Parameters
    ----------
    filtros : list[str] or None, default None
        Module name prefixes to filter. Matching records are only kept when
        severity is ERROR or higher.

    Returns
    -------
    Callable[[dict], bool]
        Predicate suitable for ``logger.add(filter=...)``.
    """
    def filtro(record):
        if any(record["name"].startswith(name) for name in filtros or []):
            return record["level"].no >= logger.level("ERROR").no  # only pass if is an ERROR or more severe
        return True  # any other will pass through
    return filtro

get_message_format

get_message_format(with_mp: bool = False)

Build a loguru format callable with level-aware styling.

Parameters:

  • with_mp
    (bool, default: False ) –

    If True, include process name to help identify multi-process logs.

Returns:

  • Callable[[dict], str]

    Formatter callable compatible with logger.add(format=...).

Source code in src/solarpandas/logtools.py
def get_message_format(with_mp: bool = False):
    """Build a loguru format callable with level-aware styling.

    Parameters
    ----------
    with_mp : bool, default False
        If ``True``, include process name to help identify multi-process logs.

    Returns
    -------
    Callable[[dict], str]
        Formatter callable compatible with ``logger.add(format=...)``.
    """
    def level_aware_format(record):
        # see: https://loguru.readthedocs.io/en/stable/api/logger.html#record
        level_icon = "<lvl>{level.icon} {level:<7}</lvl>"
        separator = " <c>|></c> "
        process_info = "<magenta>{process.name:<12}</magenta> " if with_mp else ""
        msg_format = level_icon + process_info + separator
        if record.get("level").name == "DEBUG":
            msg_format += "<g>({function})</g> {message}"
        elif record.get("level").name == "WARNING":
            level_icon = "<lvl>{level.icon}  {level:<7}</lvl>"
            msg_format = level_icon + process_info + separator + "<g>({function})</g> <y>{message}</y>"
        elif record.get("level").name == "SUCCESS":
            msg_format += "<g>{message}</g>"
        elif record.get("level").name == "INFO":
            level_icon = "<lvl>{level.icon}  {level:<7}</lvl>"
            msg_format = level_icon + process_info + separator + "{message}"
        else:
            msg_format += "{message}"
        return msg_format + "\n{exception}"
    return level_aware_format

mplstyles

Registration helpers and constants for bundled Matplotlib styles.

Functions:

  • register_mplstyles

    Register bundled Matplotlib styles under the solarpandas-* prefix.

register_mplstyles

register_mplstyles()

Register bundled Matplotlib styles under the solarpandas-* prefix.

Notes

Calling this function multiple times is safe; styles that are already registered are left unchanged.

Examples:

>>> from solarpandas.mplstyles import register_mplstyles
>>> register_mplstyles()
Source code in src/solarpandas/mplstyles/__init__.py
def register_mplstyles():
    """Register bundled Matplotlib styles under the ``solarpandas-*`` prefix.

    Notes
    -----
    Calling this function multiple times is safe; styles that are already
    registered are left unchanged.

    Examples
    --------
    >>> from solarpandas.mplstyles import register_mplstyles
    >>> register_mplstyles()
    """
    import matplotlib.pyplot as plt
    path = files("solarpandas.mplstyles")
    styles = plt.style.core.read_style_directory(path)
    for key, value in styles.items():
        name = f"solarpandas-{key}"
        if name not in plt.style.library:
            plt.style.library[name] = value

sample_data

Sample datasets bundled with solarpandas for demos and tests.

Functions:

  • load_carpentras_data

    Load bundled Carpentras BSRN sample data.

load_carpentras_data

load_carpentras_data()

Load bundled Carpentras BSRN sample data.

Returns:

  • SolarDataFrame

    Pre-packaged sample dataset stored in Parquet format.

Examples:

>>> import solarpandas as sp
>>> sdf = sp.sample_data.load_carpentras_data()
Notes

This helper is intended for demos, examples and quick local checks.

Source code in src/solarpandas/sample_data/__init__.py
def load_carpentras_data():
    """Load bundled Carpentras BSRN sample data.

    Returns
    -------
    SolarDataFrame
        Pre-packaged sample dataset stored in Parquet format.

    Examples
    --------
    >>> import solarpandas as sp
    >>> sdf = sp.sample_data.load_carpentras_data()

    Notes
    -----
    This helper is intended for demos, examples and quick local checks.
    """
    this_dir = Path(__file__).absolute().parent
    filename = this_dir / "car_bsrn_2016.parquet"
    data = read_parquet(filename)
    logger.success(f"Carpentras BSRN data loaded from {filename.relative_to(this_dir.parent)}")
    return data