Source code for ChildProject.utils

import os
from datetime import datetime
from typing import List
import numpy as np
import pandas as pd
from pathlib import Path


class Segment:
    """Half-open style range described by a ``start`` and a ``stop`` value."""

    def __init__(self, start, stop):
        # Both bounds are stored untouched; no ordering check is performed.
        self.start, self.stop = start, stop

    def length(self):
        """Return the extent of the segment, i.e. ``stop - start``."""
        extent = self.stop - self.start
        return extent

    def __repr__(self):
        return f"Segment([{self.start}, {self.stop}])"
def df_to_printable(df: pd.DataFrame, delimiter: str = ' ', header: bool = False) -> str:
    """Render a DataFrame as a terminal-printable, column-aligned string.

    Every value is converted to text (missing values become empty strings) and
    right-justified to the width of the widest entry of its column (or of the
    column label, whichever is longer). Fields are separated by ``delimiter``.
    Each data line ends with the delimiter followed by the row's index label
    wrapped in the ANSI escape sequences ``\\033[94m`` ... ``\\033[0m``.

    :param df: pandas DataFrame containing the data to print
    :type df: pd.DataFrame
    :param delimiter: single character placed between fields
    :type delimiter: str
    :param header: when True, the first line of the output holds the column labels
    :type header: bool
    :raises ValueError: if ``delimiter`` is not exactly one character long
    :return: representation of the dataframe, one line per row, each terminated by a newline
    :rtype: str
    """
    if len(delimiter) != 1:
        raise ValueError(f"Invalid Delimiter {delimiter}, should be 1 character")

    df = df.fillna('').astype(str)

    # Column labels are not guaranteed to be strings (e.g. integer labels).
    names = [str(col) for col in df.columns]

    # Width of a column = longest cell, but never narrower than its label.
    widths = []
    for position, name in enumerate(names):
        longest = df.iloc[:, position].str.len().max()
        # max() of an empty column is NaN/NA: fall back on the label width.
        widths.append(max(int(longest), len(name)) if pd.notna(longest) else len(name))

    lines = []
    if header:
        lines.append(delimiter.join(name.rjust(width) for name, width in zip(names, widths)))

    # Rows are walked positionally so that a non-unique index is supported.
    for row_label, values in zip(df.index, df.to_numpy(dtype=object)):
        cells = delimiter.join(value.rjust(width) for value, width in zip(values, widths))
        lines.append(f"{cells}{delimiter}\033[94m{row_label}\033[0m")

    return "".join(line + "\n" for line in lines)
def printable_unit_duration(duration) -> str:
    """Format a duration given in milliseconds with a human-friendly unit.

    The unit is chosen among milliseconds, seconds, minutes and hours; every
    unit except milliseconds is rounded to one decimal.

    :param duration: duration in milliseconds
    :type duration: int
    :return: converted duration followed by its unit letter(s)
    :rtype: str
    """
    one_second = 1000
    one_minute = 1000 * 60
    one_hour = 1000 * 60 * 60

    if duration < one_second:
        return f"{duration}ms"
    if duration < one_minute:
        seconds = round(duration / 1000, 1)
        return f"{seconds}s"
    if duration < one_hour:
        minutes = round(duration / (1000 * 60), 1)
        return f"{minutes}m"
    hours = round(duration / (1000 * 60 * 60), 1)
    return f"{hours}h"
def retry_func(func: callable, excep: Exception, tries: int = 3, **kwargs):
    """Call ``func(**kwargs)``, retrying when it raises ``excep``.

    The call is attempted at most ``tries`` times. As soon as one attempt
    succeeds its result is returned. If the last allowed attempt still raises
    ``excep``, that exception is propagated with its original traceback.
    Exceptions that do not match ``excep`` are never retried.

    :param func: callable to run, invoked with ``**kwargs`` only
    :type func: callable
    :param excep: exception class (or tuple of classes) that triggers a retry
    :type excep: Exception
    :param tries: maximum number of attempts; with a value below 1 ``func`` is never called and None is returned
    :type tries: int
    :param kwargs: keyword arguments forwarded to ``func`` on every attempt
    :return: whatever ``func`` returned on the first successful attempt
    """
    for attempt in range(tries):
        try:
            return func(**kwargs)
        except excep:
            if attempt == tries - 1:
                # Out of attempts: bare raise keeps the original traceback intact.
                raise
    return None
