"""Instance-level Luna API.
This module exports :class:`inst`, the per-record wrapper around EDF data,
annotation access, evaluation, and in-memory signal editing operations.
"""
import lunapi.lunapi0 as _luna
import pandas as pd
import numpy as np
from scipy.stats.mstats import winsorize
from scipy.signal import sosfilt
import matplotlib.pyplot as plt
from matplotlib import cm
from IPython.core import display as ICD
import plotly.graph_objects as go
import time
import pathlib
import os
from IPython.display import display
from .project import proj
from .resources import resources
from .results import tables, cmdfile
def hypno(*args, **kwargs):
"""Lazy proxy to :func:`lunapi.viz.hypno` to avoid circular imports."""
from .viz import hypno as _hypno
return _hypno(*args, **kwargs)
def psd(*args, **kwargs):
"""Lazy proxy to :func:`lunapi.viz.psd` to avoid circular imports."""
from .viz import psd as _psd
return _psd(*args, **kwargs)
def spec(*args, **kwargs):
"""Lazy proxy to :func:`lunapi.viz.spec` to avoid circular imports."""
from .viz import spec as _spec
return _spec(*args, **kwargs)
[docs]
class inst:
"""Wrapper around a single EDF record (signals, annotations, and results).
An :class:`inst` object represents one individual/study and provides
methods to:
- attach EDF and annotation files
- inspect channel headers and annotation classes
- set individual-level variables
- run Luna commands and retrieve the resulting tables
- extract raw signal data and annotation events
- insert or update in-memory signals and annotations
- produce quick visualisations (hypnogram, PSD, spectrogram)
Instances are normally obtained through :meth:`proj.inst` or created
directly for stand-alone EDF access::
p = proj()
individual = p.inst(1) # from a sample list
individual = inst('path/to.edf') # direct file path
"""
def __init__(self,p=None):
if ( isinstance(p,str) ):
self.edf = _luna.inst(p)
elif (isinstance(p,_luna.inst)):
self.edf = p
else:
self.edf = _luna.inst()
def __repr__(self):
return f'{self.edf}'
#------------------------------------------------------------------------
[docs]
def id(self):
"""Return the identifier for this instance.
Returns
-------
str
The individual ID string as set in the sample list or EDF header.
"""
return self.edf.get_id()
#------------------------------------------------------------------------
[docs]
def attach_edf( self, f ):
"""Attach an EDF file to this instance.
Parameters
----------
f : str or path-like
Path to the EDF (or EDF+/EDF.gz) file to load.
Returns
-------
object
Status value returned by the C++ backend.
"""
return self.edf.attach_edf( f )
#------------------------------------------------------------------------
[docs]
def attach_annot( self, annot ):
"""Attach an annotation file to this instance.
Parameters
----------
annot : str or path-like
Path to an annotation file (e.g. ``.xml``, ``.annot``, ``.eannot``).
Returns
-------
object
Status value returned by the C++ backend.
"""
return self.edf.attach_annot( annot )
#------------------------------------------------------------------------
[docs]
def stat( self ):
"""Return a DataFrame of basic EDF statistics.
Returns
-------
pandas.DataFrame
Single-column DataFrame (index = statistic name, values in
``'Value'`` column) including record count, sample rates,
duration, etc.
"""
t = pd.DataFrame( self.edf.stat(), index=[0] ).T
t.columns = ["Value"]
return t
#------------------------------------------------------------------------
[docs]
def refresh( self ):
"""Reload the attached EDF from disk and reset the problem flag.
Returns
-------
None
"""
self.edf.refresh()
# also need to reset Luna problem flag
# note: current kludge: problem is proj-wide
# so this will not play well w/ multiple EDFs
# todo: implement inst-specific prob flag
_proj = proj(False)
_proj.reset();
#------------------------------------------------------------------------
[docs]
def clear_vars(self, keys = None ):
"""Clear individual-level variable(s) for this instance.
Parameters
----------
keys : str, list of str, or set of str, optional
Name(s) of the variable(s) to remove. If omitted, **all**
individual-level variables are cleared.
Returns
-------
None
"""
# all
if keys is None:
self.edf.clear_ivar()
return
# one/some
if isinstance(keys, str):
keys = { keys }
elif type( keys ) is not set:
keys = set( keys )
self.edf.clear_selected_ivar( keys )
#------------------------------------------------------------------------
[docs]
def var( self , key = None , value = None ):
"""Get or set one or more individual-level variables.
Thin alias for :meth:`vars`.
Parameters
----------
key : str, dict, or None, optional
Variable name to get/set, or a ``{name: value}`` dict.
value : str, optional
Value to assign when *key* is a single name.
Returns
-------
str, dict, or None
The variable value(s) when getting; ``None`` when setting.
"""
return self.vars( key , value )
#------------------------------------------------------------------------
[docs]
def vars( self , key = None , value = None ):
"""Get or set one or more individual-level variables.
Individual-level variables (i-vars) are scoped to this instance and
override any project-level variable of the same name.
Parameters
----------
key : str, dict, or None, optional
- ``None``: return all i-vars as a dict.
- ``str``: return the value for that variable (if *value* is
omitted), or set it to *value*.
- ``dict``: set multiple variables from a ``{name: value}`` dict.
value : str, optional
Value to assign when *key* is a single variable name.
Returns
-------
str, dict, or None
The variable value(s) when getting; ``None`` when setting.
"""
# return all i-vars
if key is None:
return self.edf.ivars()
# return one i-var
if value is None and type( key ) is str:
return self.edf.get_ivar( key )
# set from a dict of key-value pairs
if isinstance(key, dict):
for k, v in key.items():
self.vars(k,v)
return
# set a single pair
self.edf.ivar( key , str(value) )
#------------------------------------------------------------------------
[docs]
def desc( self ):
"""Display a one-row summary of this instance's EDF.
Prints a DataFrame with columns:
``['ID', 'Gapped', 'Date', 'Start(hms)', 'Stop(hms)', 'Dur(hms)',
'Dur(s)', '# sigs', '# annots', 'Signals']``.
Returns
-------
None
Output is rendered via ``IPython.display``.
"""
t = pd.DataFrame( self.edf.desc() ).T
t.index = t.index + 1
if len( t ) == 0: return t
t.columns = ["ID","Gapped","Date","Start(hms)","Stop(hms)","Dur(hms)","Dur(s)","# sigs","# annots","Signals" ]
with pd.option_context('display.max_colwidth',None):
display(t)
#------------------------------------------------------------------------
[docs]
def channels( self ):
"""Return a DataFrame listing the channels in this EDF.
Returns
-------
pandas.DataFrame
Single-column DataFrame with column ``'Channels'``.
"""
t = pd.DataFrame( self.edf.channels() )
if len( t ) == 0: return t
t.columns = ["Channels"]
return t
#------------------------------------------------------------------------
[docs]
def chs( self ):
"""Return a DataFrame listing the channels in this EDF.
Alias for :meth:`channels`.
Returns
-------
pandas.DataFrame
Single-column DataFrame with column ``'Channels'``.
"""
t = pd.DataFrame( self.edf.channels() )
if len( t ) == 0: return t
t.columns = ["Channels"]
return t
#------------------------------------------------------------------------
#------------------------------------------------------------------------
[docs]
def annots( self ):
"""Return a DataFrame listing the annotation classes in this dataset.
Returns
-------
pandas.DataFrame
Single-column DataFrame with column ``'Annotations'``.
"""
t = pd.DataFrame( self.edf.annots() )
if len( t ) == 0: return t
t.columns = ["Annotations"]
return t
#------------------------------------------------------------------------
[docs]
def fetch_annots( self , anns , interp = -1 ):
"""Return annotation events for one or more classes.
Parameters
----------
anns : str or list of str
Annotation class name(s) to retrieve.
interp : int, optional
Interpolation mode passed to the backend. Default ``-1``
(no interpolation).
Returns
-------
pandas.DataFrame
DataFrame with columns ``['Class', 'Start', 'Stop']`` sorted by
start time. Times are in seconds (rounded to 3 decimal places).
"""
if type( anns ) is not list: anns = [ anns ]
t = pd.DataFrame( self.edf.fetch_annots( anns , interp ) )
if len( t ) == 0: return t
t.columns = ['Class', 'Start', 'Stop' ]
t = t.sort_values(by=['Start', 'Stop', 'Class'])
t['Start'] = t['Start'].round(decimals=3)
t['Stop'] = t['Stop'].round(decimals=3)
return t
#------------------------------------------------------------------------
[docs]
def fetch_fulls_annots( self , anns ):
"""Return annotation events including instance ID, channel, and metadata.
Parameters
----------
anns : str or list of str
Annotation class name(s) to retrieve.
Returns
-------
pandas.DataFrame
DataFrame with columns
``['Class', 'Instance', 'Channel', 'Meta', 'Start', 'Stop']``
sorted by start time. Times are in seconds.
"""
if type( anns ) is not list: anns = [ anns ]
t = pd.DataFrame( self.edf.fetch_full_annots( anns ) )
if len( t ) == 0: return t
t.columns = ['Class', 'Instance','Channel','Meta','Start', 'Stop' ]
t = t.sort_values(by=['Start', 'Stop', 'Class','Instance'])
t['Start'] = t['Start'].round(decimals=3)
t['Stop'] = t['Stop'].round(decimals=3)
return t
#------------------------------------------------------------------------
[docs]
def eval( self, cmdstr ):
"""Evaluate one or more Luna commands and store results internally.
Results are accumulated in the instance result store and can be
retrieved with :meth:`strata` and :meth:`table`.
Parameters
----------
cmdstr : str
One or more Luna commands, optionally separated by newlines.
Returns
-------
pandas.DataFrame or None
DataFrame of command/strata pairs from the result store after
evaluation (i.e. the result of :meth:`strata`).
"""
self.edf.eval( cmdstr )
return self.strata()
#------------------------------------------------------------------------
[docs]
def eval_dummy( self, cmdstr ):
"""Evaluate commands in dummy (dry-run) mode and return the log text.
Parameters
----------
cmdstr : str
One or more Luna commands.
Returns
-------
str
Console/log text produced by the backend during dry-run
evaluation.
"""
return self.edf.eval_dummy( cmdstr )
#------------------------------------------------------------------------
[docs]
def eval_lunascope( self, cmdstr ):
"""Evaluate commands and return the console log along with results.
Parameters
----------
cmdstr : str
One or more Luna commands.
Returns
-------
object
Console log text returned by the LunaScope backend.
"""
return self.edf.eval_lunascope( cmdstr )
#------------------------------------------------------------------------
[docs]
def proc( self, cmdstr ):
"""Evaluate one or more Luna commands and return results as a dict.
Unlike :meth:`eval`, this method returns the result tables directly
as a dict rather than storing them internally.
Parameters
----------
cmdstr : str
One or more Luna commands, optionally separated by newlines.
Returns
-------
dict
Mapping of ``"COMMAND: STRATA"`` string keys to
``pandas.DataFrame`` result tables.
"""
# < log , tables >
r = self.edf.proc( cmdstr )
# extract and return result tables
return tables( r[1] )
#------------------------------------------------------------------------
[docs]
def silent_proc( self, cmdstr ):
"""Evaluate Luna commands silently and return results as a dict.
Suppresses log output for the duration of the call, then restores
the previous silence state. Primarily used by internal helper
methods (e.g. :meth:`stages`, :meth:`has_staging`).
Parameters
----------
cmdstr : str
One or more Luna commands.
Returns
-------
dict
Mapping of ``"COMMAND: STRATA"`` keys to ``pandas.DataFrame``
result tables.
"""
_proj = proj(False)
silence_mode = _proj.is_silenced()
_proj.silence(True,False)
r = self.edf.proc( cmdstr )
_proj.silence( silence_mode , False )
# extract and return result tables
return tables( r[1] )
#------------------------------------------------------------------------
[docs]
def silent_proc_lunascope( self, cmdstr ):
"""Evaluate Luna commands silently via LunaScope and return results.
Parameters
----------
cmdstr : str
One or more Luna commands.
Returns
-------
dict
Mapping of ``"COMMAND: STRATA"`` keys to ``pandas.DataFrame``
result tables.
"""
_proj = proj(False)
silence_mode = _proj.is_silenced()
_proj.silence(True,False)
r = self.edf.proc_lunascope( cmdstr )
_proj.silence( silence_mode , False )
# extract and return result tables
return tables( r[1] )
#------------------------------------------------------------------------
[docs]
def empty_result_set( self ):
"""Return ``True`` if the instance result store contains no tables.
Returns
-------
bool
``True`` when no results are stored; ``False`` otherwise.
"""
return len( self.edf.strata() ) == 0
#------------------------------------------------------------------------
[docs]
def strata( self ):
"""Return a DataFrame of command/strata pairs from the result store.
Returns
-------
pandas.DataFrame or None
DataFrame with columns ``['Command', 'Strata']``, or ``None``
if the result store is empty.
"""
if ( self.empty_result_set() ): return None
t = pd.DataFrame( self.edf.strata() )
t.columns = ["Command","Strata"]
return t
#------------------------------------------------------------------------
[docs]
def table( self, cmd , strata = 'BL' ):
"""Return a specific output table as a DataFrame.
Parameters
----------
cmd : str
Luna command name (e.g. ``'PSD'``, ``'STAGE'``).
strata : str, optional
Stratum label (e.g. ``'CH_F'``, ``'E'``). Default ``'BL'``.
Returns
-------
pandas.DataFrame or None
Result table, or ``None`` if the result store is empty.
"""
if ( self.empty_result_set() ): return None
r = self.edf.table( cmd , strata )
t = pd.DataFrame( r[1] ).T
t.columns = r[0]
return t
#------------------------------------------------------------------------
[docs]
def variables( self, cmd , strata = 'BL' ):
"""Return the variable names present in a specific output table.
Parameters
----------
cmd : str
Luna command name.
strata : str, optional
Stratum label. Default ``'BL'``.
Returns
-------
list of str or None
Variable (column) names, or ``None`` if the result store is
empty.
"""
if ( self.empty_result_set() ): return None
return self.edf.variables( cmd , strata )
#------------------------------------------------------------------------
[docs]
def e2i( self, epochs ):
"""Convert 1-based epoch numbers to time intervals (nanoseconds).
Parameters
----------
epochs : int or list of int
One or more 1-based epoch indices.
Returns
-------
list of tuple
List of ``(start_ns, stop_ns)`` tuples in internal nanosecond
time units.
"""
if type( epochs ) is not list: epochs = [ epochs ]
return self.edf.e2i( epochs )
# --------------------------------------------------------------------------------
[docs]
def s2i( self, secs ):
"""Convert a duration in seconds to an internal time interval.
Parameters
----------
secs : float
Duration in seconds.
Returns
-------
tuple
``(start_ns, stop_ns)`` interval tuple in nanosecond time units.
"""
return self.edf.s2i( secs )
# --------------------------------------------------------------------------------
[docs]
def data( self, chs , annots = None , time = False ):
"""Return all signal and annotation data for the specified channels.
Parameters
----------
chs : str or list of str
Channel label(s) to extract.
annots : str or list of str, optional
Annotation class(es) to include as binary indicator columns.
time : bool, optional
If ``True``, prepend a time-in-seconds column to the returned
matrix. Default ``False``.
Returns
-------
tuple
``(column_names, data_matrix)`` where *data_matrix* is a
NumPy array with one row per sample.
"""
if type( chs ) is not list: chs = [ chs ]
if annots is not None:
if type( annots ) is not list: annots = [ annots ]
if annots is None: annots = [ ]
return self.edf.data( chs , annots , time )
# --------------------------------------------------------------------------------
[docs]
def slice( self, intervals, chs , annots = None , time = False ):
"""Return signal/annotation data aggregated over a set of intervals.
Concatenates all samples that fall within any of the supplied
intervals into a single matrix.
Parameters
----------
intervals : list of tuple
List of ``(start_ns, stop_ns)`` interval tuples (as returned by
:meth:`e2i` or :meth:`s2i`).
chs : str or list of str
Channel label(s) to extract.
annots : str or list of str, optional
Annotation class(es) to include as indicator columns.
time : bool, optional
If ``True``, prepend a time column. Default ``False``.
Returns
-------
tuple
``(column_names, data_matrix)`` NumPy array for the concatenated
intervals.
"""
if type( chs ) is not list: chs = [ chs ]
if annots is not None:
if type( annots ) is not list: annots = [ annots ]
if annots is None: annots = [ ]
return self.edf.slice( intervals, chs , annots , time )
# --------------------------------------------------------------------------------
[docs]
def slices( self, intervals, chs , annots = None , time = False ):
"""Return separate signal/annotation matrices for each interval.
Unlike :meth:`slice`, each interval produces its own matrix rather
than being concatenated together.
Parameters
----------
intervals : list of tuple
List of ``(start_ns, stop_ns)`` interval tuples.
chs : str or list of str
Channel label(s) to extract.
annots : str or list of str, optional
Annotation class(es) to include as indicator columns.
time : bool, optional
If ``True``, prepend a time column to each matrix. Default
``False``.
Returns
-------
list of tuple
One ``(column_names, data_matrix)`` tuple per interval.
"""
if type( chs ) is not list: chs = [ chs ]
if annots is not None:
if type( annots ) is not list: annots = [ annots ]
if annots is None: annots = [ ]
return self.edf.slices( intervals, chs , annots , time )
# --------------------------------------------------------------------------------
[docs]
def insert_signal( self, label , data , sr ):
"""Insert a new signal into the in-memory EDF.
Parameters
----------
label : str
Channel label for the new signal.
data : array-like
Signal samples as a 1-D sequence.
sr : int
Sample rate in Hz.
Returns
-------
None
"""
return self.edf.insert_signal( label , data , sr )
# --------------------------------------------------------------------------------
[docs]
def update_signal( self, label , data ):
"""Overwrite an existing in-memory signal's sample values.
Parameters
----------
label : str
Channel label of the signal to update.
data : array-like
New sample values (must match the existing channel length).
Returns
-------
None
"""
return self.edf.update_signal( label , data )
# --------------------------------------------------------------------------------
[docs]
def insert_annot( self, label , intervals, durcol2 = False ):
"""Insert annotation events into the in-memory dataset.
Parameters
----------
label : str
Annotation class label.
intervals : list of tuple
List of ``(start, stop)`` or ``(start, duration)`` tuples
depending on *durcol2*.
durcol2 : bool, optional
If ``True``, the second element of each tuple is interpreted as
a duration rather than a stop time. Default ``False``.
Returns
-------
None
"""
return self.edf.insert_annot( label , intervals , durcol2 )
# --------------------------------------------------------------------------------
#
# Luna function wrappers
#
# --------------------------------------------------------------------------------
# --------------------------------------------------------------------------------
[docs]
def freeze( self , f ):
"""Persist the current timeline mask to a named freezer tag.
Parameters
----------
f : str
Freezer tag name.
Returns
-------
None
"""
self.eval( 'FREEZE ' + f )
# --------------------------------------------------------------------------------
[docs]
def thaw( self , f , remove = False ):
"""Restore a previously saved freezer tag.
Parameters
----------
f : str
Freezer tag name.
remove : bool, optional
If ``True``, remove the tag after thawing. Default ``False``.
Returns
-------
None
"""
if remove:
self.eval( 'THAW tag=' + f + 'remove' )
else:
self.eval( 'THAW ' + f )
# --------------------------------------------------------------------------------
[docs]
def empty_freezer( self ):
"""Clear all persisted freezer tags for this instance.
Returns
-------
object
Result of the ``CLEAN-FREEZER`` command evaluation.
"""
self.eval( 'CLEAN-FREEZER' )
# --------------------------------------------------------------------------------
[docs]
def mask( self , f = None ):
"""Apply one or more Luna mask expressions and rebuild epochs.
Parameters
----------
f : str or list of str, optional
One or more Luna ``MASK`` expressions or files to apply.
Epochs are rebuilt (``RE``) after all masks are applied.
Returns
-------
None
"""
if f is None: return
if type(f) is not list: f = [ f ]
[ self.eval( 'MASK ' + _f ) for _f in f ]
self.eval( 'RE' )
# --------------------------------------------------------------------------------
[docs]
def segments( self ):
"""Run ``SEGMENTS`` and return the contiguous segment table.
Returns
-------
pandas.DataFrame
``SEGMENTS: SEG`` table with segment start/stop times.
"""
self.eval( 'SEGMENTS' )
return self.table( 'SEGMENTS' , 'SEG' )
# --------------------------------------------------------------------------------
[docs]
def epoch( self , f = '' ):
"""Run the ``EPOCH`` command with optional arguments.
Parameters
----------
f : str, optional
Additional ``EPOCH`` arguments (e.g. ``'dur=30'``).
Returns
-------
None
"""
self.eval( 'EPOCH ' + f )
# --------------------------------------------------------------------------------
[docs]
def epochs( self ):
"""Run ``EPOCH table`` and return a compact epoch summary DataFrame.
Returns
-------
pandas.DataFrame
Columns ``['E', 'E1', 'LABEL', 'HMS', 'START', 'STOP', 'DUR']``
with one row per epoch.
"""
self.eval( 'EPOCH table' )
df = self.table( 'EPOCH' , 'E' )
df = df[[ 'E', 'E1', 'LABEL', 'HMS', 'START','STOP','DUR' ]]
#df = df.drop(columns = ['ID','TP','MID','INTERVAL'] )
return df
# --------------------------------------------------------------------------------
# tfview : spectral regional viewer
# for high-def plots:
# import matplotlib as mpl
# mpl.rcParams['figure.dpi'] = 300
[docs]
def tfview( self , ch,
e = None , t = None , a = None,
tw = 2, sec = 2 , inc = 0.1 ,
f = ( 0.5 , 30 ) , winsor = 0.025 ,
anns = None , norm = None ,
traces = True,
xlines = None , ylines = None , silent = True , pal = 'turbo' ):
"""Generate an MTM time-frequency spectrogram for a single channel.
Runs the Luna ``MTM`` command over the requested time window and
renders a Matplotlib spectrogram, optionally with the raw signal
trace above it.
Parameters
----------
ch : str
Channel label (single channel only).
Selection of intervals
~~~~~~~~~~~~~~~~~~~~~~
e : int or list of two ints, optional
Epoch range: a single epoch number or ``[start_epoch, stop_epoch]``
(both 1-based).
t : list of two floats, optional
Explicit time window ``[start_sec, stop_sec]``. Used when *e*
is not provided.
a : object, optional
Reserved for future use (annotation-based selection).
Spectrogram parameters
~~~~~~~~~~~~~~~~~~~~~~
tw : float, optional
MTM half-bandwidth parameter. Default ``2``.
sec : float, optional
Segment length in seconds for MTM sliding window. Default ``2``.
inc : float, optional
Increment between segment centres in seconds. Default ``0.1``.
f : tuple of (float, float), optional
Frequency range ``(min_hz, max_hz)``. Default ``(0.5, 30)``.
winsor : float, optional
Winsorisation proportion applied to the power values before
colour-mapping. Default ``0.025``.
norm : str, optional
Normalisation mode. Use ``'t'`` for time-wise z-scoring.
Default ``None`` (no normalisation).
Display options
~~~~~~~~~~~~~~~
traces : bool, optional
If ``True`` (default), show the raw signal trace above the
spectrogram.
anns : str or list of str, optional
Annotation class(es) to overlay on the trace panel.
xlines : object, optional
Reserved for future use.
ylines : object, optional
Reserved for future use.
silent : bool, optional
Run the MTM command silently. Default ``True``.
pal : str, optional
Matplotlib colour map name for the spectrogram. Default
``'turbo'``.
Returns
-------
None
The plot is rendered inline.
"""
# for now, accept only a single channel
assert type(ch) is str
# units
hdr = self.headers()
units = dict( zip( hdr.CH , hdr.PDIM ) )
# define window
w = None
if type(e) is list and len(e) == 2 :
w = self.e2i( e )
w = [ i for tuple in w for i in tuple ]
w = [ min(w) , max(w) ]
elif type(e) is int:
w = self.e2i( e )
w = [ i for tuple in w for i in tuple ]
elif type( t ) is list and len( t ) == 2:
w = t
if w is None: return
# window in seconds
ws = [ x * 1e-9 for x in w ]
ls = ws[1] - ws[0]
# build command
cstr = 'MTM dB segment-sec=' + str(sec) + ' segment-inc=' + str(inc) + ' tw=' + str(tw)
cstr += ' segment-spectra segment-output sig=' + ','.join( [ ch ] )
cstr += ' start=' + str(ws[0]) + ' stop=' + str(ws[1])
if f is not None: cstr += ' min=' + str(f[0]) + ' max=' + str(f[1])
# run MTM
if silent is True: self.silent_proc( cstr )
else: self.proc( cstr )
if self.empty_result_set(): return
# extract
tf = self.table( 'MTM' , 'CH_F_SEG' )
tf = tf.astype({'SEG': int })
tf.drop( 'ID' , axis=1, inplace=True)
tt = self.table('MTM','CH_SEG')
tt = tt.astype({'SEG': int })
tt['T'] = tt[['START', 'STOP']].mean(axis=1)
tt.drop( ['ID','DISC','START','STOP'] , axis=1, inplace=True)
m = pd.merge( tt ,tf , on= ['CH','SEG'] )
x = m['T'].to_numpy(dtype=float)
y = m['F'].to_numpy(dtype=float)
z = m['MTM'].to_numpy(dtype=float)
u = m['T'].unique()
# normalize?
if norm == 't':
groups = m.groupby(['CH','F'])[['MTM']]
mean, std = groups.transform("mean"), groups.transform("std")
mz = (m[mean.columns] - mean) / std
z = mz['MTM'].to_numpy(dtype=float)
# clip y-axes to observed
if max(y) < f[1]: f = (f[0] , max(y))
if min(y) > f[0]: f = (min(y) , f[1])
# get time domain signal/annotations
d = self.slices( [ (w[0] , w[1] )] , chs = ch , annots = anns , time = True )
dt = d[1][0]
tx = dt[:,0]
dvs = d[0][1:]
# make spectrogram
xn = np.unique(x).size
yn = np.unique(y).size
# winsorize power
z = winsorize( z , limits=[winsor, winsor] )
zi, yi, xi = np.histogram2d(y, x, bins=(yn,xn), weights=z, density=False )
counts, _, _ = np.histogram2d(y, x, bins=(yn,xn))
with np.errstate(divide='ignore', invalid='ignore'): zi = zi / counts
zi = np.ma.masked_invalid(zi)
# do plot
if traces is True:
fig, axs = plt.subplots( nrows = 2 , ncols = 1 , sharex=True, height_ratios=[1, 2] )
axs[0].set_title( self.id() )
fig.set_figheight(5)
fig.set_figwidth(15)
axs[0].set_ylabel( ch + ' (' + units[ch] + ')' )
axs[0].set(xlim=ws)
axs[1].set(xlim=ws, ylim=f)
axs[1].set_xlabel('Time (secs)')
axs[1].set_ylabel('Frequency (Hz)')
p1 = axs[1].pcolormesh(xi, yi, zi, cmap = pal )
fig.colorbar(p1, orientation="horizontal", drawedges = False, shrink = 0.2 , pad = 0.3)
[ axs[0].plot( tx , dt[:,di+1] , label = dvs[di] , linewidth=0.5 ) for di in range(0,len(dvs)) if dvs[di] in [ ch ] ]
if traces is False:
fig, ax = plt.subplots( nrows = 1 , ncols = 1 , sharex=True )
ax.set_title( self.id() )
fig.set_figheight(5)
fig.set_figwidth(15)
ax.set(xlim=ws, ylim=f)
ax.set_xlabel('Time (secs)')
ax.set_ylabel('Frequency (Hz)')
p1 = ax.pcolormesh(xi, yi, zi, cmap = pal )
fig.colorbar(p1, orientation="horizontal", drawedges = False, shrink = 0.2 , pad = 0.3)
return
# --------------------------------------------------------------------------------
[docs]
def pops( self, s = None, s1 = None , s2 = None,
path = None , lib = None ,
do_edger = True ,
no_filter = False ,
do_reref = False ,
m = None , m1 = None , m2 = None ,
lights_off = '.' , lights_on = '.' ,
ignore_obs = False,
args = '' ):
"""Run the POPS automatic sleep stager on this individual.
POPS (Population-based sleep staging) assigns sleep stages to each
epoch using a pre-trained model. Use *s* alone for single-channel
mode, or *s1*/*s2* together for two-channel mode.
Parameters
----------
s : str, optional
EEG channel label for single-channel staging.
s1 : str, optional
First EEG channel for two-channel staging.
s2 : str, optional
Second EEG channel for two-channel staging.
path : str, optional
Path to the POPS resource folder. Defaults to
``resources.POPS_PATH``.
lib : str, optional
POPS library name. Defaults to ``resources.POPS_LIB`` (``'s2'``).
do_edger : bool, optional
Apply EDGER artifact detection. Default ``True``.
no_filter : bool, optional
Skip bandpass pre-filtering. Default ``False``.
do_reref : bool, optional
Re-reference the EEG to a mastoid channel. Default ``False``.
m : str, optional
Mastoid channel for single-channel re-referencing (required when
*do_reref* is ``True`` and *s* is set).
m1 : str, optional
First mastoid for two-channel re-referencing.
m2 : str, optional
Second mastoid for two-channel re-referencing.
lights_off : str, optional
Lights-off time ``'HH:MM:SS'`` or ``'.'`` if unknown.
Default ``'.'``.
lights_on : str, optional
Lights-on time ``'HH:MM:SS'`` or ``'.'`` if unknown.
Default ``'.'``.
ignore_obs : bool, optional
Ignore observed (manual) staging annotations. Default ``False``.
args : str, optional
Additional options to append to the ``POPS`` command.
Default ``''``.
Returns
-------
pandas.DataFrame or str
Per-epoch staging results from the ``POPS: E`` table, or an
error string if the resource path cannot be opened.
"""
if path is None: path = resources.POPS_PATH
if lib is None: lib = resources.POPS_LIB
import os
if not os.path.isdir( path ):
return 'could not open POPS resource path ' + path
if s is None and s1 is None:
print( 'must set s or s1 and s2 to EEGs' )
return
if ( s1 is None ) != ( s2 is None ):
print( 'must set s or s1 and s2 to EEGs' )
return
# POPS templates may reference mastoid vars even when do_reref is false.
# Provide safe placeholders unless rereferencing is explicitly requested.
if do_reref:
if s is not None and m is None:
print( 'must set m when do_reref is True in single-channel mode' )
return
if s1 is not None and ( m1 is None or m2 is None ):
print( 'must set m1 and m2 when do_reref is True in two-channel mode' )
return
else:
if m is None: m = '.'
if m1 is None: m1 = '.'
if m2 is None: m2 = '.'
# set options
self.var( 'mpath' , path )
self.var( 'lib' , lib )
self.var( 'do_edger' , '1' if do_edger else '0' )
self.var( 'do_reref' , '1' if do_reref else '0' )
self.var( 'no_filter' , '1' if no_filter else '0' )
self.var( 'LOFF' , lights_off )
self.var( 'LON' , lights_on )
if s is not None: self.var( 's' , s )
else: self.clear_vars( 's' )
if m is not None: self.var( 'm' , m )
else: self.clear_vars( 'm' )
if s1 is not None: self.var( 's1' , s1 )
else: self.clear_vars( 's1' )
if s2 is not None: self.var( 's2' , s2 )
else: self.clear_vars( 's2' )
if m1 is not None: self.var( 'm1' , m1 )
else: self.clear_vars( 'm1' )
if m2 is not None: self.var( 'm2' , m2 )
else: self.clear_vars( 'm2' )
# get either one- or two-channel mode Luna script from POPS folder
twoch = s1 is not None and s2 is not None;
if twoch: cmdstr = cmdfile( path + '/s2.ch2.txt' )
else: cmdstr = cmdfile( path + '/s2.ch1.txt' )
# swap in any additional options to POPS
if ignore_obs is True:
args = args + ' ignore-obs-staging';
if do_edger is True:
cmdstr = cmdstr.replace( 'EDGER' , 'EDGER all' )
if args != '':
cmdstr = cmdstr.replace( 'POPS' , 'POPS ' + args + ' ')
# run the command
self.proc( cmdstr )
# return of results
return self.table( 'POPS' , 'E' )
# --------------------------------------------------------------------------------
[docs]
def predict_SUN2019( self, cen , age = None , th = '3' , path = None ):
"""Run the SUN2019 EEG-based brain-age prediction model for this individual.
Parameters
----------
cen : str or list of str
EEG centroid channel label(s). A list is joined with commas.
age : float or str
Chronological age (years) for this individual. Required.
th : str or int, optional
Outlier threshold in standard deviations. Default ``'3'``.
path : str, optional
Path to the Luna models folder. Defaults to
``resources.MODEL_PATH``.
Returns
-------
pandas.DataFrame
Prediction results from the ``PREDICT`` command.
"""
if path is None: path = resources.MODEL_PATH
if type( cen ) is list : cen = ','.join( cen )
# set i-vars
if age is None:
print( 'need to set age indiv-var' )
return
self.var( 'age' , str(age) )
self.var( 'cen' , cen )
self.var( 'mpath' , path )
self.var( 'th' , str(th) )
self.eval( cmdfile( resources.MODEL_PATH + '/m1-adult-age-luna.txt' ) )
return self.table( 'PREDICT' )
# --------------------------------------------------------------------------------
[docs]
def stages(self):
"""Return a DataFrame of per-epoch sleep stage assignments.
Runs the Luna ``STAGE`` command silently and extracts the
``STAGE: E`` result table.
Returns
-------
pandas.DataFrame or None
Table with one row per epoch and a ``STAGE`` column, or
``None`` if no staging annotations are present.
"""
hyp = self.silent_proc( "STAGE" )
if type(hyp) is type(None): return
if 'STAGE: E' in hyp:
return hyp[ 'STAGE: E' ]
return
# --------------------------------------------------------------------------------
[docs]
def hypno(self):
"""Plot a hypnogram of the sleep stages for this individual.
Requires that staging annotations are already attached.
Returns
-------
object
Return value from :func:`lunapi.viz.hypno` (typically a
Plotly figure).
"""
if self.has_staging() is not True:
print( "no staging attached" )
return
return hypno( self.stages()[ 'STAGE' ] )
# --------------------------------------------------------------------------------
[docs]
def has_staging(self):
"""Return whether sleep staging annotations are present for this instance.
Returns
-------
bool
``True`` if staging annotations are attached; ``False`` otherwise.
"""
_proj = proj(False)
silence_mode = _proj.is_silenced()
_proj.silence(True,False)
res = self.edf.has_staging()
_proj.silence( silence_mode , False )
return res
# --------------------------------------------------------------------------------
[docs]
def has_annots(self,anns):
"""Return a boolean vector indicating which annotation classes are present.
Parameters
----------
anns : str or list of str
One or more annotation class names to check.
Returns
-------
list of bool
One element per entry in *anns*: ``True`` if that class exists.
"""
if anns is None: return
if type( anns ) is not list: anns = [ anns ]
return self.edf.has_annots( anns )
# --------------------------------------------------------------------------------
[docs]
def has_annot(self,anns):
"""Return a boolean vector indicating which annotation classes are present.
Alias for :meth:`has_annots`.
Parameters
----------
anns : str or list of str
One or more annotation class names to check.
Returns
-------
list of bool
One element per entry in *anns*: ``True`` if that class exists.
"""
return self.has_annots(anns)
# --------------------------------------------------------------------------------
[docs]
def has_channels(self,ch):
"""Return a boolean vector indicating which channels are present.
Parameters
----------
ch : str or list of str
One or more channel labels to check.
Returns
-------
list of bool
One element per entry in *ch*: ``True`` if that channel exists.
"""
if ch is None: return
if type(ch) is not list: ch = [ ch ]
return self.edf.has_channels( ch )
# --------------------------------------------------------------------------------
[docs]
def has(self,ch):
"""Return a boolean vector indicating which channels are present.
Alias for :meth:`has_channels`.
Parameters
----------
ch : str or list of str
One or more channel labels to check.
Returns
-------
list of bool
One element per entry in *ch*: ``True`` if that channel exists.
"""
if ch is None: return
if type(ch) is not list: ch = [ ch ]
return self.edf.has_channels( ch )
# --------------------------------------------------------------------------------
# def psd(self, ch, minf = None, maxf = 25, minp = None, maxp = None , xlines = None , ylines = None ):
# """Spectrogram plot for a given channel 'ch'"""
# if type( ch ) is not str: return
# if all( self.has( ch ) ) is not True: return
# res = self.silent_proc( 'PSD spectrum dB max=' + str(maxf) + ' sig=' + ','.join(ch) )[ 'PSD: CH_F' ]
# return psd( res , ch, minf = minf, maxf = maxf, minp = minp, maxp = maxp , xlines = xlines , ylines = ylines )
# --------------------------------------------------------------------------------
[docs]
def psd( self, ch, var = 'PSD' , minf = None, maxf = 25, minp = None, maxp = None , xlines = None , ylines = None ):
"""Plot the power spectral density for one or more channels.
Runs the Luna ``PSD`` (default) or ``MTM`` command and renders the
result via :func:`lunapi.viz.psd`.
Parameters
----------
ch : str or list of str
Channel label(s) to plot.
var : str, optional
Spectral estimator to use: ``'PSD'`` (Welch) or ``'MTM'``
(multitaper). Default ``'PSD'``.
minf : float, optional
Minimum frequency (Hz) for the x-axis. Default ``None``
(use estimator minimum).
maxf : float, optional
Maximum frequency (Hz) for the x-axis and Luna command.
Default ``25``.
minp : float, optional
Minimum power for the y-axis. Default ``None`` (auto-scale).
maxp : float, optional
Maximum power for the y-axis. Default ``None`` (auto-scale).
xlines : list of float, optional
Vertical reference lines at these frequencies.
ylines : list of float, optional
Horizontal reference lines at these power values.
Returns
-------
None
The plot is rendered inline.
"""
if ch is None: return
if type(ch) is not list: ch = [ ch ]
if var == 'PSD':
res = self.silent_proc( 'PSD spectrum dB max=' + str(maxf) + ' sig=' + ','.join(ch) )
df = res[ 'PSD: CH_F' ]
else:
res = self.silent_proc( 'MTM tw=15 dB max=' + str(maxf) + ' sig=' + ','.join(ch) )
df = res[ 'MTM: CH_F' ]
psd( df = df , ch = ch , var = var ,
minf = minf , maxf = maxf , minp = minp , maxp = maxp ,
xlines = xlines , ylines = ylines )
# --------------------------------------------------------------------------------
# def spec( self, ch, var = 'PSD' , mine = None, maxe = None, minf = None, maxf = 25 , w = 0.025 ):
# """Generates an epoch-level PSD spectrogram (from PSD or MTM)"""
# if ch is None: return
# if type(ch) is not list: ch = [ ch ]
#
# if var == 'PSD':
# self.eval( 'PSD epoch-spectrum dB max=' + str(maxf) + ' sig=' + ','.join(ch) )
# df = self.table( 'PSD' , 'CH_E_F' )
# else:
# self.eval( 'MTM epoch-spectra epoch epoch-output dB tw=15 max=' + str(maxf) + ' sig=' + ','.join(ch) )
# df = self.table( 'MTM' , 'CH_E_F' )
#
# spec( df = df , ch = None , var = var ,
# mine = mine , maxe = maxe , minf = minf , maxf = maxf , w = w )
# --------------------------------------------------------------------------------
[docs]
def spec(self,ch,mine = None , maxe = None , minf = None, maxf = None, w = 0.025 ):
"""Plot an epoch-by-frequency PSD spectrogram for a single channel.
Runs Luna ``PSD epoch-spectrum`` silently and renders the result via
:func:`lunapi.viz.spec`.
Parameters
----------
ch : str
Channel label (single channel).
mine : int, optional
First epoch to include. Default ``None`` (all epochs).
maxe : int, optional
Last epoch to include. Default ``None`` (all epochs).
minf : float, optional
Minimum frequency (Hz). Default ``0.5``.
maxf : float, optional
Maximum frequency (Hz). Default ``25``.
w : float, optional
Winsorisation proportion applied to power values.
Default ``0.025``.
Returns
-------
object
Return value from :func:`lunapi.viz.spec`.
"""
if type( ch ) is not str:
return
if all( self.has( ch ) ) is not True:
return
if minf is None:
minf=0.5
if maxf is None:
maxf=25
res = self.silent_proc( "PSD epoch-spectrum dB sig="+ch+" min="+str(minf)+" max="+str(maxf) )[ 'PSD: CH_E_F' ]
return spec( res , ch=ch, var='PSD', mine=mine,maxe=maxe,minf=minf,maxf=maxf,w=w)
__all__ = ["inst"]