import os
from datetime import datetime
from typing import List
import numpy as np
import pandas as pd
from pathlib import Path
class Segment:
    """A span delimited by a ``start`` and a ``stop`` value.

    The two bounds are stored exactly as given; no ordering or type check
    is performed on them.
    """

    def __init__(self, start, stop):
        self.start, self.stop = start, stop

    def length(self):
        """Return ``stop - start`` (negative when ``stop`` precedes ``start``)."""
        span = self.stop - self.start
        return span

    def __repr__(self):
        template = "Segment([{}, {}])"
        return template.format(self.start, self.stop)
def df_to_printable(df: pd.DataFrame, delimiter: str = ' ', header: bool = False) -> str:
    """Build a terminal-printable, column-aligned string from a DataFrame.

    Missing values are replaced by empty strings and every cell is converted
    to ``str``. Each column is right-justified to the width of its longest
    cell (or of its label, whichever is longer). Every data line ends with
    the delimiter followed by the row's index value wrapped in the ANSI
    escape codes ``\\033[94m`` ... ``\\033[0m``. The optional header line
    does not carry that trailing index field.

    :param df: pandas DataFrame containing the data to print
    :type df: pd.DataFrame
    :param delimiter: single character inserted between fields
    :type delimiter: str
    :param header: when True, the first line of the output holds the column labels
    :type header: bool
    :raises ValueError: if ``delimiter`` is not exactly one character long
    :return: representation of the dataframe, one line per row, each terminated by a newline
    :rtype: str
    """
    if len(delimiter) != 1:
        raise ValueError(f"Invalid Delimiter {delimiter}, should be 1 character")

    df = df.fillna('').astype(str)

    # Width of each column: longest cell, but never narrower than the label.
    # Labels go through str() so that non-string labels (e.g. integers) work.
    widths = {}
    for col in df.columns:
        longest_cell = df[col].str.len().max()
        label_len = len(str(col))
        # On an empty frame `longest_cell` is NaN, the comparison is False
        # and the label width is used.
        widths[col] = int(longest_cell) if longest_cell > label_len else label_len

    lines = []
    if header:
        lines.append(delimiter.join(str(col).rjust(width) for col, width in widths.items()))

    for row_index, cells in df.to_dict(orient='index').items():
        fields = delimiter.join(cells[col].rjust(width) for col, width in widths.items())
        lines.append(f"{fields}{delimiter}\033[94m{row_index}\033[0m")

    # Every line, including the last one, is newline-terminated.
    return "".join(line + "\n" for line in lines)
def printable_unit_duration(duration) -> str:
    """Format a duration given in milliseconds with a suitable unit.

    Values below one second are printed unchanged with the ``ms`` suffix;
    larger values are converted to seconds, minutes or hours and rounded
    to one decimal.

    :param duration: duration in milliseconds
    :type duration: int
    :return: converted duration with unit letter
    :rtype: str
    """
    one_second = 1000
    one_minute = 1000 * 60
    one_hour = 1000 * 60 * 60

    # Smallest unit first: each guard handles everything below its threshold.
    if duration < one_second:
        return f"{duration}ms"
    if duration < one_minute:
        return f"{round(duration / one_second, 1)}s"
    if duration < one_hour:
        return f"{round(duration / one_minute, 1)}m"
    return f"{round(duration / one_hour, 1)}h"
def retry_func(func: callable, excep: Exception, tries: int = 3, **kwargs):
    """Call ``func(**kwargs)``, retrying when it raises ``excep``.

    The call is attempted at most ``tries`` times. As soon as one attempt
    succeeds, its result is returned. If the last allowed attempt still
    raises ``excep``, that exception propagates to the caller. Exceptions
    that do not match ``excep`` are never caught.

    :param func: callable to invoke
    :type func: callable
    :param excep: exception class (or tuple of classes) that triggers a retry
    :type excep: Exception
    :param tries: maximum number of attempts; with a value of 0 or less ``func`` is never called and None is returned
    :type tries: int
    :param kwargs: keyword arguments forwarded to ``func`` on every attempt
    :return: whatever ``func`` returns on its first successful attempt
    """
    for attempt in range(tries):
        try:
            return func(**kwargs)
        except excep:
            if attempt == tries - 1:
                # Out of attempts: bare raise keeps the original traceback.
                raise