def intersect_ranges(xs, ys):
    """Yield the non-empty intersections between two streams of ranges.

    Both arguments must be iterators (they are advanced with ``next``) whose
    items expose ``start`` and ``stop``. At every step the current item of
    each stream is intersected, then the stream whose current item stops
    first is advanced; generation ends when either stream is exhausted.
    NOTE(review): this merge strategy presumably expects each stream to be
    sorted and free of internal overlaps - confirm against callers.

    :param xs: iterator over the first series of ranges
    :param ys: iterator over the second series of ranges
    :return: generator of Segment objects, one per intersection of positive length
    """
    exhausted = object()

    # Fetch the first range of each stream; nothing to do if either is empty.
    left = next(xs, exhausted)
    if left is exhausted:
        return
    right = next(ys, exhausted)
    if right is exhausted:
        return

    while True:
        lower_bound = max(left.start, right.start)
        upper_bound = min(left.stop, right.stop)
        overlap = Segment(lower_bound, upper_bound)
        if overlap.length() > 0:
            yield overlap

        # Move forward in the stream whose current range finishes first.
        if left.stop <= right.stop:
            left = next(xs, exhausted)
            if left is exhausted:
                return
        else:
            right = next(ys, exhausted)
            if right is exhausted:
                return
class TimeInterval:
    """Interval between two times of the day, ignoring the calendar date.

    The date part of both bounds is overwritten with 1900-01-01 so that only
    the time component takes part in comparisons. ``start`` may be later than
    ``stop``; no reordering is performed.
    """

    def __init__(self, start: datetime, stop: datetime):
        """
        :param start: beginning of the interval, only its time of day is kept
        :type start: datetime
        :param stop: end of the interval, only its time of day is kept
        :type stop: datetime
        """
        # remove the day/month/year component
        self.start = start.replace(year=1900, month=1, day=1)
        self.stop = stop.replace(year=1900, month=1, day=1)

    def length(self):
        """Return ``stop - start`` as a timedelta (negative when start is later than stop)."""
        return self.stop - self.start

    def __repr__(self):
        return "TimeInterval([{}, {}])".format(self.start, self.stop)

    def __eq__(self, other):
        # Objects lacking start/stop (None, strings, ...) are simply "not equal"
        # instead of crashing the comparison with an AttributeError.
        try:
            return self.start == other.start and self.stop == other.stop
        except AttributeError:
            return NotImplemented

    # Instances are mutable and define __eq__, so they stay unhashable
    # (this is what Python already does implicitly; stated here on purpose).
    __hash__ = None
