Source code for ChildProject.metrics

import pandas as pd
import numpy as np

from typing import List


def segments_to_annotation(segments: pd.DataFrame, column: str):
    """Build a pyannote.core.Annotation from a dataframe of segments.

    Each row of ``segments`` becomes one ``pyannote.core.Segment`` spanning
    ``segment_onset`` to ``segment_offset``, labelled with the row's value
    in ``column``.

    :param segments: a dataframe of input segments. It should at least have
        the following columns: ``segment_onset``, ``segment_offset`` and
        ``column``.
    :type segments: pd.DataFrame
    :param column: the name of the column in ``segments`` that should be used
        for the values of the annotations (e.g. speaker_type).
    :type column: str
    :return: the pyannote.core.Annotation object.
    :rtype: pyannote.core.Annotation
    """
    # Imported lazily so that pyannote is only needed when this is called.
    from pyannote.core import Annotation, Segment

    result = Annotation()
    records = segments.to_dict(orient="records")

    for record in records:
        onset = record["segment_onset"]
        offset = record["segment_offset"]
        label = record[column]
        result[Segment(onset, offset)] = label

    return result
def pyannote_metric(
    segments: pd.DataFrame, reference: str, hypothesis: str, metric, column: str
):
    """Evaluate ``metric`` between two annotation sets found in ``segments``.

    The rows of ``segments`` whose ``set`` column equals ``reference`` and
    ``hypothesis`` are each converted with :func:`segments_to_annotation`,
    and ``metric`` is then called as ``metric(ref, hyp, detailed=True)``.

    :param segments: a dataframe of segments with at least the columns
        ``set``, ``segment_onset``, ``segment_offset`` and ``column``.
    :type segments: pd.DataFrame
    :param reference: value of the ``set`` column selecting the reference rows
    :type reference: str
    :param hypothesis: value of the ``set`` column selecting the hypothesis rows
    :type hypothesis: str
    :param metric: a callable accepting ``(reference, hypothesis, detailed=True)``
        (presumably a pyannote.metrics metric instance -- confirm with callers)
    :param column: the name of the column holding the annotation labels
    :type column: str
    :return: whatever ``metric`` returns for this pair of annotations
    """
    set_names = segments["set"]

    reference_rows = segments[set_names == reference]
    ref = segments_to_annotation(reference_rows, column)

    hypothesis_rows = segments[set_names == hypothesis]
    hyp = segments_to_annotation(hypothesis_rows, column)

    return metric(ref, hyp, detailed=True)