def intersect_ranges(xs, ys):
    """Yield the non-empty intersections between two sequences of ranges.

    Both inputs are walked in lockstep: at each step the overlap of the two
    current ranges is yielded as a ``Segment`` when its length is strictly
    positive, then the range that stops first is replaced by the next one
    from its source. Iteration ends as soon as either source is exhausted.

    NOTE(review): the single forward pass presumably expects each input to be
    sorted and free of internal overlaps - confirm against callers.

    :param xs: iterable (or iterator) of objects exposing ``start`` and ``stop``
    :param ys: iterable (or iterator) of objects exposing ``start`` and ``stop``
    :return: generator of ``Segment`` objects, one per non-empty intersection
    """
    # Accept any iterable (lists, tuples, ...), not only iterators.
    xs, ys = iter(xs), iter(ys)

    # Try to get the first range from each source.
    try:
        x, y = next(xs), next(ys)
    except StopIteration:
        return

    while True:
        # Yield the intersection of the two current ranges, if it is not empty.
        intersection = Segment(max(x.start, y.start), min(x.stop, y.stop))
        if intersection.length() > 0:
            yield intersection

        # Advance the source whose current range stops first.
        try:
            if x.stop <= y.stop:
                x = next(xs)
            else:
                y = next(ys)
        except StopIteration:
            return
class TimeInterval:
    """Interval between two times of day, ignoring the calendar date.

    Both bounds are stored as datetimes whose date part is forced to
    1900-01-01, so only the time-of-day component distinguishes them.
    ``stop`` may be earlier than ``start``.
    """

    def __init__(self, start: datetime, stop: datetime):
        # remove the day/month/year component
        self.start = start.replace(year=1900, month=1, day=1)
        self.stop = stop.replace(year=1900, month=1, day=1)

    def length(self):
        """Return ``stop - start`` (a negative value when ``stop`` is earlier than ``start``)."""
        return self.stop - self.start

    def __repr__(self):
        return "TimeInterval([{}, {}])".format(self.start, self.stop)

    def __eq__(self, other):
        # Objects without start/stop are "not comparable" rather than an
        # error, so `interval == "foo"` evaluates to False instead of raising.
        try:
            return self.start == other.start and self.stop == other.stop
        except AttributeError:
            return NotImplemented

    # Defining __eq__ already disables hashing; stated explicitly because
    # instances are mutable and must not be used as dict keys or set members.
    __hash__ = None
def time_intervals_intersect(ti1: TimeInterval, ti2: TimeInterval) -> List[TimeInterval]:
    """
    Compute the intersection(s) of two time-of-day intervals.

    The intervals only carry a time of day (no date), and an interval whose
    start is later than its stop is handled as well (see example 2). The
    result is a list holding zero, one or two intervals.

    Examples

    1. time_intervals_intersect( TimeInterval( datetime(1900,1,1,8,57), datetime(1900,1,1,21,4)), TimeInterval( datetime(1900,1,1,10,36), datetime(1900,1,1,22,1))) => [TimeInterval(10:36 , 21:04)]
    2. time_intervals_intersect( TimeInterval( datetime(1900,1,1,8,57), datetime(1900,1,1,22,1)), TimeInterval( datetime(1900,1,1,21,4), datetime(1900,1,1,10,36))) => [TimeInterval(08:57 , 10:36),TimeInterval(21:04 , 22:01)]

    :param ti1: first interval
    :param ti2: second interval
    :type ti1: TimeInterval
    :type ti2: TimeInterval
    :return: list of intervals that intersect
    :rtype: list[TimeInterval]
    """
    # Six pairwise orderings of the four bounds. The boolean expressions
    # below were obtained from a Karnaugh table over these variables and
    # select which pair of bounds delimits the intersection.
    a = ti1.start <= ti1.stop
    b = ti2.start <= ti2.stop
    c = ti1.stop <= ti2.stop
    d = ti1.start <= ti2.start
    e = ti1.start <= ti2.stop
    f = ti2.start <= ti1.stop

    if c and (d and (not e or f) or not e and f) or d and not e and f:
        # [start of the 2nd interval : end of the 1st interval]
        candidates = [TimeInterval(ti2.start, ti1.stop)]
    elif not c and (d and (b or not a) or not a and b) or not a and b and d:
        # [start of the 2nd interval : end of the 2nd interval]
        candidates = [ti2]
    elif not c and (not d and (not e and not f or e) or e and not f) or not d and e and not f:
        # [start of the 1st interval : end of the 2nd interval]
        candidates = [TimeInterval(ti1.start, ti2.stop)]
    elif c and (not d and (not a and not b or a) or a and not b) or a and not b and not d:
        # [start of the 1st interval : end of the 1st interval]
        candidates = [ti1]
    elif not a and (not b or e) or d and e and f:
        # !! This expression is simplified: it relies on the branches above
        # having already caught their cases, so it must stay last (or be
        # rewritten if the order changes).
        # [start of the 1st : end of the 2nd] U [start of the 2nd : end of the 1st]
        candidates = [
            TimeInterval(ti1.start, ti2.stop),
            TimeInterval(ti2.start, ti1.stop),
        ]
    else:
        candidates = []

    # Drop degenerate intervals whose two bounds are equal (e.g. 3:00 to 3:00).
    return [interval for interval in candidates if not (interval.start == interval.stop)]