def time_intervals_intersect(ti1 : TimeInterval, ti2 : TimeInterval) -> List[TimeInterval]:
    """
    given 2 time intervals (those do not take in consideration days, only time in the day), return an array of new
    interval(s) representing the intersections of the original ones.

    An interval whose start is later than its stop is accepted (see example 2, where the second
    interval goes from 21:04 to 10:36); in that situation the result may hold two intervals.
    Resulting intervals whose start equals their stop are dropped, so the returned list can be empty.

    Examples
    1. time_intervals_intersect( TimeInterval( datetime(1900,1,1,8,57), datetime(1900,1,1,21,4)), TimeInterval( datetime(1900,1,1,10,36), datetime(1900,1,1,22,1))) => [TimeInterval(10:36 , 21:04)]
    2. time_intervals_intersect( TimeInterval( datetime(1900,1,1,8,57), datetime(1900,1,1,22,1)), TimeInterval( datetime(1900,1,1,21,4), datetime(1900,1,1,10,36))) => [TimeInterval(08:57 , 10:36),TimeInterval(21:04 , 22:01)]

    :param ti1: first interval
    :param ti2: second interval
    :type ti1: TimeInterval
    :type ti2: TimeInterval
    :return: list of intervals that intersect
    :rtype: list[TimeInterval]
    """
    #The calculation and boolean evaluation is done that way to optimize the process, those expressions were obtained using a Karnaugh table.
    #Given the relations between the different start and ending times, the boolean relations used below give the correct intervals
    # a / b : the 1st / 2nd interval has its start before (or equal to) its own stop
    a = ti1.start <= ti1.stop
    b = ti2.start <= ti2.stop
    # c : the 1st interval stops before (or when) the 2nd one stops
    c = ti1.stop <= ti2.stop
    # d : the 1st interval starts before (or when) the 2nd one starts
    d = ti1.start <= ti2.start
    # e : the 1st interval starts before (or when) the 2nd one stops
    e = ti1.start <= ti2.stop
    # f : the 2nd interval starts before (or when) the 1st one stops
    f = ti2.start <= ti1.stop
    # stays empty when none of the branches below matches
    r = []
    # NOTE(review): the order of the branches matters, each one relies on the previous ones having been rejected
    #case where correct resulting interval is [start of the 2nd interval : end of the 1st interval]
    if c and (d and (not e or f) or not e and f) or d and not e and f :
        r = [TimeInterval(ti2.start,ti1.stop)]
    #case where correct resulting interval is [start of the 2nd interval : end of the 2nd interval]
    elif not c and (d and (b or not a) or not a and b) or not a and b and d :
        r = [ti2]
    #case where correct resulting interval is [start of the 1st interval : end of the 2nd interval]
    elif not c and (not d and (not e and not f or e) or e and not f) or not d and e and not f :
        r = [TimeInterval(ti1.start,ti2.stop)]
    #case where correct resulting interval is [start of the 1st interval : end of the 1st interval]
    elif c and (not d and (not a and not b or a) or a and not b) or a and not b and not d :
        r = [ti1]
    # !! here the expression was simplified because previous statements should already have caught their relevant cases (ie this statement should always be last or changed)
    #case where correct resulting interval is [start of the 1st interval : end of the 2nd interval] U [start of the 2nd interval : end of the 1st interval]
    elif not a and (not b or e) or d and e and f :
        r = [TimeInterval(ti1.start,ti2.stop),TimeInterval(ti2.start,ti1.stop)]
    #remove the intervals having equal values (3:00 to 3:00)
    # the index only moves forward when nothing was popped, so no element is skipped
    i = 0
    while i < len(r):
        if r[i].start == r[i].stop:
            r.pop(i)
        else:
            i += 1
    return r
def get_audio_duration(filename: Path) -> float:
    """Return the duration of an audio file as reported by ``soundfile.info``.

    This is a best-effort helper: when the file does not exist or cannot be
    read, a warning is printed and 0 is returned instead of raising.

    :param filename: path to the audio file
    :type filename: Path
    :return: value of the ``duration`` attribute reported by soundfile, or 0 when it cannot be determined
    :rtype: float
    """
    if not filename.exists():
        print('Warning: could not find file {}, setting duration to 0'.format(filename))
        return 0

    # Imported only once the file is known to exist, so the missing-file path
    # does not require soundfile.
    from soundfile import info

    try:
        return info(str(filename)).duration
    except Exception:
        # Deliberately broad: any read failure degrades to a zero duration.
        print('Warning: could not read duration for {}, setting duration to 0'.format(filename))
        return 0
def read_wav(filename, start_s, length_s):
    """Read a portion of an audio file, given a start point and a duration (both in seconds).

    :param filename: path to the audio file to read
    :param start_s: offset, in seconds, at which reading starts
    :param length_s: duration, in seconds, of the portion to read
    :return: tuple (signal, sampling rate, number of channels); the number of channels is 1 for a one-dimensional signal, otherwise the size of its first axis
    """
    # librosa is used because it supports more codecs and is less likely to
    # crash on an unusual encoding
    import librosa

    signal, sampling_rate = librosa.load(
        filename,
        sr=None,
        mono=False,
        offset=start_s,
        duration=length_s,
    )

    if len(signal.shape) == 1:
        channel_count = 1
    else:
        channel_count = signal.shape[0]

    return signal, sampling_rate, channel_count
