"""
Smartmove robotic
-----------------
These recordings come from the `smartmove robotic TMS <https://www.ant-neuro.com/products/smartmove>`_. This input format uses three files:
- :code:`.cnt` for EEG
- :code:`.cnt` for EMG
- :code:`.txt` for Coordinates
.. note::
Load the :class:`TraceData` with :func:`~.load_ephys_file` and the :class:`Coords` with :func:`load_documentation_txt`
Data
****
EEG and EMG data is stored in the native file-format of the eego recording software. It can be loaded with `libeep <https://github.com/translationalneurosurgery/libeep>`_. During robotic TMS, the 64 EEG channels and the 8 EMG channels are stored in
separate :code:`.cnt` files.
Coordinates
***********
During the mapping procedure, the coordinates of the target positions, i.e. where the robot will be moved to, are saved in a :code:`documentation.txt`-file. Please note that these are the targets for the robotic movement,
recorded, not the actual coordinates of stimulation. The actual coordinates at the time of stimulation do not appear to have been stored at all.
.. admonition:: Documentation.txt
The file documentation.txt stores the coordinates of each target the robot arm moved to. It does not contain information regarding manual adjustments (i.e. adjusting distance of coil to the head) or the actual coil position at the time of stimulation. Target coordinates are given in medical image coordinates (CT / 3D image).
- Target counter: Counts the number of successfully reached targets, including this one.
- Target number: Point number in the total list of all targets.
- Target label: The ‘name’ of the target point. Usually the same as the target number.
- X-vector [<m11> <m21> <m31>]
- Y-vector [<m12> <m22> <m32>]
- Z-vector [<m13> <m23> <m33>]
- Position [<x> <y> <z>]
- Date & time point [dd.mm.yy hh:mm:ss]
- Experiment name [always ‘TMS exp’]
- Subject ID [NnVv]
The coordinates of the targets are stored in one or multiple :code:`targets_*.sav`-files in xml format. The filename of this save
file encodes experiment, subject pseudonym, date and hour, e.g.:
:code:`targets_<experiment>_<VvNn>_20190603_1624.sav`. These coordinates are the e.g. the grid of targets predefined before starting the mapping.
The file success.txt stores the coordinates of only the last target the robot arm moved to. The first line reads ‘success’ (move ended at desired position), ‘start’ (move started but not ended) or ‘fail’ (move ended before reaching the target due to error). The second line contains the timestamp of when the status was updated. For line 4 to 10, same notation as in documentation.txt.
The file target_TMS_exp_[NnVv]_[yyyymmdd_hhmm] stores the coordinates of all created targets. It contains the position (<x>, <y> and <z>), matrix operations (<m11>, <m12>... until <m33>) and target label (<label>), each labeled as such.
Module Content
**************
"""
from offspect import release
from offspect.types import FileName, Coordinate, TraceData, Annotations, MetaData
from pathlib import Path
from ast import literal_eval
from typing import List, Tuple, Dict, Generator, Any
from datetime import datetime
import numpy as np
from math import inf, nan
from os import environ
from offspect.cache.attrs import AnnotationFactory, decode
if not environ.get("READTHEDOCS", False):
from libeep import cnt_file
[docs]def load_documentation_txt(fname: FileName) -> Dict[int, Coordinate]:
"load a documentation.txt and return Coords"
fname = Path(fname).expanduser().absolute()
if not fname.name == "documentation.txt":
raise ValueError(f"{fname} is not a valid documentation.txt")
with fname.open("r") as f:
lines = f.readlines()
if lines[-1] != "\n":
lines.append("\n") # otherwise, last target is ignored
coords = dict()
target: List[str]
target = []
experiment = None
subject = None
idx = -1
for line in lines:
# a target block is complete
if line == "\n":
# make sure everything is from the same experiment and subject
if subject is None:
subject = target[-1]
assert subject == target[-1]
if experiment is None:
experiment = target[-2]
assert experiment == target[-2]
# make sure the targets are consecutive and monoton
if int(target[0]) != idx + 2:
raise ValueError(f"{fname} is malformed")
# parse the target data and add to the dictionary
# take line four according to documentation,see above
tmp = ", ".join(target[3].split(" "))
xyz = literal_eval(f"[{tmp}]")
target = []
idx += 1
# take the last three entries according to documentation,see above
coords[idx] = xyz[-3:]
else: # info not complete, we need to collect more lines
target.append(line.strip())
return coords
[docs]def is_eegfile_valid(fname: FileName) -> bool:
try:
assert Path(fname).suffix == ".cnt"
parts = Path(fname).stem.split("_") # stem, so without the suffix
# make sure both subject names are the same
assert parts[0] == parts[1]
subject = parts[0]
assert len(subject) == 4
assert subject[0::2].isupper()
assert subject[1::2].islower()
return True
except AssertionError:
return False
[docs]def is_eeg_file(fname: FileName) -> bool:
"return true if this is the eeg-file"
parts: List[str] = Path(fname).stem.split("_")
# VvNn_VvNn_YYYY-MM-DD_HH-MM-SS.cnt
return parts[0] == parts[1]
[docs]def parse_recording_date(fname: FileName) -> datetime:
"""
The eeg cnt-file should have the following format:
VvNn_VvNn_YYYY-MM-DD_HH-MM-SS.cnt
"""
parts: List[str] = []
for part in Path(fname).stem.split("_"):
parts += part.split(" ")
delements = parts[-2].split("-") + parts[-1].split("-")
return datetime(*(int(d) for d in delements)) # type: ignore
[docs]def load_triggers(fname: FileName) -> List[Tuple[str, int]]:
c = cnt_file(fname)
triggers = [c.get_trigger(i) for i in range(c.get_trigger_count())]
events = []
for t in triggers:
m = t[0]
idx = t[1]
events.append((m, idx))
return events
[docs]def assert_equal_recording_day(eeg_fname: FileName, emg_fname: FileName):
eeg_day = parse_recording_date(eeg_fname)
emg_day = parse_recording_date(emg_fname)
assert eeg_day.year == emg_day.year
assert eeg_day.month == emg_day.month
assert eeg_day.day == emg_day.day
[docs]def load_ephys_file(
eeg_fname: FileName,
emg_fname: FileName,
pre_in_ms: float = 100,
post_in_ms: float = 100,
select_events: List[str] = ["0001"],
select_channel: str = "Ch1",
) -> Dict[str, Any]:
"""load the electophysiological data for a specific channel for a smartmove file-pair
args
----
eeg_fname: FileName
the path to the EEG file. The file is expected to have the following format: VvNn_VvNn_YYYY-MM-DD_HH-MM-SS.cnt
emg_fname: FileName
the path to the EMG file. The file is expected to have the following format: VvNn<qualifier> YYYY-MM-DD_HH-MM-SS.cnt
pre_in_ms: float = 100
how much time before the TMS
post_in_ms: float = 100
how much time after the TMS
select_events: List[str] = ["0001"]
which events indicate the occurence of a TMS-pulse
select_channel: str = "Ch1"
the channel to use. Note that EMG channel labes only offer a selection of :code:`'Ch1', 'Ch2', 'Ch3', 'Ch4', 'Ch5', 'Ch6', 'Ch7', 'Ch8'`, while EEG channels are recording from a standard wavecap and should offer ['Fp1', 'Fpz', 'Fp2', 'F7', 'F3', 'Fz', 'F4', 'F8', 'FC5', 'FC1', 'FC2', 'FC6', 'M1', 'T7', 'C3', 'Cz', 'C4', 'T8', 'M2', 'CP5', 'CP1', 'CP2', 'CP6', 'P7', 'P3', 'Pz', 'P4', 'P8', 'POz', 'O1', 'O2', 'EOG', 'AF7', 'AF3', 'AF4', 'AF8', 'F5', 'F1', 'F2', 'F6', 'FC3', 'FCz', 'FC4', 'C5', 'C1', 'C2', 'C6', 'CP3', 'CP4', 'P5', 'P1', 'P2', 'P6', 'PO5', 'PO3', 'PO4', 'PO6', 'FT7', 'FT8', 'TP7', 'TP8', 'PO7', 'PO8', 'Oz'] all in reference to Cpz.
returns
-------
Traces: List[TraceData]
the TraceData for each event fitting to select_events in the file
pre_in_samples:int
how many samples before the trigger
post_in_samples:int
how many samples after the trigger
sampling_rate: float
the sampling rate of the trace
"""
if not is_eegfile_valid(eeg_fname):
raise ValueError(
f"{eeg_fname} has not the correct file signature for a smartmove eeg file"
)
assert_equal_recording_day(eeg_fname, emg_fname)
filedate = str(parse_recording_date(eeg_fname))
eeg = cnt_file(eeg_fname)
eeg_labels = [eeg.get_channel_info(i)[0] for i in range(eeg.get_channel_count())]
emg = cnt_file(emg_fname)
emg_labels = [emg.get_channel_info(i)[0] for i in range(emg.get_channel_count())]
# the triggers are always recorded with EEG, so we need this Fs
triggers = load_triggers(eeg_fname)
eeg_fs = eeg.get_sample_frequency()
if select_channel in eeg_labels:
cnt = eeg
elif select_channel in emg_labels:
cnt = emg
else:
raise IndexError(f"Selected channel {select_channel} not found")
origin = Path(cnt._fname).name
subject = Path(eeg_fname).stem.split("_")[0]
fs = cnt.get_sample_frequency()
pre = int(pre_in_ms * fs / 1000)
post = int(post_in_ms * fs / 1000)
scale = fs / eeg_fs
enames = []
onsets = []
tstamps = []
for event, sample in triggers:
if event in select_events:
onset = int(sample * scale)
onsets.append(onset)
tstamps.append(sample / eeg_fs)
enames.append(event)
print("Selected", len(onsets), "of", len(triggers), "trigger events")
time_since_last_pulse = [inf] + [a - b for a, b in zip(tstamps[1:], tstamps[0:-1])]
info = {
"origin": origin,
"event_samples": onsets,
"event_times": tstamps,
"event_names": enames,
"samples_pre_event": pre,
"samples_post_event": post,
"samplingrate": fs,
"subject": subject,
"channel_labels": [select_channel],
"time_since_last_pulse": time_since_last_pulse,
"filedate": filedate,
}
return info
# -----------------------------------------------------------------------------
[docs]def prepare_annotations(
docfile: FileName,
cntfiles: List[FileName],
channel: str,
pre_in_ms: float,
post_in_ms: float,
select_events: List[str] = ["0001"],
) -> Annotations:
"""load a documentation.txt and cnt-files and distill annotations from them
args
----
docfile: FileName
the documentation.txt with the target coordintes
cntfiles: List[FileName]
a list of the :code:`.cnt`-file with the EEG data and triggers and the
the :code:`.cnt`-file with the EMG data
channel: str
which channel to pick
pre_in_ms: float
how many ms to cut before the tms
post_in_ms: float
how many ms to cut after the tms
returns
-------
annotation: Annotations
the annotations for this origin files
"""
for f in cntfiles:
if is_eeg_file(f):
eegfile = f
else:
emgfile = f
return _prepare_annotations(
docfile, eegfile, emgfile, channel, pre_in_ms, post_in_ms, select_events,
)
def _prepare_annotations(
docfile: FileName,
eegfile: FileName,
emgfile: FileName,
channel: str,
pre_in_ms: float,
post_in_ms: float,
select_events: List[str] = ["0001"],
) -> Annotations:
# collect data
info = load_ephys_file(
eeg_fname=eegfile,
emg_fname=emgfile,
pre_in_ms=pre_in_ms,
post_in_ms=post_in_ms,
select_events=select_events,
select_channel=channel,
)
coords = load_documentation_txt(docfile)
stimulation_intensity_mso = nan
stimulation_intensity_didt = nan
anno = AnnotationFactory(readin="tms", readout="cmep", origin=info["origin"])
anno.set("filedate", info["filedate"])
anno.set("subject", info["subject"])
anno.set("samplingrate", info["samplingrate"])
anno.set("samples_pre_event", info["samples_pre_event"])
anno.set("samples_post_event", info["samples_post_event"])
anno.set("channel_of_interest", [channel])
anno.set("channel_labels", info["channel_labels"])
# trace fields
event_names = info["event_names"]
event_samples = info["event_samples"]
event_times = info["event_times"]
time_since_last_pulse = info["time_since_last_pulse"]
for idx, t in enumerate(event_samples):
try:
xyz_coords = coords[idx]
except KeyError:
xyz_coords = [nan, nan, nan]
tattr = {
"id": idx,
"event_name": f"'{event_names[idx]}'",
"event_sample": event_samples[idx],
"event_time": event_times[idx],
"xyz_coords": xyz_coords,
"time_since_last_pulse_in_s": time_since_last_pulse[idx],
"stimulation_intensity_mso": stimulation_intensity_mso,
"stimulation_intensity_didt": stimulation_intensity_didt,
"reject": False,
"onset_shift": 0,
}
anno.append_trace_attr(tattr)
return anno.anno
[docs]def cut_traces(cntfile: FileName, annotation: Annotations) -> List[TraceData]:
"""cut the tracedate from a matfile given Annotations
args
----
cntfile: FileName
the cntfile for cutting the data. must correspond in name to the one specified in the annotation
annotation: Annotations
the annotations specifying e.g. onsets as well as pre and post durations
returns
-------
traces: List[TraceData]
"""
cnt = cnt_file(cntfile)
pre = decode(annotation["attrs"]["samples_pre_event"])
post = decode(annotation["attrs"]["samples_post_event"])
cix = [cnt.get_channel_info(c)[0] for c in range(cnt.get_channel_count())].index(
decode(annotation["attrs"]["channel_of_interest"])[0]
)
traces = []
for attrs in annotation["traces"]:
onset = decode(attrs["event_sample"])
trace = cnt.get_samples(fro=onset - pre, to=onset + post)
trace = np.asanyarray(trace)[:, cix]
traces.append(trace)
return traces