def segments_to_grid(
    segments: pd.DataFrame,
    range_onset: int,
    range_offset: int,
    timescale: int,
    column: str,
    categories: list,
    none=True,
    overlap=False,
) -> np.ndarray:
    """Transform a dataframe of annotation segments into a 2d matrix
    representing the indicator function of each of the ``categories``
    across time.

    Each row of the matrix corresponds to a unit of time of length
    ``timescale`` (in milliseconds), ranging from ``range_onset`` to
    ``range_offset``; each column corresponds to one of the ``categories``
    provided, optionally followed by two special columns (overlap and none).

    The value of the cell ``ij`` of the output matrix is set to 1 if the
    class ``j`` is active at time ``i``, 0 otherwise.

    If `overlap` is True, an additional column is appended to the grid, which
    is set to 1 if two or more classes are active at time ``i``.

    If `none` is set to True, an additional column is appended to the grid,
    which is set to 1 if none of the classes are active at time ``i``.

    The shape of the output matrix is therefore
    ``(ceil((range_offset-range_onset)/timescale), len(categories) + n)``,
    where n = 2 if both `overlap` and `none` are True, 1 if one of them is
    True, and 0 otherwise.

    The fraction of time a class ``j`` is active can therefore be calculated
    as ``np.mean(grid, axis = 0)[j]``

    The input dataframe is left untouched. Segments are clipped to the
    requested range: the part of a segment lying before ``range_onset`` or
    after ``range_offset`` is ignored.

    :param segments: a dataframe of input segments. It should at least have
        the following columns: ``segment_onset``, ``segment_offset`` and
        ``column``.
    :type segments: pd.DataFrame
    :param range_onset: timestamp of the beginning of the range to consider
        (in milliseconds)
    :type range_onset: int
    :param range_offset: timestamp of the end of the range to consider
        (in milliseconds)
    :type range_offset: int
    :param timescale: length of each time unit (in milliseconds)
    :type timescale: int
    :param column: the name of the column in ``segments`` that should be used
        for the values of the annotations (e.g. speaker_type).
    :type column: str
    :param categories: the list of categories
    :type categories: list
    :param none: append a 'none' column, default True
    :type none: bool
    :param overlap: append an overlap column, default False
    :type overlap: bool
    :return: the output grid
    :rtype: numpy.array
    """
    categories = list(map(str, categories))
    units = int(np.ceil((range_offset - range_onset) / timescale))

    category_table = {category: index for index, category in enumerate(categories)}
    data = np.zeros((units, len(categories) + int(overlap) + int(none)), dtype=int)

    # Align on the grid using local arrays, so the caller's dataframe is
    # neither shifted nor extended with helper columns.
    onset_index = (
        ((segments["segment_onset"] - range_onset) // timescale)
        .astype(int)
        .to_numpy()
    )
    offset_index = (
        ((segments["segment_offset"] - range_onset) // timescale)
        .astype(int)
        .to_numpy()
    )

    # Clip to [0, units]: a negative bound would otherwise be interpreted by
    # NumPy slicing as "counted from the end" and mark the wrong rows.
    onset_index = np.clip(onset_index, 0, units)
    offset_index = np.clip(offset_index, 0, units)

    for label, start, stop in zip(segments[column], onset_index, offset_index):
        category_index = category_table.get(str(label))
        if category_index is None:
            # label is not one of the requested categories
            continue
        data[start:stop, category_index] = 1

    if overlap or none:
        # The special columns are still zero here, so this counts only the
        # active categories of each time unit.
        non_zero = np.count_nonzero(data, axis=1)

        if overlap:
            overlap_index = -2 if none else -1
            data[:, overlap_index] = non_zero > 1

        if none:
            data[:, -1] = non_zero == 0

    return data
def grid_to_vector(grid, categories):
    """Transform a grid of active classes into a vector of labels.

    For each row, the label is the one associated with the *last* active
    column. When the grid was built with an overlap column placed after the
    regular categories, a row with several active classes is therefore
    labelled 'overlap'. A row with no active column at all receives the
    last label of ``categories``.

    See :func:`ChildProject.metrics.segments_to_grid` for a description of
    grids.

    :param grid: a NumPy array of shape ``(n, len(categories))``
    :type grid: numpy.array
    :param categories: the list of categories
    :type categories: list
    :return: the vector of labels of length ``n``
        (e.g. ``np.array([none FEM FEM FEM overlap overlap CHI])``)
    :rtype: numpy.array
    """
    # argmax over the column-reversed grid returns the first hit from the
    # right; convert it back to an index into the original column order.
    last_active = grid.shape[1] - np.argmax(grid[:, ::-1], axis=1) - 1

    # Array indexing instead of np.vectorize: the latter raises ValueError
    # on a grid with zero rows because it cannot infer the output type.
    return np.asarray(categories)[last_active]
def conf_matrix(rows_grid, columns_grid):
    """Compute the confusion matrix (as counts) from grids of active classes.

    The result is the matrix product of the transpose of ``rows_grid`` with
    ``columns_grid``: cell ``ij`` is the sum over time units of
    ``rows_grid[t, i] * columns_grid[t, j]``.

    See :func:`ChildProject.metrics.segments_to_grid` for a description of
    grids.

    :param rows_grid: the grid corresponding to the rows of the confusion
        matrix.
    :type rows_grid: numpy.array
    :param columns_grid: the grid corresponding to the columns of the
        confusion matrix.
    :type columns_grid: numpy.array
    :return: an array of counts with one row per column of ``rows_grid`` and
        one column per column of ``columns_grid``
    :rtype: numpy.array
    """
    rows_by_class = rows_grid.T
    counts = rows_by_class @ columns_grid
    return counts
def vectors_to_annotation_task(*args, drop: List[str] = []):
    """Transform vectors of labels into a nltk AnnotationTask object.

    The vectors are stacked into a 2d array with one row per annotator.
    Every cell is turned into a ``(row index, column index, label)`` triple,
    the label being the string representation of the cell. Triples whose
    label appears in ``drop`` are left out.

    :param *args: vector of labels for each annotator; add one argument per
        annotator.
    :type *args: 1d np.array() of labels
    :param drop: list of labels that should be ignored
    :type drop: List[str]
    :return: the AnnotationTask object
    :rtype: nltk.metrics.agreement.AnnotationTask
    """
    # Imported lazily so that nltk is only needed when this is called.
    from nltk.metrics import agreement

    stacked = np.vstack(args)
    cursor = np.nditer(stacked, flags=["multi_index"])
    filtering = len(drop) > 0

    triples = []
    for value in cursor:
        # multi_index follows the iterator's current position
        annotator = cursor.multi_index[0]
        position = cursor.multi_index[1]
        label = str(value)

        if filtering and label in drop:
            continue

        triples.append((annotator, position, label))

    return agreement.AnnotationTask(data=triples)
def gamma(
    segments: pd.DataFrame,
    column: str,
    alpha: float = 1,
    beta: float = 1,
    precision_level: float = 0.05,
) -> float:
    """Compute Mathet et al. gamma agreement on `segments`.

    The gamma measure evaluates the reliability of both the segmentation and
    the categorization simultaneously; an extensive description of the method
    and its parameters can be found in Mathet et al., 2015
    (`doi:10.1162/COLI_a_00227 <https://dx.doi.org/10.1162/COLI_a_00227>`_)

    This function uses the `pygamma-agreement package
    <https://pygamma-agreement.readthedocs.io/en/latest/>`_ by
    `Titeux et al <https://hal.archives-ouvertes.fr/hal-03144116>`_.

    Each row of ``segments`` is added to a continuum under the annotator
    named by its ``set`` value.

    :param segments: input segments dataframe (see
        :ref:`format-annotations-segments` for the dataframe format)
    :type segments: pd.DataFrame
    :param column: name of the categorical column of the segments to
        consider, e.g. 'speaker_type'
    :type column: str
    :param alpha: gamma agreement time alignment weight, defaults to 1
    :type alpha: float, optional
    :param beta: gamma agreement categorical weight, defaults to 1
    :type beta: float, optional
    :param precision_level: level of precision (see pygamma-agreement's
        documentation), defaults to 0.05
    :type precision_level: float, optional
    :return: gamma agreement
    :rtype: float
    """
    # Imported lazily so that these packages are only needed when this is
    # called.
    from pyannote.core import Segment
    from pygamma_agreement.continuum import Continuum
    from pygamma_agreement.dissimilarity import CombinedCategoricalDissimilarity

    continuum = Continuum()
    records = segments.to_dict(orient="records")

    for record in records:
        annotator = record["set"]
        span = Segment(record["segment_onset"], record["segment_offset"])
        label = record[column]
        continuum.add(annotator, span, label)

    dissimilarity = CombinedCategoricalDissimilarity(
        delta_empty=1, alpha=alpha, beta=beta
    )
    results = continuum.compute_gamma(dissimilarity, precision_level=precision_level)

    return results.gamma