def calculate_shift(file1, file2, start1, start2, interval):
    """
    Take 2 audio files, a starting point for each and a length to compare in seconds,
    return a divergence score representing the average difference in audio signal.

    When the channel counts differ and one file is mono, the other one is averaged
    down to mono; any other channel mismatch raises. When the sampling rates differ,
    both signals are decimated to the greatest common divisor of the two rates.
    The signals are then decimated once more when the rate is above 400.

    :param file1: path to the first wav file to compare
    :type file1: str
    :param file2: path to the second wav file to compare
    :type file2: str
    :param start1: starting point for the comparison in seconds for the first audio
    :type start1: int
    :param start2: starting point for the comparison in seconds for the second audio
    :type start2: int
    :param interval: length to compare between the 2 audios on in seconds
    :type interval: int
    :raises Exception: if both files are multi-channel with different channel counts
    :return: tuple of divergence score and number of values used
    :rtype: (float, int)
    """
    ref, ref_rate, ref_chan = read_wav(file1, start1, interval)
    test, test_rate, test_chan = read_wav(file2, start2, interval)

    # Different number of channels: shrink the richer signal to mono if possible.
    if ref_chan != test_chan:
        print('WARNING : different number of channels, attempting to compress channels to carry on with analysis')
        shrunk_notice = '{} was shrunk to mono channel for the analysis, it has a higher level of information than {}'
        if ref_chan == 1 and test_chan > 1:
            test = np.mean(test, axis=0)
            test_chan = 1
            print(shrunk_notice.format(file2, file1))
        elif ref_chan > 1 and test_chan == 1:
            ref = np.mean(ref, axis=0)
            ref_chan = 1
            print(shrunk_notice.format(file1, file2))
        else:
            raise Exception(
                'audios do not match, {} has {} channel(s) while {} has {}'.format(file1, ref_chan, file2, test_chan)
            )

    # With several channels, flatten to a 1D array (both signals have the
    # same number of channels at this point).
    if ref_chan > 1:
        ref = np.reshape(ref, ref_chan * ref.shape[1])
        test = np.reshape(test, test_chan * test.shape[1])

    # Different sampling rates: fall back on a common, lower rate.
    if ref_rate != test_rate:
        from math import gcd

        common_rate = gcd(ref_rate, test_rate)
        print('WARNING : sampling rates do not match between audios ({}Hz and {}Hz), attempting to downsample to {}Hz'.format(ref_rate, test_rate, common_rate))
        if ref_rate > common_rate:
            ref_step = int(ref_rate / common_rate)
            ref = ref[::ref_step]
            ref_rate = common_rate
        if test_rate > common_rate:
            test_step = int(test_rate / common_rate)
            test = test[::test_step]
            test_rate = common_rate

    sampling_rate = ref_rate

    # Decimate to save computation time, only when the rate is higher than 400.
    if sampling_rate > 400:
        downsampled_rate = 400
    else:
        downsampled_rate = sampling_rate
    step = int(sampling_rate / downsampled_rate)
    ref = ref[::step]
    test = test[::step]

    # Plain difference of the audio signals averaged over the analysed segment;
    # the factor 1000 is arbitrary, it only makes the score easier to read and compare.
    sample_count = len(ref)
    divergence = np.abs(ref - test).sum() * 1000 / sample_count

    return divergence, len(ref)