def get_audio_duration(filename: Path) -> int:
    """Return the duration of an audio file, or 0 when it cannot be determined.

    A warning is printed and 0 is returned when the file does not exist or
    when ``soundfile.info`` fails to read it.

    NOTE(review): the value returned on success is ``soundfile.info(...).duration``,
    presumably a number of seconds and not necessarily an integer - confirm
    against the soundfile documentation.

    :param filename: path of the audio file
    :type filename: Path
    :return: duration reported by soundfile, 0 if the file is missing or unreadable
    :rtype: int
    """
    if not filename.exists():
        print('Warning: could not find file {}, setting duration to 0'.format(filename))
        return 0

    # Imported only once the file is known to exist, so the "missing file"
    # path does not require the soundfile package. Kept outside the try block
    # so a missing dependency is not silently reported as a duration of 0.
    from soundfile import info

    try:
        return info(str(filename)).duration
    except Exception:
        # Best effort: any read/decoding failure degrades to a duration of 0.
        print('Warning: could not read duration for {}, setting duration to 0'.format(filename))
        return 0
#reads a wav file for a given start point and duration (both in seconds)
def read_wav(filename, start_s, length_s):
    """Load a portion of an audio file at its native sampling rate.

    :param filename: path of the audio file to read
    :param start_s: offset of the portion to read, in seconds
    :param length_s: duration of the portion to read, in seconds
    :return: tuple (samples, sampling rate, number of channels); a 1-D sample
        array counts as one channel, otherwise the first dimension of the
        array is taken as the channel count
    """
    # librosa is used because it supports more codecs and is less likely to
    # crash on an unusual encoding
    import librosa

    samples, sample_rate = librosa.load(
        filename,
        sr=None,
        mono=False,
        offset=start_s,
        duration=length_s,
    )
    if samples.ndim == 1:
        n_channels = 1
    else:
        n_channels = samples.shape[0]
    return samples, sample_rate, n_channels
#take 2 audio files, a starting point for each and a length to compare in seconds
#return a divergence score representing the average difference in audio signal
def calculate_shift(file1, file2, start1, start2, interval):
    """
    Compare a portion of two audio files and return a divergence score.

    The same duration is read from both files (each from its own starting
    point). If the channel counts differ and one file is mono, the other is
    averaged down to mono. If the sampling rates differ, both signals are
    decimated to the greatest common divisor of the two rates. The signals
    are then further decimated to roughly 400Hz and the score is the mean
    absolute sample difference multiplied by 1000.

    :param file1: path to the first wav file to compare
    :type file1: str
    :param file2: path to the second wav file to compare
    :type file2: str
    :param start1: starting point for the comparison in seconds for the first audio
    :type start1: int
    :param start2: starting point for the comparison in seconds for the second audio
    :type start2: int
    :param interval: length to compare between the 2 audios on in seconds
    :type interval: int
    :raises Exception: when the channel counts differ and neither file is mono
    :return: tuple of divergence score and number of values used
    :rtype: (float, int)
    """
    ref, ref_rate, ref_chan = read_wav(file1, start1, interval)
    test, test_rate, test_chan = read_wav(file2, start2, interval)

    # Different number of channels: shrink the richer signal to mono if possible.
    if ref_chan != test_chan:
        print('WARNING : different number of channels, attempting to compress channels to carry on with analysis')
        shrunk_notice = '{} was shrunk to mono channel for the analysis, it has a higher level of information than {}'
        if ref_chan == 1 and test_chan > 1:
            test, test_chan = test.mean(axis=0), 1
            print(shrunk_notice.format(file2, file1))
        elif ref_chan > 1 and test_chan == 1:
            ref, ref_chan = ref.mean(axis=0), 1
            print(shrunk_notice.format(file1, file2))
        else:
            raise Exception('audios do not match, {} has {} channel(s) while {} has {}'.format(file1, ref_chan, file2, test_chan))

    # Several channels (same count on both sides at this point): flatten to 1-D.
    if ref_chan > 1:
        ref = ref.reshape(ref_chan * ref.shape[1])
        test = test.reshape(test_chan * test.shape[1])

    # Different sampling rates: decimate both signals to a common rate.
    if ref_rate != test_rate:
        from math import gcd
        common_rate = gcd(ref_rate, test_rate)
        print('WARNING : sampling rates do not match between audios ({}Hz and {}Hz), attempting to downsample to {}Hz'.format(ref_rate, test_rate, common_rate))
        if ref_rate > common_rate:
            ref = ref[::int(ref_rate / common_rate)]
            ref_rate = common_rate
        if test_rate > common_rate:
            test = test[::int(test_rate / common_rate)]
            test_rate = common_rate

    sampling_rate = ref_rate

    # Decimate to save computation time, only if the rate is higher than 400.
    target_rate = min(sampling_rate, 400)
    step = int(sampling_rate / target_rate)
    ref = ref[::step]
    test = test[::step]

    # Plain absolute difference of the signals averaged over the analysed
    # portion; the factor 1000 is arbitrary and only makes the score easier
    # to read and compare.
    n_values = len(ref)
    score = np.abs(ref - test).sum() * 1000 / n_values
    return score, n_values
