# vim: set fileencoding=utf-8 ts=4 sw=4 lw=79
# Copyright 2013 Perttu Luukko
# This file is part of pyeemd.
#
# pyeemd is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# pyeemd is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with pyeemd. If not, see <http://www.gnu.org/licenses/>.
"""
pyeemd: A Python module for performing the (Ensemble) Empirical Mode
Decomposition on various kinds of data. This Python module exposes the methods
defined in ``libeemd.so`` via a simple ``ctypes`` interface.
.. important::
The path to the library file ``libeemd.so`` is set by the environment
variable LIBEEMD_FILE. If the variable is not set, pyeemd tries to find the
file in the same directory as ``pyeemd.py``. Finally it resorts to
``ctypes.util.find_library`` for finding it. If you have trouble getting
pyeemd to find the libeemd library, check out the documentation of
``ctypes.util.find_library`` to see what the utility actually does on your
platform.
"""
import os
import warnings
import ctypes
from ctypes.util import find_library
import numpy
from numpy.ctypeslib import ndpointer
# Load libeemd.so
LIBEEMD_FILE_VARIABLE = "LIBEEMD_FILE"
def _init():
# Provide a mock library for ReadTheDocs where libeemd is not available
# but the code needs to be imported for autodoc
if os.environ.get('READTHEDOCS', None) == 'True':
from unittest.mock import Mock
return Mock()
# First, try an environment variable
if LIBEEMD_FILE_VARIABLE in os.environ:
return ctypes.CDLL(os.environ[LIBEEMD_FILE_VARIABLE])
# Second, try 'libeemd.so' in current directory
dirname = os.path.dirname(os.path.realpath(__file__))
check_first_names = ["libeemd.so"]
for libfile in [os.path.join(dirname, filename) for filename in check_first_names]:
if os.path.exists(libfile):
return ctypes.CDLL(libfile)
# Finally, try find_library
lib = find_library("eemd")
if lib:
return ctypes.CDLL(lib)
else:
raise RuntimeError("Cannot find libeemd C library. Tried directory '%s' and ctypes.util.find_library" % dirname)
_libeemd = _init()
def libeemd_error_handler(err):
"""
Function for handling error codes reported by libeemd and converting
them to exceptions if needed.
"""
if err == 0:
return
if 1 <= err <= 7:
raise ValueError("libeemd reported a call with incorrect parameters, but the error condition was not handled by pyeemd. The error code was %d. Please see eemd.h to see what the error was and file a bug report." % err)
else:
raise RuntimeError("libeemd reported a runtime error with error code %d. Please see eemd.h to see what the error was and file a bug report." % err)
return
# Call signature for eemd()
_libeemd.eemd.restype = libeemd_error_handler
_libeemd.eemd.argtypes = [ndpointer(float, flags=('C', 'A')),
ctypes.c_size_t,
ndpointer(float, flags=('C', 'A', 'W')),
ctypes.c_size_t,
ctypes.c_uint,
ctypes.c_double,
ctypes.c_uint,
ctypes.c_uint,
ctypes.c_ulong]
# Call signature for ceemdan() (exactly the same as eemd)
_libeemd.ceemdan.restype = libeemd_error_handler
_libeemd.ceemdan.argtypes = [ndpointer(float, flags=('C', 'A')),
ctypes.c_size_t,
ndpointer(float, flags=('C', 'A', 'W')),
ctypes.c_size_t,
ctypes.c_uint,
ctypes.c_double,
ctypes.c_uint,
ctypes.c_uint,
ctypes.c_ulong]
# Call signature for emd_find_extrema()
_libeemd.emd_find_extrema.restype = None
_libeemd.emd_find_extrema.argtypes = [ndpointer(float, flags=('C', 'A')),
ctypes.c_size_t,
ndpointer(float, flags=('C', 'A', 'W')),
ndpointer(float, flags=('C', 'A', 'W')),
ctypes.POINTER(ctypes.c_size_t),
ndpointer(float, flags=('C', 'A', 'W')),
ndpointer(float, flags=('C', 'A', 'W')),
ctypes.POINTER(ctypes.c_size_t),
ctypes.POINTER(ctypes.c_size_t)]
# Call signature for emd_num_imfs()
_libeemd.emd_num_imfs.restype = ctypes.c_size_t
_libeemd.emd_num_imfs.argtypes = [ctypes.c_size_t]
# Call signature for emd_evaluate_spline()
_libeemd.emd_evaluate_spline.restype = None
_libeemd.emd_evaluate_spline.argtypes = [ndpointer(float, flags=('C', 'A')),
ndpointer(float, flags=('C', 'A')),
ctypes.c_size_t,
ndpointer(float, flags=('C', 'A', 'W')),
ndpointer(float, flags=('C', 'A', 'W'))]
def libeemd_version():
"""
Read the version string from the libeemd library and return it as a tuple of integers.
"""
version_str = ctypes.c_char_p.in_dll(_libeemd, "libeemd_version").value
version_tuple = tuple([ int(s) for s in version_str.split(b'.') ])
return version_tuple
[docs]def eemd(inp, num_imfs=None, ensemble_size=250, noise_strength=0.2, S_number=None,
num_siftings=None, rng_seed=0):
"""
Decompose input data array `inp` to Intrinsic Mode Functions (IMFs) with the
Ensemble Empirical Mode Decomposition algorithm [1]_.
The size of the ensemble and the relative magnitude of the added noise are
given by parameters `ensemble_size` and `noise_strength`, respectively. The
stopping criterion for the decomposition is given by either a S-number or
an absolute number of siftings. In the case that both are positive numbers,
the sifting ends when either of the conditions is fulfilled. By default,
`num_siftings=50` and `S_number=4`. If only `S_number` is set to a positive
value, `num_siftings` defaults to 50. If only `num_siftings` is set to a
positive value, `S_number` defaults to 0.
Parameters
----------
inp : array_like, shape (N,)
The input signal to decompose. Has to be a one-dimensional array-like object.
num_imfs : int, optional
Number of IMFs to extract. If set to `None`, a default value given by
`emd_num_imfs(N)` is used. Note that the residual is also counted in
this number, so num_imfs=1 will simply give you the input signal plus
noise.
ensemble_size : int, optional
Number of copies of the input signal to use as the ensemble.
noise_strength : float, optional
Standard deviation of the Gaussian random numbers used as additional
noise. **This value is relative** to the standard deviation of the input
signal.
S_number : int, optional
Use the S-number stopping criterion [2]_ for the EMD procedure with the given values of `S`.
That is, iterate until the number of extrema and zero crossings in the
signal differ at most by one, and stay the same for `S` consecutive
iterations. Typical values are in the range `3 .. 8`. If `S_number` is
zero, this stopping criterion is ignored.
num_siftings : int, optional
Use a maximum number of siftings as a stopping criterion. If
`num_siftings` is zero, this stopping criterion is ignored.
rng_seed : int, optional
A seed for the random number generator. A value of zero denotes
an implementation-defined default value.
Notes
------
At least one of `S_number` and `num_siftings` must be positive. If both are
positive, the iteration stops when either of the criteria is fulfilled.
Returns
-------
imfs : ndarray, shape (M, N)
A `MxN` array with `M = num_imfs`. The rows of the array are the IMFs
of the input signal, with the last row being the final residual.
References
----------
.. [1] Z. Wu and N. Huang, "Ensemble Empirical Mode Decomposition: A
Noise-Assisted Data Analysis Method", Advances in Adaptive Data Analysis,
Vol. 1 (2009) 1–41
.. [2] N. E. Huang, Z. Shen and S. R. Long, "A new view of nonlinear water
waves: The Hilbert spectrum", Annual Review of Fluid Mechanics, Vol. 31
(1999) 417–457
See also
--------
emd : The regular Empirical Mode Decomposition routine.
emd_num_imfs : The number of IMFs returned for a given input length `N`
unless a specific number is set by `num_imfs`.
"""
# Set default values for S_number and num_siftings
if (S_number is None and num_siftings is None):
S_number = 4
num_siftings = 50
if (S_number is None and num_siftings is not None):
S_number = 0
if (S_number is not None and num_siftings is None):
num_siftings = 50
# Perform some checks on input arguments first
if (num_imfs is not None and num_imfs < 1):
raise ValueError("num_imfs passed to eemd must be >= 1")
if (ensemble_size < 1):
raise ValueError("ensemble_size passed to eemd must be >= 1")
if (S_number < 0):
raise ValueError("S_number passed to eemd must be non-negative")
if (num_siftings < 0):
raise ValueError("num_siftings passed to eemd must be non-negative")
if (S_number == 0 and num_siftings == 0):
raise ValueError("one of S_number or num_siftings must be positive")
if (noise_strength < 0):
raise ValueError("noise_strength passed to eemd must be non-negative")
if (num_siftings == 0):
warnings.warn("(E)EMD with only the S-number stopping criterion (i.e., num_siftings=0) might never finish if stuck in some obscure numerical corner case.", stacklevel=2)
# Initialize numpy arrays
inp = numpy.require(inp, float, ('C', 'A'))
if (inp.ndim != 1):
raise ValueError("input data passed to eemd must be a 1D array")
N = inp.size
M = (num_imfs if num_imfs is not None else emd_num_imfs(N))
outbuffer = numpy.zeros((M, N), dtype=float, order='C')
# Call C routine
_libeemd.eemd(inp, N, outbuffer, M, ensemble_size, noise_strength,
S_number, num_siftings, rng_seed)
return outbuffer
[docs]def ceemdan(inp, num_imfs=None, ensemble_size=250, noise_strength=0.2, S_number=None,
num_siftings=None, rng_seed=0):
"""
Decompose input data array `inp` to Intrinsic Mode Functions (IMFs) with the
Complete Ensemble Empirical Mode Decomposition with Adaptive Noise (CEEMDAN)
algorithm [1]_, a variant of EEMD. For description of the input parameters
and output, please see documentation of :func:`~pyeemd.eemd`.
References
----------
.. [1] M. Torres et al, "A Complete Ensemble Empirical Mode Decomposition
with Adaptive Noise" IEEE Int. Conf. on Acoust., Speech and Signal Proc.
ICASSP-11, (2011) 4144–4147
See also
--------
eemd : The regular Ensemble Empirical Mode Decomposition routine.
emd_num_imfs : The number of IMFs returned for a given input length.
"""
# Set default values for S_number and num_siftings
if (S_number is None and num_siftings is None):
S_number = 4
num_siftings = 50
if (S_number is None and num_siftings is not None):
S_number = 0
if (S_number is not None and num_siftings is None):
num_siftings = 50
# Perform some checks on input arguments first
if (num_imfs is not None and num_imfs < 1):
raise ValueError("num_imfs passed to ceemdan must be >= 1")
if (ensemble_size < 1):
raise ValueError("ensemble_size passed to ceemdan must be >= 1")
if (S_number < 0):
raise ValueError("S_number passed to ceemdan must be non-negative")
if (num_siftings < 0):
raise ValueError("num_siftings passed to ceemdan must be non-negative")
if (S_number == 0 and num_siftings == 0):
raise ValueError("one of S_number or num_siftings must be positive")
if (noise_strength < 0):
raise ValueError("noise_strength passed to ceemdan must be non-negative")
if (num_siftings == 0):
warnings.warn("CEEMDAN with only the S-number stopping criterion (i.e., num_siftings=0) might never finish if stuck in some obscure numerical corner case.", stacklevel=2)
# Initialize numpy arrays
inp = numpy.require(inp, float, ('C', 'A'))
if (inp.ndim != 1):
raise ValueError("input data passed to ceemdan must be a 1D array")
N = inp.size
M = (num_imfs if num_imfs is not None else emd_num_imfs(N))
outbuffer = numpy.zeros((M, N), dtype=float, order='C')
# Call C routine
_libeemd.ceemdan(inp, N, outbuffer, M, ensemble_size, noise_strength, S_number,
num_siftings, rng_seed)
return outbuffer
[docs]def emd(inp, num_imfs=None, S_number=None, num_siftings=None):
"""
A convenience function for performing EMD (not EEMD). This simply calls
function :func:`~pyeemd.eemd` with ``ensemble_size=1`` and ``noise_strength=0``.
"""
return eemd(inp, num_imfs, ensemble_size=1, noise_strength=0,
S_number=S_number, num_siftings=num_siftings)
[docs]def emd_find_extrema(x):
"""
Find the local minima and maxima from input data `x`. This includes the
artificial extrema added to the ends of the data as specified in the
original EEMD article [1]_.
Parameters
----------
x : array_like, shape (N,)
The input data. Has to be a one-dimensional array_like object.
Returns
-------
maxx : ndarray
The x-coordinates of the local maxima.
maxy : ndarray
The y-coordinates of the local maxima.
minx : ndarray
The x-coordinates of the local minima.
miny : ndarray
The y-coordinates of the local minima.
References
----------
.. [1] Z. Wu and N. Huang, "Ensemble Empirical Mode Decomposition: A
Noise-Assisted Data Analysis Method", Advances in Adaptive Data Analysis,
Vol. 1 (2009) 1–41
"""
x = numpy.require(x, float, ('C', 'A'))
if (x.ndim != 1):
raise ValueError("input data passed to eemd must be a 1D array")
N = x.size
maxx = numpy.empty(N, dtype=float, order='C')
maxy = numpy.empty(N, dtype=float, order='C')
num_max = ctypes.c_size_t()
minx = numpy.empty(N, dtype=float, order='C')
miny = numpy.empty(N, dtype=float, order='C')
num_min = ctypes.c_size_t()
num_zc = ctypes.c_size_t()
_libeemd.emd_find_extrema(x, N, maxx, maxy, ctypes.byref(num_max), minx,
miny, ctypes.byref(num_min), ctypes.byref(num_zc))
maxx.resize(num_max.value)
maxy.resize(num_max.value)
minx.resize(num_min.value)
miny.resize(num_min.value)
return [maxx, maxy, minx, miny]
[docs]def emd_num_imfs(N):
"""
Return number of IMFs that will be extracted from input data of
length `N`, including the final residual.
"""
N = int(N)
if (N < 0): # Negative values will overflow silently otherwise
raise ValueError("Negative value passed for N in emd_num_imfs")
return _libeemd.emd_num_imfs(N)
[docs]def emd_evaluate_spline(x, y):
"""
Evaluates a cubic spline with the given (x, y) points as knots.
Parameters
----------
x : array_like, shape (N,)
The x coordinates of the knots. The array must be sorted, start from 0
and end at an integer.
y : array_like, shape (N,)
The y coordinates of the knots.
Returns
-------
spline_y : ndarray
The cubic spline curve defined by the knots and the "not-a-knot" end
conditions, evaluated at integer points from 0 to ``max(x)``.
Notes
-----
As you can see from the definition, this method is tuned to work only in
the case needed by EMD. This method is made available mainly for
visualization and unit testing purposes. Better general purpose spline
methods exist already in :mod:`scipy.interpolate`.
See also
--------
emd_find_extrema : A method of finding the extrema for spline fitting.
"""
x = numpy.require(x, float, ('C', 'A'))
y = numpy.require(y, float, ('C', 'A'))
if (x.ndim != 1):
raise ValueError("input data passed to emd_evaluate_spline must be a 1D array")
if (x.shape != y.shape):
raise ValueError("the shapes of x and y passed to emd_evaluate_spline must match")
if not (x.size >= 2):
raise ValueError("the size of the array x passed to emd_evaluate_spline must be at least two")
if not all(numpy.diff(x) >= 0):
raise ValueError("the input array x passed to emd_evaluate_spline is not sorted")
if not (x[0] == 0 and numpy.modf(x[-1])[0] == 0):
raise ValueError("the input array x passed to emd_evaluate_spline needs to start at zero and end at an integer")
N = x.size
output_N = int(x[-1])+1
spline_y = numpy.empty(output_N, dtype=float, order='C')
ws_size = 5*N-10 if N>=2 else 0
spline_workspace = numpy.empty(ws_size, dtype=float, order='C')
_libeemd.emd_evaluate_spline(x, y, N, spline_y, spline_workspace)
return spline_y