"""
XDF based protocols
-------------------
This kind of file format is our preferred file format. It is `open-source, well-defined and extensible <https://github.com/sccn/xdf/wiki/Specifications>`_ and has `pxdf <https://pypi.org/project/pyxdf/>`_ to load it with Python. You will need one file.
- :code:`.xdf`
Data
****
Because LabRecorder can record multiple streams into a single :code:`.xdf`-file. These files can contain therefore not only EEG and EMG, but also e.g. pupilometric data, respiration effort, grip force, and many more. As it allows to record multiple streams, it also offers the option to record coordinates (as e.g. sent with every pulse from localite version 4.0) together with the raw data (as sent e.g. by eego or bvr) and additional markers.
Coordinates
***********
In the optimal case, the :code:`.xdf`-file contains already sufficient information about the coordinates, and pairing is automatic. Yet, there will be some :code:`.xdf`-files, where not all streams were recorded. This might have happened e.g. due to errors in the recording script, an erroneous automated recording, or during manual recording with LabRecorder. In these cases, information about coordinates or other markers can be missing. The pairing of coordinates with a specific trace needs to be reconstructed manually (see :ref:`support-link-coords`).
If multiple protocols were recorded in one :code:`xdf`-file, as often happened during manual recording, we will have hundreds of stimuli. Worse, it can be that even marker-streams are missing, and there is no information when a protocol started within the long recording. Linking them to the correct coordinates is tricky, and the best chance is probably taking account of the relative latency between subsequent stimuli.
"""
import numpy as np
from offspect.types import Annotations, FileName
from typing import List, Union, Any, Dict
from liesl.api import XDFFile
from liesl.files.xdf.load import XDFStream
from offspect.types import FileName, Coordinate, MetaData, Annotations, TraceData
from pathlib import Path
from math import nan, inf
import time
import json
import numpy as np
from offspect.cache.attrs import AnnotationFactory, decode
from offspect.protocols.xdf import (
get_coords_from_xml,
decode_marker,
pick_stream_with_channel,
find_closest_samples,
yield_timestamps,
yield_comments,
list_nan,
list_nan_coords,
yield_loc_coords,
yield_loc_mso,
yield_loc_didt,
)
# -----------------------------------------------------------------------------
def concat_multifile(xdffiles: List[FileName]):
files = []
origin = Path(xdffiles[0]).name
filedate = time.ctime(Path(xdffiles[0]).stat().st_mtime)
for fname in xdffiles:
streams = XDFFile(fname)
files.append(streams)
return files, origin, filedate
[docs]def prepare_annotations_multfile(
files,
origin,
filedate,
channel: str,
pre_in_ms: float,
post_in_ms: float,
comment_name=None,
):
# we assume that all mapping was done after any spongebob assessments,
# therefore everything earlier than the latest spongebob trigger is
# considered irrelevant
# alternatively, everything in an xdf-file earlier than a reiz_marker of
# "" or earlier than any other marker is irrelevant. This is because every
# start of a reiz-marker sends out an "" to see whether the marker-server
# is running, while for all other assessments but mapping, the reiz-marker
# was used to send markers
for streams in files:
if streams["reiz_marker_sa"].time_series[-1] == [""]:
iu1 = 0
else:
iu1 = streams["reiz_marker_sa"].time_stamps[-1]
spbob = np.where(streams["Spongebob-Data"].time_series[:, 11] == 1.0)[0]
if len(spbob) == 0: # we never triggered Spongebob in this file
iu2 = 0
else:
idx = spbob[-1]
iu2 = streams["Spongebob-Data"].time_stamps[idx]
if max((iu1, iu2)) == 0:
continue
else:
irrelevant_until = max((iu1, iu2))
print(irrelevant_until)
# we concatenate the time_series and time_stamps from the multiple xdf \
# files. Because the clock is continuous and monotonic, we do not need to
# correct for any possible reset between xdf files, and gaps are jumped
# later
time_stamps = []
data_series = None
data_stamps = None
for streams in files:
datastream = pick_stream_with_channel(channel, streams)
if data_series is None:
data_series = datastream.time_series
data_stamps = datastream.time_stamps
else:
data_series = np.concatenate((data_series, datastream.time_series), axis=0)
data_stamps = np.concatenate((data_stamps, datastream.time_stamps), axis=0)
for event in streams["BrainVision RDA Markers"].time_stamps:
if event > irrelevant_until:
time_stamps.append(event)
event_count = len(time_stamps)
coords = list_nan_coords(event_count)
stimulation_intensity_didt = list_nan(event_count)
stimulation_intensity_mso = list_nan(event_count)
print(f"Found {event_count} events")
if event_count == 350:
grid_layout = "5x7"
print("This corresponds to a subject with a 5x7 grid")
elif event_count == 360:
grid_layout = "6x6"
print("This corresponds to a subject with a 6x6 grid")
else:
grid_layout = "Unknown"
print("This does not correspond to a known grid layout")
# global fields
fs = datastream.nominal_srate
anno = AnnotationFactory(readin="tms", readout="cmep", origin=origin)
anno.set("filedate", filedate)
anno.set("subject", "") # TODO parse from correctly organized file
anno.set("samplingrate", fs)
anno.set("samples_pre_event", int(pre_in_ms * fs / 1000))
anno.set("samples_post_event", int(post_in_ms * fs / 1000))
anno.set("channel_of_interest", channel)
anno.set("channel_labels", [channel])
anno.set("global_comment", f"grid_layout={grid_layout}")
# trace fields
event_samples = []
for ts in time_stamps:
idx = int(np.argmin(np.abs(data_stamps - ts)))
event_samples.append(idx)
# New Implementation
# gmfp = np.std(data_series[:, 0:64], 1)
# artifact = tkeo(gmfp)
# aptp = []
# tp = []
# for onset in event_samples:
# hood = gmfp[onset - 50 : onset + 50]
# aptp.append(np.max(hood))
# tp.append(int(np.argmax(hood) - 50 + onset))
# event_samples = tp
gmfp = np.std(data_series[:, 0:64], 1)
aptp = []
tp = []
for onset in event_samples:
artifact = gmfp[onset - 25 : onset + 25]
aptp.append(np.ptp(artifact))
tp.append(int(np.argmax(artifact) - 25 + onset))
event_samples = tp
event_times = [float(t) for t in data_stamps[event_samples] - data_stamps[0]]
time_since_last_pulse = [inf] + [
a - b for a, b in zip(event_times[1:], event_times[0:-1])
]
for idx, t in enumerate(event_samples):
tattr = {
"id": idx,
"comment": f'{{"artifact_amplitude":{aptp[idx]:3.2f}}}',
"event_name": "BrainVision RDA Markers - 'S 2'",
"event_sample": event_samples[idx],
"event_time": event_times[idx],
"xyz_coords": coords[idx],
"time_since_last_pulse_in_s": time_since_last_pulse[idx],
"stimulation_intensity_mso": stimulation_intensity_mso[idx],
"stimulation_intensity_didt": stimulation_intensity_didt[idx],
}
anno.append_trace_attr(tattr)
return anno.anno
def cut_traces_multifile(files, annotation: Annotations) -> List[TraceData]:
"""cut the tracedate from a matfile given Annotations
args
----
xdfile: FileName
the xdffile for cutting the data. must correspond in name to the one specified in the annotation
annotation: Annotations
the annotations specifying e.g. onsets as well as pre and post durations
returns
-------
traces: List[TraceData]
"""
channel = decode(annotation["attrs"]["channel_of_interest"])
print("Selecting traces for channel", channel)
data_series = None
data_stamps = None
for streams in files:
datastream = pick_stream_with_channel(channel, streams)
cix = datastream.channel_labels.index(channel)
if data_series is None:
data_series = datastream.time_series
data_stamps = datastream.time_stamps
else:
"""
XDF based protocols
-------------------
This kind of file format is our preferred file format. It is `open-source, well-defined and extensible <https://github.com/sccn/xdf/wiki/Specifications>`_ and has `pxdf <https://pypi.org/project/pyxdf/>`_ to load it with Python. You will need one file.
- :code:`.xdf`
Data
****
Because LabRecorder can record multiple streams into a single :code:`.xdf`-file. These files can contain therefore not only EEG and EMG, but also e.g. pupilometric data, respiration effort, grip force, and many more. As it allows to record multiple streams, it also offers the option to record coordinates (as e.g. sent with every pulse from localite version 4.0) together with the raw data (as sent e.g. by eego or bvr) and additional markers.
Coordinates
***********
In the optimal case, the :code:`.xdf`-file contains already sufficient information about the coordinates, and pairing is automatic. Yet, there will be some :code:`.xdf`-files, where not all streams were recorded. This might have happened e.g. due to errors in the recording script, an erroneous automated recording, or during manual recording with LabRecorder. In these cases, information about coordinates or other markers can be missing. The pairing of coordinates with a specific trace needs to be reconstructed manually (see :ref:`support-link-coords`).
If multiple protocols were recorded in one :code:`xdf`-file, as often happened during manual recording, we will have hundreds of stimuli. Worse, it can be that even marker-streams are missing, and there is no information when a protocol started within the long recording. Linking them to the correct coordinates is tricky, and the best chance is probably taking account of the relative latency between subsequent stimuli.
"""
import numpy as np
from offspect.types import Annotations, FileName
from typing import List, Union, Any, Dict
from liesl.api import XDFFile
from liesl.files.xdf.load import XDFStream
from offspect.types import FileName, Coordinate, MetaData, Annotations, TraceData
from pathlib import Path
from math import nan, inf
import time
import json
import numpy as np
from offspect.cache.attrs import AnnotationFactory, decode
from offspect.protocols.xdf import (
get_coords_from_xml,
decode_marker,
pick_stream_with_channel,
find_closest_samples,
yield_timestamps,
yield_comments,
list_nan,
list_nan_coords,
yield_loc_coords,
yield_loc_mso,
yield_loc_didt,
)
# -----------------------------------------------------------------------------
[docs]def concat_multifile(xdffiles: List[FileName]):
files = []
origin = Path(xdffiles[0]).name
filedate = time.ctime(Path(xdffiles[0]).stat().st_mtime)
for fname in xdffiles:
streams = XDFFile(fname)
files.append(streams)
return files, origin, filedate
[docs]def prepare_annotations_multifile(
files,
origin,
filedate,
channel: str,
pre_in_ms: float,
post_in_ms: float,
comment_name=None,
):
# we assume that all mapping was done after any spongebob assessments,
# therefore everything earlier than the latest spongebob trigger is
# considered irrelevant
# alternatively, everything in an xdf-file earlier than a reiz_marker of
# "" or earlier than any other marker is irrelevant. This is because every
# start of a reiz-marker sends out an "" to see whether the marker-server
# is running, while for all other assessments but mapping, the reiz-marker
# was used to send markers
for streams in files:
if streams["reiz_marker_sa"].time_series[-1] == [""]:
iu1 = 0
else:
iu1 = streams["reiz_marker_sa"].time_stamps[-1]
spbob = np.where(streams["Spongebob-Data"].time_series[:, 11] == 1.0)[0]
if len(spbob) == 0: # we never triggered Spongebob in this file
iu2 = 0
else:
idx = spbob[-1]
iu2 = streams["Spongebob-Data"].time_stamps[idx]
if iu1 > iu2:
print(
"WARNING: Spongebob data was not transmitted after NMES-IOC. Possible reason: Death of Outlet?"
)
else:
print("Spongebob data was transmitted after NMES-IOC")
if max((iu1, iu2)) == 0:
continue
else:
irrelevant_until = max((iu1, iu2))
print(irrelevant_until)
# we concatenate the time_series and time_stamps from the multiple xdf \
# files. Because the clock is continuous and monotonic, we do not need to
# correct for any possible reset between xdf files, and gaps are jumped
# later
time_stamps = []
data_series = None
data_stamps = None
for streams in files:
datastream = pick_stream_with_channel(channel, streams)
if data_series is None:
data_series = datastream.time_series
data_stamps = datastream.time_stamps
else:
data_series = np.concatenate((data_series, datastream.time_series), axis=0)
data_stamps = np.concatenate((data_stamps, datastream.time_stamps), axis=0)
for event in streams["BrainVision RDA Markers"].time_stamps:
if event > irrelevant_until:
time_stamps.append(event)
event_count = len(time_stamps)
coords = list_nan_coords(event_count)
stimulation_intensity_didt = list_nan(event_count)
stimulation_intensity_mso = list_nan(event_count)
print(f"Found {event_count} events")
if event_count == 350:
grid_layout = "5x7"
print("This corresponds to a subject with a 5x7 grid")
elif event_count == 360:
grid_layout = "6x6"
print("This corresponds to a subject with a 6x6 grid")
else:
grid_layout = "Unknown"
print("This does not correspond to a known grid layout")
# global fields
fs = datastream.nominal_srate
anno = AnnotationFactory(readin="tms", readout="cmep", origin=origin)
anno.set("filedate", filedate)
anno.set("subject", "") # TODO parse from correctly organized file
anno.set("samplingrate", fs)
anno.set("samples_pre_event", int(pre_in_ms * fs / 1000))
anno.set("samples_post_event", int(post_in_ms * fs / 1000))
anno.set("channel_of_interest", channel)
anno.set("channel_labels", [channel])
anno.set("global_comment", f"grid_layout={grid_layout}")
# trace fields
event_samples = []
for ts in time_stamps:
idx = int(np.argmin(np.abs(data_stamps - ts)))
event_samples.append(idx)
# New Implementation
# gmfp = np.std(data_series[:, 0:64], 1)
# artifact = tkeo(gmfp)
# aptp = []
# tp = []
# for onset in event_samples:
# hood = gmfp[onset - 50 : onset + 50]
# aptp.append(np.max(hood))
# tp.append(int(np.argmax(hood) - 50 + onset))
# event_samples = tp
gmfp = np.std(data_series[:, 0:64], 1)
aptp = []
tp = []
for onset in event_samples:
artifact = gmfp[onset - 25 : onset + 25]
aptp.append(np.ptp(artifact))
tp.append(int(np.argmax(artifact) - 25 + onset))
event_samples = tp
event_times = [float(t) for t in data_stamps[event_samples] - data_stamps[0]]
time_since_last_pulse = [inf] + [
a - b for a, b in zip(event_times[1:], event_times[0:-1])
]
# sometimes, we record way too many pulses for a mapping
max_allowed_counts = int(360 * 1.2)
if event_count > max_allowed_counts:
print("WARNING: There are too many pulses, throwing away possible faulty ones")
template = np.ones(180)
template[0::5] = 1.5
template[0] = 100
selection = []
old_xcorr = 0
new_xcorr = 0
to = 1
fr = 181
while fr < max_allowed_counts:
new_xcorr = np.correlate(time_since_last_pulse[-fr:-to], template)
print(fr, to, new_xcorr)
if new_xcorr < old_xcorr / 2:
# the last was a good fit, at least twice better than now
winner = (fr - 1, to - 1)
selection.extend(list(range(to + 1, fr + 1)))
fr += 180
to += 180
old_xcorr = 0
new_xcorr = 0
print("We found a winner", winner)
else:
old_xcorr = new_xcorr
fr += 1
to += 1
if len(selection) != 720:
raise Exception("Did not find a good selection of good events")
event_samples = drop_selection(event_samples, selection)
event_times = drop_selection(event_times, selection)
coords = drop_selection(coords, selection)
time_since_last_pulse = drop_selection(time_since_last_pulse, selection)
stimulation_intensity_mso = drop_selection(stimulation_intensity_mso, selection)
stimulation_intensity_didt = drop_selection(
stimulation_intensity_didt, selection
)
aptp = drop_selection(aptp, selection)
for idx, t in enumerate(event_samples):
tattr = {
"id": idx,
"comment": f'{{"artifact_amplitude":{aptp[idx]:3.2f}}}',
"event_name": "BrainVision RDA Markers - 'S 2'",
"event_sample": event_samples[idx],
"event_time": event_times[idx],
"xyz_coords": coords[idx],
"time_since_last_pulse_in_s": time_since_last_pulse[idx],
"stimulation_intensity_mso": stimulation_intensity_mso[idx],
"stimulation_intensity_didt": stimulation_intensity_didt[idx],
}
anno.append_trace_attr(tattr)
return anno.anno
[docs]def drop_selection(which, selection):
return [which[idx] for idx in selection]
[docs]def cut_traces_multifile(files, annotation: Annotations) -> List[TraceData]:
"""cut the tracedate from a matfile given Annotations
args
----
xdfile: FileName
the xdffile for cutting the data. must correspond in name to the one specified in the annotation
annotation: Annotations
the annotations specifying e.g. onsets as well as pre and post durations
returns
-------
traces: List[TraceData]
"""
channel = decode(annotation["attrs"]["channel_of_interest"])
print("Selecting traces for channel", channel)
data_series = None
data_stamps = None
for streams in files:
datastream = pick_stream_with_channel(channel, streams)
cix = datastream.channel_labels.index(channel)
if data_series is None:
data_series = datastream.time_series
data_stamps = datastream.time_stamps
else:
data_series = np.concatenate((data_series, datastream.time_series), axis=0)
data_stamps = np.concatenate((data_stamps, datastream.time_stamps), axis=0)
pre = decode(annotation["attrs"]["samples_pre_event"])
post = decode(annotation["attrs"]["samples_post_event"])
traces = []
for attrs in annotation["traces"]:
onset = decode(attrs["event_sample"])
trace = data_series[onset - pre : onset + post, cix]
traces.append(trace)
return traces
# %%
def prepare_annotations(
streams,
origin,
filedate,
channel: str,
pre_in_ms: float,
post_in_ms: float,
comment_name=None,
) -> Annotations:
"""load a documentation.txt and cnt-files and distill annotations from them
args
----
xmlfile: FileName
an option xml file with information about the target coordinates
readout: str
which readout to use
channel: str
which channel to pick
pre_in_ms: float
how many ms to cut before the tms
post_in_ms: float
how many ms to cut after the tms
xdffile: FileName
the :code:`.xdf`-file with the recorded streams, e.g. data and markers
returns
-------
annotation: Annotations
the annotations for this origin files
"""
# ------------------
datastream = pick_stream_with_channel(channel, streams)
iu1 = streams["reiz_marker_sa"].time_stamps[-1]
idx = np.where(streams["Spongebob-Data"].time_series[:, 11] == 1.0)[0][-1]
iu2 = streams["Spongebob-Data"].time_stamps[idx]
irrelevant_until = max((iu1, iu2))
time_stamps = []
for event in streams["BrainVision RDA Markers"].time_stamps:
if event > irrelevant_until:
time_stamps.append(event)
event_count = len(time_stamps)
coords = list_nan_coords(event_count)
stimulation_intensity_didt = list_nan(event_count)
stimulation_intensity_mso = list_nan(event_count)
comments = ["" for c in time_stamps]
print(f"Found {event_count} events")
if event_count == 350:
grid_layout = "5x7"
print("This corresponds to a subject with a 5x7 grid")
elif event_count == 360:
grid_layout = "6x6"
print("This corresponds to a subject with a 6x6 grid")
else:
grid_layout = "Unknown"
print("This does not correspond to a known grid layout")
# global fields
fs = datastream.nominal_srate
anno = AnnotationFactory(readin="tms", readout="cmep", origin=origin)
anno.set("filedate", filedate)
anno.set("subject", "") # TODO parse from correctly organized file
anno.set("samplingrate", fs)
anno.set("samples_pre_event", int(pre_in_ms * fs / 1000))
anno.set("samples_post_event", int(post_in_ms * fs / 1000))
anno.set("channel_of_interest", channel)
anno.set("channel_labels", [channel])
anno.set("global_comment", f"grid_layout={grid_layout}")
# trace fields
event_samples = find_closest_samples(datastream, time_stamps)
# shift onset on Peak of artifact
ephys = streams["BrainVision RDA"]
gmfp = np.std(ephys.time_series[:, 0:64], 1)
aptp = []
tp = []
for onset in event_samples:
artifact = gmfp[onset - 25 : onset + 25]
aptp.append(np.ptp(artifact))
tp.append(int(np.argmax(artifact) - 25 + onset))
event_samples = tp
event_times = [
float(t)
for t in datastream.time_stamps[event_samples] - datastream.time_stamps[0]
]
time_since_last_pulse = [inf] + [
a - b for a, b in zip(event_times[1:], event_times[0:-1])
]
for idx, t in enumerate(event_samples):
tattr = {
"id": idx,
"comment": f'{{"artifact_amplitude":{aptp[idx]:3.2f}}}',
"event_name": "BrainVision RDA Markers - 'S 2'",
"event_sample": event_samples[idx],
"event_time": event_times[idx],
"xyz_coords": coords[idx],
"time_since_last_pulse_in_s": time_since_last_pulse[idx],
"stimulation_intensity_mso": stimulation_intensity_mso[idx],
"stimulation_intensity_didt": stimulation_intensity_didt[idx],
}
anno.append_trace_attr(tattr)
return anno.anno
def cut_traces(streams, annotation: Annotations) -> List[TraceData]:
"""cut the tracedate from a matfile given Annotations
args
----
xdfile: FileName
the xdffile for cutting the data. must correspond in name to the one specified in the annotation
annotation: Annotations
the annotations specifying e.g. onsets as well as pre and post durations
returns
-------
traces: List[TraceData]
"""
channel = decode(annotation["attrs"]["channel_of_interest"])
print("Selecting traces for channel", channel)
data_series = None
data_stamps = None
for streams in files:
datastream = pick_stream_with_channel(channel, streams)
cix = datastream.channel_labels.index(channel)
if data_series is None:
data_series = datastream.time_series
data_stamps = datastream.time_stamps
else:
data_series = np.concatenate((data_series, datastream.time_series), axis=0)
data_stamps = np.concatenate((data_stamps, datastream.time_stamps), axis=0)
pre = decode(annotation["attrs"]["samples_pre_event"])
post = decode(annotation["attrs"]["samples_post_event"])
traces = []
for attrs in annotation["traces"]:
onset = decode(attrs["event_sample"])
trace = data_series[onset - pre : onset + post, cix]
traces.append(trace)
return traces
# %%
[docs]def prepare_annotations(
streams,
origin,
filedate,
channel: str,
pre_in_ms: float,
post_in_ms: float,
comment_name=None,
) -> Annotations:
"""load a documentation.txt and cnt-files and distill annotations from them
args
----
xmlfile: FileName
an option xml file with information about the target coordinates
readout: str
which readout to use
channel: str
which channel to pick
pre_in_ms: float
how many ms to cut before the tms
post_in_ms: float
how many ms to cut after the tms
xdffile: FileName
the :code:`.xdf`-file with the recorded streams, e.g. data and markers
returns
-------
annotation: Annotations
the annotations for this origin files
"""
# ------------------
datastream = pick_stream_with_channel(channel, streams)
iu1 = streams["reiz_marker_sa"].time_stamps[-1]
idx = np.where(streams["Spongebob-Data"].time_series[:, 11] == 1.0)[0][-1]
iu2 = streams["Spongebob-Data"].time_stamps[idx]
irrelevant_until = max((iu1, iu2))
time_stamps = []
for event in streams["BrainVision RDA Markers"].time_stamps:
if event > irrelevant_until:
time_stamps.append(event)
event_count = len(time_stamps)
coords = list_nan_coords(event_count)
stimulation_intensity_didt = list_nan(event_count)
stimulation_intensity_mso = list_nan(event_count)
comments = ["" for c in time_stamps]
print(f"Found {event_count} events")
if event_count == 350:
grid_layout = "5x7"
print("This corresponds to a subject with a 5x7 grid")
elif event_count == 360:
grid_layout = "6x6"
print("This corresponds to a subject with a 6x6 grid")
else:
grid_layout = "Unknown"
print("This does not correspond to a known grid layout")
# global fields
fs = datastream.nominal_srate
anno = AnnotationFactory(readin="tms", readout="cmep", origin=origin)
anno.set("filedate", filedate)
anno.set("subject", "") # TODO parse from correctly organized file
anno.set("samplingrate", fs)
anno.set("samples_pre_event", int(pre_in_ms * fs / 1000))
anno.set("samples_post_event", int(post_in_ms * fs / 1000))
anno.set("channel_of_interest", channel)
anno.set("channel_labels", [channel])
anno.set("global_comment", f"grid_layout={grid_layout}")
# trace fields
event_samples = find_closest_samples(datastream, time_stamps)
# shift onset on Peak of artifact
ephys = streams["BrainVision RDA"]
gmfp = np.std(ephys.time_series[:, 0:64], 1)
aptp = []
tp = []
for onset in event_samples:
artifact = gmfp[onset - 25 : onset + 25]
aptp.append(np.ptp(artifact))
tp.append(int(np.argmax(artifact) - 25 + onset))
event_samples = tp
event_times = [
float(t)
for t in datastream.time_stamps[event_samples] - datastream.time_stamps[0]
]
time_since_last_pulse = [inf] + [
a - b for a, b in zip(event_times[1:], event_times[0:-1])
]
for idx, t in enumerate(event_samples):
tattr = {
"id": idx,
"comment": f'{{"artifact_amplitude":{aptp[idx]:3.2f}}}',
"event_name": "BrainVision RDA Markers - 'S 2'",
"event_sample": event_samples[idx],
"event_time": event_times[idx],
"xyz_coords": coords[idx],
"time_since_last_pulse_in_s": time_since_last_pulse[idx],
"stimulation_intensity_mso": stimulation_intensity_mso[idx],
"stimulation_intensity_didt": stimulation_intensity_didt[idx],
}
anno.append_trace_attr(tattr)
return anno.anno
[docs]def cut_traces(streams, annotation: Annotations) -> List[TraceData]:
"""cut the tracedate from a matfile given Annotations
args
----
xdfile: FileName
the xdffile for cutting the data. must correspond in name to the one specified in the annotation
annotation: Annotations
the annotations specifying e.g. onsets as well as pre and post durations
returns
-------
traces: List[TraceData]
"""
channel = decode(annotation["attrs"]["channel_of_interest"])
print("Selecting traces for channel", channel)
datastream = pick_stream_with_channel(channel, streams)
cix = datastream.channel_labels.index(channel)
pre = decode(annotation["attrs"]["samples_pre_event"])
post = decode(annotation["attrs"]["samples_post_event"])
traces = []
for attrs in annotation["traces"]:
onset = decode(attrs["event_sample"])
trace = datastream.time_series[onset - pre : onset + post, cix]
traces.append(trace)
return traces
if __name__ == "__main__":
# folder = "/media/rtgugg/sd/Desktop/test-offspect/betti/nocoords"
# fname = "TMS_NMES_AmWo_pre2.xdf"
# fname = Path(folder) / fname
# xdf = XDFFile(fname)
folder = "/media/rtgugg/sd/Desktop/test-offspect/betti/toomanytraces"
fname = Path(folder) / "TMS_NMES_MaBa_pre2.xdf"
files = [XDFFile(fname)]