def find_lines_involved_in_overlap(df: pd.DataFrame, onset_label: str = 'range_onset', offset_label: str = 'range_offset', labels=None) -> pd.Series:
    """Flag the lines of a dataframe whose time segment overlaps another line's segment.

    The dataframe is supposed to have a column for the onset of a timeline and
    one for the offset. The function returns a boolean series where all indexes
    having 'True' are lines involved in overlaps and 'False' when not.

    Two lines overlap when one starts strictly before the other ends and ends
    strictly after the other starts, their index labels differ, and they hold
    equal values in every column listed in ``labels``.

    e.g. to select all lines involved in overlaps, use:
    ```
    ovl_segments = df[find_lines_involved_in_overlap(df, 'segment_onset', 'segment_offset')]
    ```
    and to select line that never overlap, use:
    ```
    ovl_segments = df[~find_lines_involved_in_overlap(df, 'segment_onset', 'segment_offset')]
    ```

    :param df: pandas DataFrame where we want to find overlaps, having some time segments described by 2 columns (onset and offset)
    :type df: pd.DataFrame
    :param onset_label: column label for the onset of time segments
    :type onset_label: str
    :param offset_label: columns label for the offset of time segments
    :type offset_label: str
    :param labels: list of column labels that are required to match to be involved in overlap (none by default).
    :type labels: list[str]
    :return: pandas Series of boolean values where 'True' are indexes where overlaps exist
    :rtype: pd.Series
    """
    labels = [] if labels is None else list(labels)

    # Nothing can overlap in an empty frame; still hand back a boolean Series.
    if len(df) == 0:
        return pd.Series([], index=df.index, dtype=bool)

    # Row-independent pieces are looked up once, outside the per-row function.
    onsets = df[onset_label]
    offsets = df[offset_label]
    row_ids = df.index

    def _is_overlapped(row) -> bool:
        # overlap is defined by having s2.offset > s1.onset and s2.onset < s1.offset
        # and s2.index != s1.index (same segment)
        candidates = (onsets < row[offset_label]) & (offsets > row[onset_label]) & (row_ids != row.name)
        for label in labels:
            candidates &= df[label] == row[label]
        return bool(candidates.any())

    return df.apply(_is_overlapped, axis=1)
def series_to_datetime(time_series, time_index_list, time_column_name:str, date_series = None, date_index_list = None, date_column_name = None) -> pd.Series:
    """
    Convert a series of strings into a series of datetimes.

    Every format listed (attribute ``datetime``) by the IndexColumn named
    ``time_column_name`` is tried in turn with pd.to_datetime; a value keeps the
    result of the first format that parses it and stays NaT when none does.
    To have the date included (and not only the time), a second series holding
    the dates can be given along with its own index and column name; dates and
    times are then joined with a space and every date/time format pair is tried.

    :param time_series: pandas series of strings to transform into datetime (can contain NA value => NaT datetime), if date_series is given, time_series should only have the time
    :type time_series: pandas.Series
    :param time_index_list: list of index to use where the column wanted is present
    :type time_index_list: List[IndexColumn]
    :param time_column_name: name of the IndexColumn to use (IndexColumn.name value) for accepted formats
    :type time_column_name: str
    :param date_series: pandas series of strings to transform into the date component of datetime (can contain NA value)
    :type date_series: pandas.Series
    :param date_index_list: list of index to use where the column wanted is present
    :type date_index_list: List[IndexColumn]
    :param date_column_name: name of the IndexColumn to use (IndexColumn.name value) for accepted formats for dates
    :type date_column_name: str
    :return: series with dtype datetime containing the converted datetimes
    :rtype: pandas.Series
    """
    time_column = next(column for column in time_index_list if column.name == time_column_name)
    time_formats = time_column.datetime

    # Start from an all-missing series; each format fills the still-empty slots.
    series = pd.Series(np.nan, index=time_series.index , dtype='datetime64[ns]')

    if date_series is None:
        raw_values = time_series.copy()
        patterns = (frmt for frmt in time_formats)
    else:
        raw_values = date_series + ' ' + time_series
        date_column = next(column for column in date_index_list if column.name == date_column_name)
        date_formats = date_column.datetime
        # Lazily paired so formats are consumed in the same order as nested loops.
        patterns = ("{} {}".format(dfrmt, frmt) for frmt in time_formats for dfrmt in date_formats)

    for pattern in patterns:
        parsed = pd.to_datetime(raw_values, format=pattern, errors="coerce")
        series = series.fillna(parsed)

    return series