"""Instance-level Luna API.
This module exports :class:`inst`, the per-record wrapper around EDF data,
annotation access, evaluation, and in-memory signal editing operations.
"""
import lunapi.lunapi0 as _luna
import pandas as pd
import numpy as np
from scipy.stats.mstats import winsorize
from scipy.signal import sosfilt
import matplotlib.pyplot as plt
from matplotlib import cm
try:
from IPython.core import display as ICD
from IPython.display import display as _ipy_display
except ImportError:
ICD = None
_ipy_display = None
import plotly.graph_objects as go
import time
import pathlib
import os
from .project import proj
from .resources import resources
from .results import tables, cmdfile
def hypno(*args, **kwargs):
"""Lazy proxy to :func:`lunapi.viz.hypno` to avoid circular imports."""
from .viz import hypno as _hypno
return _hypno(*args, **kwargs)
def psd(*args, **kwargs):
"""Lazy proxy to :func:`lunapi.viz.psd` to avoid circular imports."""
from .viz import psd as _psd
return _psd(*args, **kwargs)
def spec(*args, **kwargs):
"""Lazy proxy to :func:`lunapi.viz.spec` to avoid circular imports."""
from .viz import spec as _spec
return _spec(*args, **kwargs)
[docs]
class inst:
"""Wrapper around a single EDF record (signals, annotations, and results).
An :class:`inst` object represents one individual/study and provides
methods to:
- attach EDF and annotation files
- inspect channel headers and annotation classes
- set individual-level variables
- run Luna commands and retrieve the resulting tables
- extract raw signal data and annotation events
- insert or update in-memory signals and annotations
- produce quick visualisations (hypnogram, PSD, spectrogram)
Instances are normally obtained through :meth:`proj.inst` or created
directly for stand-alone EDF access::
p = proj()
individual = p.inst(1) # from a sample list
individual = inst('path/to.edf') # direct file path
"""
def __init__(self,p=None):
if ( isinstance(p,str) ):
self.edf = _luna.inst(p)
elif (isinstance(p,_luna.inst)):
self.edf = p
else:
self.edf = _luna.inst()
def __repr__(self):
return f'{self.edf}'
#------------------------------------------------------------------------
[docs]
def id(self):
"""Return the identifier for this instance.
Returns
-------
str
The individual ID string as set in the sample list or EDF header.
"""
return self.edf.get_id()
#------------------------------------------------------------------------
[docs]
def attach_edf( self, f ):
"""Attach an EDF file to this instance.
Parameters
----------
f : str or path-like
Path to the EDF (or EDF+/EDF.gz) file to load.
Returns
-------
object
Status value returned by the C++ backend.
"""
return self.edf.attach_edf( f )
#------------------------------------------------------------------------
[docs]
def attach_annot( self, annot ):
"""Attach an annotation file to this instance.
Parameters
----------
annot : str or path-like
Path to an annotation file (e.g. ``.xml``, ``.annot``, ``.eannot``).
Returns
-------
object
Status value returned by the C++ backend.
"""
return self.edf.attach_annot( annot )
#------------------------------------------------------------------------
[docs]
def stat( self ):
"""Return a DataFrame of basic EDF statistics.
Returns
-------
pandas.DataFrame
Single-column DataFrame (index = statistic name, values in
``'Value'`` column) including record count, sample rates,
duration, etc.
"""
t = pd.DataFrame( self.edf.stat(), index=[0] ).T
t.columns = ["Value"]
return t
#------------------------------------------------------------------------
[docs]
def refresh( self ):
"""Reload the attached EDF from disk and reset the problem flag.
Returns
-------
None
"""
self.edf.refresh()
# also need to reset Luna problem flag
# note: current kludge: problem is proj-wide
# so this will not play well w/ multiple EDFs
# todo: implement inst-specific prob flag
_proj = proj(False)
_proj.reset();
#------------------------------------------------------------------------
[docs]
def refresh_channel_vars( self ):
"""Re-populate channel-type variables (e.g. ${eeg}, ${ecg}) without
re-reading the EDF from disk. Call this after proj.reinit() to restore
the channel-type variables that reinit() clears.
Returns
-------
None
"""
self.edf.refresh_channel_vars()
#------------------------------------------------------------------------
[docs]
def clear_vars(self, keys = None ):
"""Clear individual-level variable(s) for this instance.
Parameters
----------
keys : str, list of str, or set of str, optional
Name(s) of the variable(s) to remove. If omitted, **all**
individual-level variables are cleared.
Returns
-------
None
"""
# all
if keys is None:
self.edf.clear_ivar()
return
# one/some
if isinstance(keys, str):
keys = { keys }
elif type( keys ) is not set:
keys = set( keys )
self.edf.clear_selected_ivar( keys )
#------------------------------------------------------------------------
[docs]
def var( self , key = None , value = None ):
"""Get or set one or more individual-level variables.
Thin alias for :meth:`vars`.
Parameters
----------
key : str, dict, or None, optional
Variable name to get/set, or a ``{name: value}`` dict.
value : str, optional
Value to assign when *key* is a single name.
Returns
-------
str, dict, or None
The variable value(s) when getting; ``None`` when setting.
"""
return self.vars( key , value )
#------------------------------------------------------------------------
[docs]
def vars( self , key = None , value = None ):
"""Get or set one or more individual-level variables.
Individual-level variables (i-vars) are scoped to this instance and
override any project-level variable of the same name.
Parameters
----------
key : str, dict, or None, optional
- ``None``: return all i-vars as a dict.
- ``str``: return the value for that variable (if *value* is
omitted), or set it to *value*.
- ``dict``: set multiple variables from a ``{name: value}`` dict.
value : str, optional
Value to assign when *key* is a single variable name.
Returns
-------
str, dict, or None
The variable value(s) when getting; ``None`` when setting.
"""
# return all i-vars
if key is None:
return self.edf.ivars()
# return one i-var
if value is None and type( key ) is str:
return self.edf.get_ivar( key )
# set from a dict of key-value pairs
if isinstance(key, dict):
for k, v in key.items():
self.vars(k,v)
return
# set a single pair
self.edf.ivar( key , str(value) )
#------------------------------------------------------------------------
[docs]
def desc( self ):
"""Display a one-row summary of this instance's EDF.
Prints a DataFrame with columns:
``['ID', 'Gapped', 'Date', 'Start(hms)', 'Stop(hms)', 'Dur(hms)',
'Dur(s)', '# sigs', '# annots', 'Signals']``.
Returns
-------
None
Output is rendered via ``IPython.display``.
"""
t = pd.DataFrame( self.edf.desc() ).T
t.index = t.index + 1
if len( t ) == 0: return t
t.columns = ["ID","Gapped","Date","Start(hms)","Stop(hms)","Dur(hms)","Dur(s)","# sigs","# annots","Signals" ]
with pd.option_context('display.max_colwidth',None):
if _ipy_display is not None:
_ipy_display(t)
#------------------------------------------------------------------------
[docs]
def channels( self ):
"""Return a DataFrame listing the channels in this EDF.
Returns
-------
pandas.DataFrame
Single-column DataFrame with column ``'Channels'``.
"""
t = pd.DataFrame( self.edf.channels() )
if len( t ) == 0: return t
t.columns = ["Channels"]
return t
#------------------------------------------------------------------------
[docs]
def chs( self ):
"""Return a DataFrame listing the channels in this EDF.
Alias for :meth:`channels`.
Returns
-------
pandas.DataFrame
Single-column DataFrame with column ``'Channels'``.
"""
t = pd.DataFrame( self.edf.channels() )
if len( t ) == 0: return t
t.columns = ["Channels"]
return t
#------------------------------------------------------------------------
#------------------------------------------------------------------------
[docs]
def annots( self ):
"""Return a DataFrame listing the annotation classes in this dataset.
Returns
-------
pandas.DataFrame
Single-column DataFrame with column ``'Annotations'``.
"""
t = pd.DataFrame( self.edf.annots() )
if len( t ) == 0: return t
t.columns = ["Annotations"]
return t
#------------------------------------------------------------------------
[docs]
def fetch_annots( self , anns , interp = -1 ):
"""Return annotation events for one or more classes.
Parameters
----------
anns : str or list of str
Annotation class name(s) to retrieve.
interp : int, optional
Interpolation mode passed to the backend. Default ``-1``
(no interpolation).
Returns
-------
pandas.DataFrame
DataFrame with columns ``['Class', 'Start', 'Stop']`` sorted by
start time. Times are in seconds (rounded to 3 decimal places).
"""
if type( anns ) is not list: anns = [ anns ]
t = pd.DataFrame( self.edf.fetch_annots( anns , interp ) )
if len( t ) == 0: return t
t.columns = ['Class', 'Start', 'Stop' ]
t = t.sort_values(by=['Start', 'Stop', 'Class']).copy()
t['Start'] = t['Start'].round(decimals=3)
t['Stop'] = t['Stop'].round(decimals=3)
return t
#------------------------------------------------------------------------
[docs]
def fetch_fulls_annots( self , anns , add_keys = False ):
"""Return annotation events including instance ID, channel, and metadata.
Parameters
----------
anns : str or list of str
Annotation class name(s) to retrieve.
add_keys : bool, default False
If True, return metadata as ``key=value;key2=value2`` instead of the
legacy value-only ``v1|v2`` format.
Returns
-------
pandas.DataFrame
DataFrame with columns
``['Class', 'Instance', 'Channel', 'Meta', 'Start', 'Stop']``
sorted by start time. Times are in seconds.
"""
if type( anns ) is not list: anns = [ anns ]
t = pd.DataFrame( self.edf.fetch_full_annots( anns , add_keys ) )
if len( t ) == 0: return t
t.columns = ['Class', 'Instance','Channel','Meta','Start', 'Stop' ]
t = t.sort_values(by=['Start', 'Stop', 'Class','Instance']).copy()
t['Start'] = t['Start'].round(decimals=3)
t['Stop'] = t['Stop'].round(decimals=3)
return t
#------------------------------------------------------------------------
[docs]
def eval( self, cmdstr ):
"""Evaluate one or more Luna commands and store results internally.
Results are accumulated in the instance result store and can be
retrieved with :meth:`strata` and :meth:`table`.
Parameters
----------
cmdstr : str
One or more Luna commands, optionally separated by newlines.
Returns
-------
pandas.DataFrame or None
DataFrame of command/strata pairs from the result store after
evaluation (i.e. the result of :meth:`strata`).
"""
self.edf.eval( cmdstr )
return self.strata()
#------------------------------------------------------------------------
[docs]
def eval_dummy( self, cmdstr ):
"""Evaluate commands in dummy (dry-run) mode and return the log text.
Parameters
----------
cmdstr : str
One or more Luna commands.
Returns
-------
str
Console/log text produced by the backend during dry-run
evaluation.
"""
return self.edf.eval_dummy( cmdstr )
#------------------------------------------------------------------------
[docs]
def eval_lunascope( self, cmdstr ):
"""Evaluate commands and return the console log along with results.
Parameters
----------
cmdstr : str
One or more Luna commands.
Returns
-------
object
Console log text returned by the LunaScope backend.
"""
return self.edf.eval_lunascope( cmdstr )
#------------------------------------------------------------------------
[docs]
def proc( self, cmdstr ):
"""Evaluate one or more Luna commands and return results as a dict.
Unlike :meth:`eval`, this method returns the result tables directly
as a dict rather than storing them internally.
Parameters
----------
cmdstr : str
One or more Luna commands, optionally separated by newlines.
Returns
-------
dict
Mapping of ``"COMMAND: STRATA"`` string keys to
``pandas.DataFrame`` result tables.
"""
# < log , tables >
r = self.edf.proc( cmdstr )
# extract and return result tables
return tables( r[1] )
#------------------------------------------------------------------------
[docs]
def silent_proc( self, cmdstr ):
"""Evaluate Luna commands silently and return results as a dict.
Suppresses log output for the duration of the call, then restores
the previous silence state. Primarily used by internal helper
methods (e.g. :meth:`stages`, :meth:`has_staging`).
Parameters
----------
cmdstr : str
One or more Luna commands.
Returns
-------
dict
Mapping of ``"COMMAND: STRATA"`` keys to ``pandas.DataFrame``
result tables.
"""
_proj = proj(False)
silence_mode = _proj.is_silenced()
_proj.silence(True,False)
r = self.edf.proc( cmdstr )
_proj.silence( silence_mode , False )
# extract and return result tables
return tables( r[1] )
#------------------------------------------------------------------------
[docs]
def silent_proc_lunascope( self, cmdstr ):
"""Evaluate Luna commands silently via LunaScope and return results.
Parameters
----------
cmdstr : str
One or more Luna commands.
Returns
-------
dict
Mapping of ``"COMMAND: STRATA"`` keys to ``pandas.DataFrame``
result tables.
"""
_proj = proj(False)
silence_mode = _proj.is_silenced()
_proj.silence(True,False)
r = self.edf.proc_lunascope( cmdstr )
_proj.silence( silence_mode , False )
# extract and return result tables
return tables( r[1] )
#------------------------------------------------------------------------
[docs]
def empty_result_set( self ):
"""Return ``True`` if the instance result store contains no tables.
Returns
-------
bool
``True`` when no results are stored; ``False`` otherwise.
"""
return len( self.edf.strata() ) == 0
#------------------------------------------------------------------------
[docs]
def strata( self ):
"""Return a DataFrame of command/strata pairs from the result store.
Returns
-------
pandas.DataFrame or None
DataFrame with columns ``['Command', 'Strata']``, or ``None``
if the result store is empty.
"""
if ( self.empty_result_set() ): return None
t = pd.DataFrame( self.edf.strata() )
t.columns = ["Command","Strata"]
return t
#------------------------------------------------------------------------
[docs]
def table( self, cmd , strata = 'BL' ):
"""Return a specific output table as a DataFrame.
Parameters
----------
cmd : str
Luna command name (e.g. ``'PSD'``, ``'STAGE'``).
strata : str, optional
Stratum label (e.g. ``'CH_F'``, ``'E'``). Default ``'BL'``.
Returns
-------
pandas.DataFrame or None
Result table, or ``None`` if the result store is empty.
"""
if ( self.empty_result_set() ): return None
r = self.edf.table( cmd , strata )
t = pd.DataFrame( r[1] ).T
t.columns = r[0]
return t
#------------------------------------------------------------------------
[docs]
def variables( self, cmd , strata = 'BL' ):
"""Return the variable names present in a specific output table.
Parameters
----------
cmd : str
Luna command name.
strata : str, optional
Stratum label. Default ``'BL'``.
Returns
-------
list of str or None
Variable (column) names, or ``None`` if the result store is
empty.
"""
if ( self.empty_result_set() ): return None
return self.edf.variables( cmd , strata )
#------------------------------------------------------------------------
[docs]
def e2i( self, epochs ):
"""Convert 1-based epoch numbers to time intervals (nanoseconds).
Parameters
----------
epochs : int or list of int
One or more 1-based epoch indices.
Returns
-------
list of tuple
List of ``(start_ns, stop_ns)`` tuples in internal nanosecond
time units.
"""
if type( epochs ) is not list: epochs = [ epochs ]
return self.edf.e2i( epochs )
# --------------------------------------------------------------------------------
[docs]
def s2i( self, secs ):
"""Convert a duration in seconds to an internal time interval.
Parameters
----------
secs : float
Duration in seconds.
Returns
-------
tuple
``(start_ns, stop_ns)`` interval tuple in nanosecond time units.
"""
return self.edf.s2i( secs )
# --------------------------------------------------------------------------------
[docs]
def data( self, chs , annots = None , time = False ):
"""Return all signal and annotation data for the specified channels.
Parameters
----------
chs : str or list of str
Channel label(s) to extract.
annots : str or list of str, optional
Annotation class(es) to include as binary indicator columns.
time : bool, optional
If ``True``, prepend a time-in-seconds column to the returned
matrix. Default ``False``.
Returns
-------
tuple
``(column_names, data_matrix)`` where *data_matrix* is a
NumPy array with one row per sample.
"""
if type( chs ) is not list: chs = [ chs ]
if annots is not None:
if type( annots ) is not list: annots = [ annots ]
if annots is None: annots = [ ]
return self.edf.data( chs , annots , time )
# --------------------------------------------------------------------------------
[docs]
def slice( self, intervals, chs , annots = None , time = False ):
"""Return signal/annotation data aggregated over a set of intervals.
Concatenates all samples that fall within any of the supplied
intervals into a single matrix.
Parameters
----------
intervals : list of tuple
List of ``(start_ns, stop_ns)`` interval tuples (as returned by
:meth:`e2i` or :meth:`s2i`).
chs : str or list of str
Channel label(s) to extract.
annots : str or list of str, optional
Annotation class(es) to include as indicator columns.
time : bool, optional
If ``True``, prepend a time column. Default ``False``.
Returns
-------
tuple
``(column_names, data_matrix)`` NumPy array for the concatenated
intervals.
"""
if type( chs ) is not list: chs = [ chs ]
if annots is not None:
if type( annots ) is not list: annots = [ annots ]
if annots is None: annots = [ ]
return self.edf.slice( intervals, chs , annots , time )
# --------------------------------------------------------------------------------
[docs]
def slices( self, intervals, chs , annots = None , time = False ):
"""Return separate signal/annotation matrices for each interval.
Unlike :meth:`slice`, each interval produces its own matrix rather
than being concatenated together.
Parameters
----------
intervals : list of tuple
List of ``(start_ns, stop_ns)`` interval tuples.
chs : str or list of str
Channel label(s) to extract.
annots : str or list of str, optional
Annotation class(es) to include as indicator columns.
time : bool, optional
If ``True``, prepend a time column to each matrix. Default
``False``.
Returns
-------
list of tuple
One ``(column_names, data_matrix)`` tuple per interval.
"""
if type( chs ) is not list: chs = [ chs ]
if annots is not None:
if type( annots ) is not list: annots = [ annots ]
if annots is None: annots = [ ]
return self.edf.slices( intervals, chs , annots , time )
# --------------------------------------------------------------------------------
[docs]
def insert_signal( self, label , data , sr ):
"""Insert a new signal into the in-memory EDF.
Parameters
----------
label : str
Channel label for the new signal.
data : array-like
Signal samples as a 1-D sequence.
sr : int
Sample rate in Hz.
Returns
-------
None
"""
return self.edf.insert_signal( label , data , sr )
# --------------------------------------------------------------------------------
[docs]
def update_signal( self, label , data ):
"""Overwrite an existing in-memory signal's sample values.
Parameters
----------
label : str
Channel label of the signal to update.
data : array-like
New sample values (must match the existing channel length).
Returns
-------
None
"""
return self.edf.update_signal( label , data )
# --------------------------------------------------------------------------------
[docs]
def insert_annot( self, label , intervals, durcol2 = False ):
"""Insert annotation events into the in-memory dataset.
Parameters
----------
label : str
Annotation class label.
intervals : list of tuple
List of ``(start, stop)`` or ``(start, duration)`` tuples
depending on *durcol2*.
durcol2 : bool, optional
If ``True``, the second element of each tuple is interpreted as
a duration rather than a stop time. Default ``False``.
Returns
-------
None
"""
return self.edf.insert_annot( label , intervals , durcol2 )
# --------------------------------------------------------------------------------
#
# Luna function wrappers
#
# --------------------------------------------------------------------------------
# --------------------------------------------------------------------------------
[docs]
def freeze( self , f ):
"""Persist the current timeline mask to a named freezer tag.
Parameters
----------
f : str
Freezer tag name.
Returns
-------
None
"""
self.eval( 'FREEZE ' + f )
# --------------------------------------------------------------------------------
[docs]
def thaw( self , f , remove = False ):
"""Restore a previously saved freezer tag.
Parameters
----------
f : str
Freezer tag name.
remove : bool, optional
If ``True``, remove the tag after thawing. Default ``False``.
Returns
-------
None
"""
if remove:
self.eval( 'THAW tag=' + f + ' remove' )
else:
self.eval( 'THAW ' + f )
# --------------------------------------------------------------------------------
[docs]
def empty_freezer( self ):
"""Clear all persisted freezer tags for this instance.
Returns
-------
object
Result of the ``CLEAN-FREEZER`` command evaluation.
"""
self.eval( 'CLEAN-FREEZER' )
# --------------------------------------------------------------------------------
[docs]
def mask( self , f = None ):
"""Apply one or more Luna mask expressions and rebuild epochs.
Parameters
----------
f : str or list of str, optional
One or more Luna ``MASK`` expressions or files to apply.
Epochs are rebuilt (``RE``) after all masks are applied.
Returns
-------
None
"""
if f is None: return
if type(f) is not list: f = [ f ]
[ self.eval( 'MASK ' + _f ) for _f in f ]
self.eval( 'RE' )
# --------------------------------------------------------------------------------
[docs]
def segments( self ):
"""Run ``SEGMENTS`` and return the contiguous segment table.
Returns
-------
pandas.DataFrame
``SEGMENTS: SEG`` table with segment start/stop times.
"""
self.eval( 'SEGMENTS' )
return self.table( 'SEGMENTS' , 'SEG' )
# --------------------------------------------------------------------------------
[docs]
def epoch( self , f = '' ):
"""Run the ``EPOCH`` command with optional arguments.
Parameters
----------
f : str, optional
Additional ``EPOCH`` arguments (e.g. ``'dur=30'``).
Returns
-------
None
"""
self.eval( 'EPOCH ' + f )
# --------------------------------------------------------------------------------
[docs]
def epochs( self ):
"""Run ``EPOCH table`` and return a compact epoch summary DataFrame.
Returns
-------
pandas.DataFrame
Columns ``['E', 'E1', 'LABEL', 'HMS', 'START', 'STOP', 'DUR']``
with one row per epoch.
"""
self.eval( 'EPOCH table' )
df = self.table( 'EPOCH' , 'E' )
df = df[[ 'E', 'E1', 'LABEL', 'HMS', 'START','STOP','DUR' ]]
#df = df.drop(columns = ['ID','TP','MID','INTERVAL'] )
return df
# --------------------------------------------------------------------------------
# tfview : spectral regional viewer
# for high-def plots:
# import matplotlib as mpl
# mpl.rcParams['figure.dpi'] = 300
[docs]
def tfview( self , ch,
e = None , t = None , a = None,
tw = 2, sec = 2 , inc = 0.1 ,
f = ( 0.5 , 30 ) , winsor = 0.025 ,
anns = None , norm = None ,
traces = True,
xlines = None , ylines = None , silent = True , pal = 'turbo' ):
"""Generate an MTM time-frequency spectrogram for a single channel.
Runs the Luna ``MTM`` command over the requested time window and
renders a Matplotlib spectrogram, optionally with the raw signal
trace above it.
Parameters
----------
ch : str
Channel label (single channel only).
Selection of intervals
~~~~~~~~~~~~~~~~~~~~~~
e : int or list of two ints, optional
Epoch range: a single epoch number or ``[start_epoch, stop_epoch]``
(both 1-based).
t : list of two floats, optional
Explicit time window ``[start_sec, stop_sec]``. Used when *e*
is not provided.
a : object, optional
Reserved for future use (annotation-based selection).
Spectrogram parameters
~~~~~~~~~~~~~~~~~~~~~~
tw : float, optional
MTM half-bandwidth parameter. Default ``2``.
sec : float, optional
Segment length in seconds for MTM sliding window. Default ``2``.
inc : float, optional
Increment between segment centres in seconds. Default ``0.1``.
f : tuple of (float, float), optional
Frequency range ``(min_hz, max_hz)``. Default ``(0.5, 30)``.
winsor : float, optional
Winsorisation proportion applied to the power values before
colour-mapping. Default ``0.025``.
norm : str, optional
Normalisation mode. Use ``'t'`` for time-wise z-scoring.
Default ``None`` (no normalisation).
Display options
~~~~~~~~~~~~~~~
traces : bool, optional
If ``True`` (default), show the raw signal trace above the
spectrogram.
anns : str or list of str, optional
Annotation class(es) to overlay on the trace panel.
xlines : object, optional
Reserved for future use.
ylines : object, optional
Reserved for future use.
silent : bool, optional
Run the MTM command silently. Default ``True``.
pal : str, optional
Matplotlib colour map name for the spectrogram. Default
``'turbo'``.
Returns
-------
None
The plot is rendered inline.
"""
# for now, accept only a single channel
assert type(ch) is str
# units
hdr = self.headers()
units = dict( zip( hdr.CH , hdr.PDIM ) )
# define window
w = None
if type(e) is list and len(e) == 2 :
w = self.e2i( e )
w = [ i for tuple in w for i in tuple ]
w = [ min(w) , max(w) ]
elif type(e) is int:
w = self.e2i( e )
w = [ i for tuple in w for i in tuple ]
elif type( t ) is list and len( t ) == 2:
w = t
if w is None: return
# window in seconds
ws = [ x * 1e-9 for x in w ]
ls = ws[1] - ws[0]
# build command
cstr = 'MTM dB segment-sec=' + str(sec) + ' segment-inc=' + str(inc) + ' tw=' + str(tw)
cstr += ' segment-spectra segment-output sig=' + ','.join( [ ch ] )
cstr += ' start=' + str(ws[0]) + ' stop=' + str(ws[1])
if f is not None: cstr += ' min=' + str(f[0]) + ' max=' + str(f[1])
# run MTM
if silent is True: self.silent_proc( cstr )
else: self.proc( cstr )
if self.empty_result_set(): return
# extract
tf = self.table( 'MTM' , 'CH_F_SEG' )
tf = tf.astype({'SEG': int })
tf.drop( 'ID' , axis=1, inplace=True)
tt = self.table('MTM','CH_SEG')
tt = tt.astype({'SEG': int })
tt['T'] = tt[['START', 'STOP']].mean(axis=1)
tt.drop( ['ID','DISC','START','STOP'] , axis=1, inplace=True)
m = pd.merge( tt ,tf , on= ['CH','SEG'] )
x = m['T'].to_numpy(dtype=float)
y = m['F'].to_numpy(dtype=float)
z = m['MTM'].to_numpy(dtype=float)
u = m['T'].unique()
# normalize?
if norm == 't':
groups = m.groupby(['CH','F'])[['MTM']]
mean, std = groups.transform("mean"), groups.transform("std")
mz = (m[mean.columns] - mean) / std
z = mz['MTM'].to_numpy(dtype=float)
# clip y-axes to observed
if max(y) < f[1]: f = (f[0] , max(y))
if min(y) > f[0]: f = (min(y) , f[1])
# get time domain signal/annotations
d = self.slices( [ (w[0] , w[1] )] , chs = ch , annots = anns , time = True )
dt = d[1][0]
tx = dt[:,0]
dvs = d[0][1:]
# make spectrogram
xn = np.unique(x).size
yn = np.unique(y).size
# winsorize power
z = winsorize( z , limits=[winsor, winsor] )
zi, yi, xi = np.histogram2d(y, x, bins=(yn,xn), weights=z, density=False )
counts, _, _ = np.histogram2d(y, x, bins=(yn,xn))
with np.errstate(divide='ignore', invalid='ignore'): zi = zi / counts
zi = np.ma.masked_invalid(zi)
# do plot
if traces is True:
fig, axs = plt.subplots( nrows = 2 , ncols = 1 , sharex=True, height_ratios=[1, 2] )
axs[0].set_title( self.id() )
fig.set_figheight(5)
fig.set_figwidth(15)
axs[0].set_ylabel( ch + ' (' + units[ch] + ')' )
axs[0].set(xlim=ws)
axs[1].set(xlim=ws, ylim=f)
axs[1].set_xlabel('Time (secs)')
axs[1].set_ylabel('Frequency (Hz)')
p1 = axs[1].pcolormesh(xi, yi, zi, cmap = pal )
fig.colorbar(p1, orientation="horizontal", drawedges = False, shrink = 0.2 , pad = 0.3)
[ axs[0].plot( tx , dt[:,di+1] , label = dvs[di] , linewidth=0.5 ) for di in range(0,len(dvs)) if dvs[di] in [ ch ] ]
if traces is False:
fig, ax = plt.subplots( nrows = 1 , ncols = 1 , sharex=True )
ax.set_title( self.id() )
fig.set_figheight(5)
fig.set_figwidth(15)
ax.set(xlim=ws, ylim=f)
ax.set_xlabel('Time (secs)')
ax.set_ylabel('Frequency (Hz)')
p1 = ax.pcolormesh(xi, yi, zi, cmap = pal )
fig.colorbar(p1, orientation="horizontal", drawedges = False, shrink = 0.2 , pad = 0.3)
return
# --------------------------------------------------------------------------------
[docs]
def pops( self, s = None, s1 = None , s2 = None,
path = None , lib = None ,
do_edger = True ,
no_filter = False ,
do_reref = False ,
m = None , m1 = None , m2 = None ,
lights_off = '.' , lights_on = '.' ,
ignore_obs = False,
args = '' ):
"""Run the POPS automatic sleep stager on this individual.
POPS (Population-based sleep staging) assigns sleep stages to each
epoch using a pre-trained model. Use *s* alone for single-channel
mode, or *s1*/*s2* together for two-channel mode.
Parameters
----------
s : str, optional
EEG channel label for single-channel staging.
s1 : str, optional
First EEG channel for two-channel staging.
s2 : str, optional
Second EEG channel for two-channel staging.
path : str, optional
Path to the POPS resource folder. Defaults to
``resources.POPS_PATH``.
lib : str, optional
POPS library name. Defaults to ``resources.POPS_LIB`` (``'s2'``).
do_edger : bool, optional
Apply EDGER artifact detection. Default ``True``.
no_filter : bool, optional
Skip bandpass pre-filtering. Default ``False``.
do_reref : bool, optional
Re-reference the EEG to a mastoid channel. Default ``False``.
m : str, optional
Mastoid channel for single-channel re-referencing (required when
*do_reref* is ``True`` and *s* is set).
m1 : str, optional
First mastoid for two-channel re-referencing.
m2 : str, optional
Second mastoid for two-channel re-referencing.
lights_off : str, optional
Lights-off time ``'HH:MM:SS'`` or ``'.'`` if unknown.
Default ``'.'``.
lights_on : str, optional
Lights-on time ``'HH:MM:SS'`` or ``'.'`` if unknown.
Default ``'.'``.
ignore_obs : bool, optional
Ignore observed (manual) staging annotations. Default ``False``.
args : str, optional
Additional options to append to the ``POPS`` command.
Default ``''``.
Returns
-------
pandas.DataFrame or str
Per-epoch staging results from the ``POPS: E`` table, or an
error string if the resource path cannot be opened.
"""
if path is None: path = resources.POPS_PATH
if lib is None: lib = resources.POPS_LIB
import os
if not os.path.isdir( path ):
return 'could not open POPS resource path ' + path
if s is None and s1 is None:
print( 'must set s or s1 and s2 to EEGs' )
return
if ( s1 is None ) != ( s2 is None ):
print( 'must set s or s1 and s2 to EEGs' )
return
# POPS templates may reference mastoid vars even when do_reref is false.
# Provide safe placeholders unless rereferencing is explicitly requested.
if do_reref:
if s is not None and m is None:
print( 'must set m when do_reref is True in single-channel mode' )
return
if s1 is not None and ( m1 is None or m2 is None ):
print( 'must set m1 and m2 when do_reref is True in two-channel mode' )
return
else:
if m is None: m = '.'
if m1 is None: m1 = '.'
if m2 is None: m2 = '.'
# set options
self.var( 'mpath' , path )
self.var( 'lib' , lib )
self.var( 'do_edger' , '1' if do_edger else '0' )
self.var( 'do_reref' , '1' if do_reref else '0' )
self.var( 'no_filter' , '1' if no_filter else '0' )
self.var( 'LOFF' , lights_off )
self.var( 'LON' , lights_on )
if s is not None: self.var( 's' , s )
else: self.clear_vars( 's' )
if m is not None: self.var( 'm' , m )
else: self.clear_vars( 'm' )
if s1 is not None: self.var( 's1' , s1 )
else: self.clear_vars( 's1' )
if s2 is not None: self.var( 's2' , s2 )
else: self.clear_vars( 's2' )
if m1 is not None: self.var( 'm1' , m1 )
else: self.clear_vars( 'm1' )
if m2 is not None: self.var( 'm2' , m2 )
else: self.clear_vars( 'm2' )
# get either one- or two-channel mode Luna script from POPS folder
twoch = s1 is not None and s2 is not None;
if twoch: cmdstr = cmdfile( path + '/s2.ch2.txt' )
else: cmdstr = cmdfile( path + '/s2.ch1.txt' )
# swap in any additional options to POPS
if ignore_obs is True:
args = args + ' ignore-obs-staging';
if do_edger is True:
cmdstr = cmdstr.replace( 'EDGER' , 'EDGER all' )
if args != '':
cmdstr = cmdstr.replace( 'POPS' , 'POPS ' + args + ' ')
# run the command
self.proc( cmdstr )
# return of results
return self.table( 'POPS' , 'E' )
# --------------------------------------------------------------------------------
[docs]
def predict_SUN2019( self, cen , age = None , th = '3' , path = None ):
"""Run the SUN2019 EEG-based brain-age prediction model for this individual.
Parameters
----------
cen : str or list of str
EEG centroid channel label(s). A list is joined with commas.
age : float or str
Chronological age (years) for this individual. Required.
th : str or int, optional
Outlier threshold in standard deviations. Default ``'3'``.
path : str, optional
Path to the Luna models folder. Defaults to
``resources.MODEL_PATH``.
Returns
-------
pandas.DataFrame
Prediction results from the ``PREDICT`` command.
"""
if path is None: path = resources.MODEL_PATH
if type( cen ) is list : cen = ','.join( cen )
# set i-vars
if age is None:
print( 'need to set age indiv-var' )
return
self.var( 'age' , str(age) )
self.var( 'cen' , cen )
self.var( 'mpath' , path )
self.var( 'th' , str(th) )
self.eval( cmdfile( resources.MODEL_PATH + '/m1-adult-age-luna.txt' ) )
return self.table( 'PREDICT' )
# --------------------------------------------------------------------------------
[docs]
def stages(self):
"""Return a DataFrame of per-epoch sleep stage assignments.
Runs the Luna ``STAGE`` command silently and extracts the
``STAGE: E`` result table.
Returns
-------
pandas.DataFrame or None
Table with one row per epoch and a ``STAGE`` column, or
``None`` if no staging annotations are present.
"""
hyp = self.silent_proc( "STAGE" )
if type(hyp) is type(None): return
if 'STAGE: E' in hyp:
return hyp[ 'STAGE: E' ]
return
# --------------------------------------------------------------------------------
[docs]
def hypno(self):
"""Plot a hypnogram of the sleep stages for this individual.
Requires that staging annotations are already attached.
Returns
-------
object
Return value from :func:`lunapi.viz.hypno` (typically a
Plotly figure).
"""
if self.has_staging() is not True:
print( "no staging attached" )
return
return hypno( self.stages()[ 'STAGE' ] )
# --------------------------------------------------------------------------------
[docs]
def has_staging(self):
"""Return whether sleep staging annotations are present for this instance.
Returns
-------
bool
``True`` if staging annotations are attached; ``False`` otherwise.
"""
_proj = proj(False)
silence_mode = _proj.is_silenced()
_proj.silence(True,False)
res = self.edf.has_staging()
_proj.silence( silence_mode , False )
return res
# --------------------------------------------------------------------------------
[docs]
def has_annots(self,anns):
"""Return a boolean vector indicating which annotation classes are present.
Parameters
----------
anns : str or list of str
One or more annotation class names to check.
Returns
-------
list of bool
One element per entry in *anns*: ``True`` if that class exists.
"""
if anns is None: return
if type( anns ) is not list: anns = [ anns ]
return self.edf.has_annots( anns )
# --------------------------------------------------------------------------------
[docs]
def has_annot(self,anns):
"""Return a boolean vector indicating which annotation classes are present.
Alias for :meth:`has_annots`.
Parameters
----------
anns : str or list of str
One or more annotation class names to check.
Returns
-------
list of bool
One element per entry in *anns*: ``True`` if that class exists.
"""
return self.has_annots(anns)
# --------------------------------------------------------------------------------
[docs]
def has_channels(self,ch):
"""Return a boolean vector indicating which channels are present.
Parameters
----------
ch : str or list of str
One or more channel labels to check.
Returns
-------
list of bool
One element per entry in *ch*: ``True`` if that channel exists.
"""
if ch is None: return
if type(ch) is not list: ch = [ ch ]
return self.edf.has_channels( ch )
# --------------------------------------------------------------------------------
[docs]
def has(self,ch):
"""Return a boolean vector indicating which channels are present.
Alias for :meth:`has_channels`.
Parameters
----------
ch : str or list of str
One or more channel labels to check.
Returns
-------
list of bool
One element per entry in *ch*: ``True`` if that channel exists.
"""
if ch is None: return
if type(ch) is not list: ch = [ ch ]
return self.edf.has_channels( ch )
# --------------------------------------------------------------------------------
# def psd(self, ch, minf = None, maxf = 25, minp = None, maxp = None , xlines = None , ylines = None ):
# """Spectrogram plot for a given channel 'ch'"""
# if type( ch ) is not str: return
# if all( self.has( ch ) ) is not True: return
# res = self.silent_proc( 'PSD spectrum dB max=' + str(maxf) + ' sig=' + ','.join(ch) )[ 'PSD: CH_F' ]
# return psd( res , ch, minf = minf, maxf = maxf, minp = minp, maxp = maxp , xlines = xlines , ylines = ylines )
# --------------------------------------------------------------------------------
[docs]
def psd( self, ch, var = 'PSD' , minf = None, maxf = 25, minp = None, maxp = None , xlines = None , ylines = None ):
"""Plot the power spectral density for one or more channels.
Runs the Luna ``PSD`` (default) or ``MTM`` command and renders the
result via :func:`lunapi.viz.psd`.
Parameters
----------
ch : str or list of str
Channel label(s) to plot.
var : str, optional
Spectral estimator to use: ``'PSD'`` (Welch) or ``'MTM'``
(multitaper). Default ``'PSD'``.
minf : float, optional
Minimum frequency (Hz) for the x-axis. Default ``None``
(use estimator minimum).
maxf : float, optional
Maximum frequency (Hz) for the x-axis and Luna command.
Default ``25``.
minp : float, optional
Minimum power for the y-axis. Default ``None`` (auto-scale).
maxp : float, optional
Maximum power for the y-axis. Default ``None`` (auto-scale).
xlines : list of float, optional
Vertical reference lines at these frequencies.
ylines : list of float, optional
Horizontal reference lines at these power values.
Returns
-------
None
The plot is rendered inline.
"""
if ch is None: return
if type(ch) is not list: ch = [ ch ]
if var == 'PSD':
res = self.silent_proc( 'PSD spectrum dB max=' + str(maxf) + ' sig=' + ','.join(ch) )
df = res[ 'PSD: CH_F' ]
else:
res = self.silent_proc( 'MTM tw=15 dB max=' + str(maxf) + ' sig=' + ','.join(ch) )
df = res[ 'MTM: CH_F' ]
psd( df = df , ch = ch , var = var ,
minf = minf , maxf = maxf , minp = minp , maxp = maxp ,
xlines = xlines , ylines = ylines )
# --------------------------------------------------------------------------------
# def spec( self, ch, var = 'PSD' , mine = None, maxe = None, minf = None, maxf = 25 , w = 0.025 ):
# """Generates an epoch-level PSD spectrogram (from PSD or MTM)"""
# if ch is None: return
# if type(ch) is not list: ch = [ ch ]
#
# if var == 'PSD':
# self.eval( 'PSD epoch-spectrum dB max=' + str(maxf) + ' sig=' + ','.join(ch) )
# df = self.table( 'PSD' , 'CH_E_F' )
# else:
# self.eval( 'MTM epoch-spectra epoch epoch-output dB tw=15 max=' + str(maxf) + ' sig=' + ','.join(ch) )
# df = self.table( 'MTM' , 'CH_E_F' )
#
# spec( df = df , ch = None , var = var ,
# mine = mine , maxe = maxe , minf = minf , maxf = maxf , w = w )
# --------------------------------------------------------------------------------
[docs]
def spec(self,ch,mine = None , maxe = None , minf = None, maxf = None, w = 0.025 ):
"""Plot an epoch-by-frequency PSD spectrogram for a single channel.
Runs Luna ``PSD epoch-spectrum`` silently and renders the result via
:func:`lunapi.viz.spec`.
Parameters
----------
ch : str
Channel label (single channel).
mine : int, optional
First epoch to include. Default ``None`` (all epochs).
maxe : int, optional
Last epoch to include. Default ``None`` (all epochs).
minf : float, optional
Minimum frequency (Hz). Default ``0.5``.
maxf : float, optional
Maximum frequency (Hz). Default ``25``.
w : float, optional
Winsorisation proportion applied to power values.
Default ``0.025``.
Returns
-------
object
Return value from :func:`lunapi.viz.spec`.
"""
if type( ch ) is not str:
return
if all( self.has( ch ) ) is not True:
return
if minf is None:
minf=0.5
if maxf is None:
maxf=25
res = self.silent_proc( "PSD epoch-spectrum dB sig="+ch+" min="+str(minf)+" max="+str(maxf) )[ 'PSD: CH_E_F' ]
return spec( res , ch=ch, var='PSD', mine=mine,maxe=maxe,minf=minf,maxf=maxf,w=w)
__all__ = ["inst"]