def find_lines_involved_in_overlap(df: pd.DataFrame, onset_label: str = 'range_onset', offset_label: str = 'range_offset', labels=None) -> pd.Series:
    """Flag the rows of a DataFrame whose time segment overlaps another row's.

    The dataframe is expected to have one column for the onset of each
    segment and one for its offset. Two rows overlap when each one starts
    strictly before the other ends; a row is never compared with itself
    (rows are told apart by their index value). When ``labels`` is given,
    two rows only count as overlapping if they also hold equal values in
    every listed column.

    e.g. to select all lines involved in overlaps, use:

    ```
    ovl_segments = df[find_lines_involved_in_overlap(df, 'segment_onset', 'segment_offset')]
    ```

    and to select lines that never overlap, use:

    ```
    ovl_segments = df[~find_lines_involved_in_overlap(df, 'segment_onset', 'segment_offset')]
    ```

    :param df: pandas DataFrame where we want to find overlaps, having some time segments described by 2 columns (onset and offset)
    :type df: pd.DataFrame
    :param onset_label: column label for the onset of time segments
    :type onset_label: str
    :param offset_label: column label for the offset of time segments
    :type offset_label: str
    :param labels: list of column labels that are required to match to be involved in overlap (None or empty: no extra requirement)
    :type labels: list[str]
    :return: pandas Series of boolean values where 'True' are indexes where overlaps exist
    :rtype: pd.Series
    """
    match_labels = [] if labels is None else list(labels)

    if len(df) == 0:
        # Nothing can overlap; still hand back a boolean Series usable as a mask.
        return pd.Series(False, index=df.index, dtype=bool)

    onsets = df[onset_label]
    offsets = df[offset_label]

    def _overlaps_another_row(row) -> bool:
        # overlap is defined by s2.onset < s1.offset and s2.offset > s1.onset
        # and s2.index != s1.index (same segment)
        mask = (onsets < row[offset_label]) & (offsets > row[onset_label]) & (df.index != row.name)
        for label in match_labels:
            mask = mask & (df[label] == row[label])
        return bool(mask.any())

    return df.apply(_overlaps_another_row, axis=1)
def series_to_datetime(time_series, time_index_list, time_column_name: str, date_series=None, date_index_list=None, date_column_name=None) -> pd.Series:
    """
    Convert a series of strings into a series of datetimes.

    The accepted formats are read from the ``datetime`` attribute of the first
    item in ``time_index_list`` whose ``name`` equals ``time_column_name``.
    Every format is tried in turn with ``pd.to_datetime(..., errors="coerce")``;
    a value keeps the result of the first format that parses it, and values
    that match no format end up as NaT.

    To include the date and not only the time, a second series holding the
    dates can be passed together with its own index list and column name.
    Date and time strings are then joined with a single space and every
    "date format + space + time format" combination is tried.

    :param time_series: pandas series of strings to transform into datetime (can contain NA value => NaT datetime), if date_series is given, time_series should only have the time
    :type time_series: pandas.Series
    :param time_index_list: list of index to use where the column wanted is present
    :type time_index_list: List[IndexColumn]
    :param time_column_name: name of the IndexColumn to use (IndexColumn.name value) for accepted formats
    :type time_column_name: str
    :param date_series: pandas series of strings to transform into the date component of datetime (can contain NA value)
    :type date_series: pandas.Series
    :param date_index_list: list of index to use where the column wanted is present
    :type date_index_list: List[IndexColumn]
    :param date_column_name: name of the IndexColumn to use (IndexColumn.name value) for accepted formats for dates
    :type date_column_name: str
    :return: series with dtype datetime containing the converted datetimes
    :rtype: pandas.Series
    """
    time_formats = next(col for col in time_index_list if col.name == time_column_name).datetime
    converted = pd.Series(np.nan, index=time_series.index, dtype='datetime64[ns]')

    if date_series is None:
        raw = time_series.copy()
        formats = list(time_formats)
    else:
        raw = date_series + ' ' + time_series
        date_formats = next(col for col in date_index_list if col.name == date_column_name).datetime
        # Same order as nested loops: time formats outermost, date formats innermost.
        formats = [
            "{} {}".format(date_fmt, time_fmt)
            for time_fmt in time_formats
            for date_fmt in date_formats
        ]

    for fmt in formats:
        # fillna only touches entries that are still NaT, so earlier formats win.
        converted = converted.fillna(pd.to_datetime(raw, format=fmt, errors="coerce"))
    return converted