from abc import ABC, abstractmethod
import argparse
import datetime
import multiprocessing as mp
import numpy as np
import os, glob
import sys
import pandas as pd
import shutil
import subprocess
from typing import Union, List
from yaml import dump
import ChildProject
from ChildProject.pipelines.pipeline import Pipeline
# Registry of available audio processors, keyed by their CLI subcommand.
# Populated automatically by AudioProcessor.__init_subclass__ whenever a
# subclass defining SUBCOMMAND is declared.
pipelines = {}
class AudioProcessor(ABC):
    """Abstract base class for audio processors.

    Subclasses must define a ``SUBCOMMAND`` class attribute (the CLI
    subcommand name) and implement :meth:`process_recording`; they are
    automatically registered into the module-level ``pipelines`` registry.
    """

    def __init__(
        self,
        project: ChildProject.projects.ChildProject,
        name: str,
        input_profile: str = None,
        threads: int = 1,
        recordings: Union[str, List[str], pd.DataFrame] = None,
    ):
        """
        :param project: project to process recordings from
        :param name: name of the export profile (output subdirectory)
        :param input_profile: name of a converted-recordings profile to read
            from; ``None`` means process the raw recordings
        :param threads: number of parallel jobs (1 = serial; values < 1
            delegate to multiprocessing's default, i.e. all cores)
        :param recordings: subset of recordings to process (all by default)
        """
        self.project = project
        self.name = name
        self.threads = int(threads)
        self.recordings = Pipeline.recordings_from_list(recordings)

        self.input_profile = input_profile
        if self.input_profile:
            input_path = os.path.join(
                self.project.path,
                ChildProject.projects.ChildProject.CONVERTED_RECORDINGS,
                self.input_profile,
            )
            assert os.path.exists(
                input_path
            ), f"provided input profile {input_profile} does not exist"

        self.converted = pd.DataFrame()

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # auto-register every processor subclass under its CLI subcommand
        pipelines[cls.SUBCOMMAND] = cls

    def output_directory(self):
        """Return the directory where this profile's output is stored."""
        return os.path.join(
            self.project.path,
            ChildProject.projects.ChildProject.CONVERTED_RECORDINGS,
            self.name,
        )

    @abstractmethod
    def process_recording(self, recording):
        """Process one recording (a dict of metadata row values) and return
        a DataFrame of conversion records."""
        pass

    def process(self, parameters):
        """Process all selected recordings and merge the resulting conversion
        records with any previously exported metadata.

        :param parameters: value stored in the ``parameters`` column of the
            output metadata (e.g. the parameters file name)
        """
        recordings = self.project.get_recordings_from_list(self.recordings)
        os.makedirs(name=self.output_directory(), exist_ok=True)

        if self.threads == 1:
            self.converted = [
                self.process_recording(recording)
                for recording in recordings.to_dict("records")
            ]
        else:
            # the CLI documents threads=0 as "use all available cores";
            # mp.Pool(processes=0) raises ValueError, so map it to None
            # (multiprocessing's "use os.cpu_count()" sentinel).
            with mp.Pool(
                processes=self.threads if self.threads >= 1 else None
            ) as pool:
                self.converted = pool.map(
                    self.process_recording, recordings.to_dict("records")
                )

        previously_converted = self.read_metadata()

        # nothing was processed (e.g. empty recordings selection):
        # pd.concat([]) would raise ValueError, so bail out early.
        if not self.converted:
            return

        self.converted = pd.concat(self.converted)

        if not len(self.converted):
            return

        self.converted.set_index(
            ["original_filename", "converted_filename"], inplace=True
        )
        self.converted = self.converted.assign(parameters=parameters)

        if previously_converted is not None:
            # keep older records only for files not re-converted in this run
            self.converted = pd.concat(
                [
                    previously_converted[
                        ~previously_converted.index.isin(self.converted.index)
                    ],
                    self.converted,
                ]
            )

        self.export_metadata()

    @staticmethod
    def add_parser(parsers):
        pass
class BasicProcessor(AudioProcessor):
    """Basic audio conversion: transcode recordings to a target format,
    codec and sampling rate using ffmpeg."""

    SUBCOMMAND = "basic"

    def __init__(
        self,
        project: ChildProject.projects.ChildProject,
        name: str,
        format: str,
        codec: str,
        sampling: int,
        threads: int = 1,
        recordings: Union[str, List[str], pd.DataFrame] = None,
        skip_existing: bool = False,
        input_profile: str = None,
    ):
        """
        :param project: project to process recordings from
        :param name: name of the export profile
        :param format: target audio format / file extension (e.g. ``wav``)
        :param codec: target audio codec (e.g. ``pcm_s16le``)
        :param sampling: target sampling frequency in Hz
        :param threads: number of parallel jobs
        :param recordings: subset of recordings to process (all by default)
        :param skip_existing: skip recordings whose destination already exists
        :param input_profile: profile of input recordings (raw by default)
        """
        super().__init__(
            project,
            name,
            threads=threads,
            recordings=recordings,
            input_profile=input_profile,
        )

        self.format = format
        self.codec = str(codec)
        self.sampling = int(sampling)
        self.skip_existing = bool(skip_existing)

    def process_recording(self, recording):
        """Convert one recording with ffmpeg.

        :returns: one-row DataFrame describing the outcome, or an empty
            DataFrame when the recording is skipped.
        """
        if recording["recording_filename"] == "NA":
            return pd.DataFrame()

        original_file = self.project.get_recording_path(
            recording["recording_filename"], self.input_profile
        )

        destination_file = os.path.join(
            self.output_directory(),
            os.path.splitext(recording["recording_filename"])[0]
            + "."
            + self.format,
        )

        os.makedirs(name=os.path.dirname(destination_file), exist_ok=True)

        skip = self.skip_existing and (
            os.path.exists(destination_file) or os.path.islink(destination_file)
        )

        if skip:
            return pd.DataFrame()

        args = [
            "ffmpeg",
            "-y",
            "-loglevel",
            "error",
            "-i",
            original_file,
            "-c:a",
            self.codec,
            "-ar",
            str(self.sampling),
            destination_file,
        ]

        # list-form invocation (shell=False) prevents shell injection from
        # filenames; only ffmpeg's error output is captured.
        proc = subprocess.run(
            args, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE
        )

        if proc.returncode != 0:
            # decode so the metadata 'error' column and the console message
            # hold readable text rather than a bytes repr (b'...')
            stderr = proc.stderr.decode("utf-8", errors="replace")
            print(stderr, file=sys.stderr)
            return pd.DataFrame(
                [
                    {
                        "original_filename": recording["recording_filename"],
                        "converted_filename": "",
                        "success": False,
                        "error": stderr,
                    }
                ]
            )
        else:
            converted_files = [
                os.path.splitext(recording["recording_filename"])[0]
                + "."
                + self.format
            ]
            return pd.DataFrame(
                [
                    {
                        "original_filename": recording["recording_filename"],
                        "converted_filename": cf,
                        "success": True,
                    }
                    for cf in converted_files
                ]
            )

    @staticmethod
    def add_parser(subparsers, subcommand):
        """Register the CLI arguments of the basic processor."""
        parser = subparsers.add_parser(subcommand, help="basic audio conversion")
        parser.add_argument("--format", help="audio format (e.g. wav)", required=True)
        parser.add_argument(
            "--codec", help="audio codec (e.g. pcm_s16le)", required=True
        )
        parser.add_argument(
            "--sampling",
            help="sampling frequency (e.g. 16000)",
            required=True,
            type=int,
        )
        parser.add_argument(
            "--skip-existing",
            dest="skip_existing",
            required=False,
            default=False,
            action="store_true",
        )
        parser.add_argument(
            "--recordings",
            help="list of recordings to process, separated by whitespaces; only values of 'recording_filename' present in the metadata are supported.",
            default=None,
            nargs="+",
        )
class VettingProcessor(AudioProcessor):
    """Vetting processor: silences (zeroes out) a user-provided list of
    segments from each recording and exports the result as wav files."""

    SUBCOMMAND = "vetting"

    def __init__(
        self,
        project: ChildProject.projects.ChildProject,
        name: str,
        segments_path: str,
        threads: int = 1,
        recordings: Union[str, List[str], pd.DataFrame] = None,
        input_profile: str = None,
    ):
        """
        :param project: project to process recordings from
        :param name: name of the export profile
        :param segments_path: path to a CSV dataframe listing the segments
            to be vetted (``recording_filename``, ``segment_onset``,
            ``segment_offset`` in milliseconds)
        :param threads: number of parallel jobs
        :param recordings: subset of recordings to process (all by default)
        :param input_profile: profile of input recordings (raw by default)
        """
        super().__init__(
            project,
            name,
            threads=threads,
            recordings=recordings,
            input_profile=input_profile,
        )
        self.segments = pd.read_csv(segments_path)

    def process_recording(self, recording):
        """Mute the vetted segments of one recording and write it as wav."""
        import librosa
        import soundfile

        filename = recording["recording_filename"]

        original_file = self.project.get_recording_path(
            filename, self.input_profile
        )
        converted_filename = os.path.splitext(filename)[0] + ".wav"
        destination_file = os.path.join(
            self.output_directory(), converted_filename
        )
        os.makedirs(name=os.path.dirname(destination_file), exist_ok=True)

        vetted_segments = self.segments[
            self.segments["recording_filename"] == filename
        ]

        signal, sr = librosa.load(original_file, sr=None, mono=False)

        # segment boundaries are stored in milliseconds; convert to samples
        onsets = librosa.time_to_samples(
            times=vetted_segments["segment_onset"].values / 1000, sr=sr
        )
        offsets = librosa.time_to_samples(
            times=vetted_segments["segment_offset"].values / 1000, sr=sr
        )

        mono = signal.ndim == 1
        for onset, offset in zip(onsets, offsets):
            if mono:
                signal[onset:offset] = 0
            else:
                signal[:, onset:offset] = 0

        # librosa returns (channels, samples); soundfile expects the
        # transpose for multi-channel output
        output = signal if mono else np.transpose(signal)
        soundfile.write(destination_file, output, samplerate=sr)

        return pd.DataFrame(
            [
                {
                    "original_filename": filename,
                    "converted_filename": converted_filename,
                    "success": True,
                }
            ]
        )

    @staticmethod
    def add_parser(subparsers, subcommand):
        """Register the CLI arguments of the vetting processor."""
        parser = subparsers.add_parser(subcommand, help="vetting")
        parser.add_argument(
            "--segments-path",
            help="path to the CSV dataframe containing the segments to be vetted",
            required=True,
        )
        parser.add_argument(
            "--recordings",
            help="list of recordings to process, separated by commas; only values of 'recording_filename' present in the metadata are supported.",
            default=None,
            nargs="+",
        )
class ChannelMapper(AudioProcessor):
    """Channel mapping processor: combines the input channels of each
    recording through a user-provided weight matrix and exports wav files."""

    SUBCOMMAND = "channel-mapping"

    def __init__(
        self,
        project: ChildProject.projects.ChildProject,
        name: str,
        channels: list,
        threads: int = 1,
        recordings: Union[str, List[str], pd.DataFrame] = None,
        input_profile: str = None,
    ):
        """
        :param project: project to process recordings from
        :param name: name of the export profile
        :param channels: list of strings, one per output channel, each a
            comma-separated list of weights applied to the input channels
        :param threads: number of parallel jobs
        :param recordings: subset of recordings to process (all by default)
        :param input_profile: profile of input recordings (raw by default)
        """
        super().__init__(
            project,
            name,
            threads=threads,
            recordings=recordings,
            input_profile=input_profile,
        )

        # build the (output channels x input channels) weight matrix
        weight_rows = [
            [float(weight) for weight in channel.split(",")]
            for channel in channels
        ]
        self.channels = np.array(weight_rows)

    def process_recording(self, recording):
        """Apply the channel weight matrix to one recording and write wav."""
        import librosa
        import soundfile

        filename = recording["recording_filename"]

        original_file = self.project.get_recording_path(
            filename, self.input_profile
        )
        converted_filename = os.path.splitext(filename)[0] + ".wav"
        destination_file = os.path.join(
            self.output_directory(), converted_filename
        )
        os.makedirs(name=os.path.dirname(destination_file), exist_ok=True)

        record = pd.DataFrame(
            [
                {
                    "original_filename": filename,
                    "converted_filename": converted_filename,
                }
            ]
        )

        signal, sr = librosa.load(original_file, sr=None, mono=False)

        expected_channels = self.channels.shape[1]
        actual_channels = signal.shape[0]

        if expected_channels != actual_channels:
            print(
                "skipping '{}' due to channel mismatch (expected {} channels, got {})".format(
                    filename, expected_channels, actual_channels
                )
            )
            return record.assign(success=False)

        # (out x in) @ (in x samples) -> (out x samples); soundfile expects
        # the transpose (samples x channels)
        output = np.matmul(self.channels, signal)
        soundfile.write(destination_file, np.transpose(output), samplerate=sr)

        return record.assign(success=True)

    @staticmethod
    def add_parser(subparsers, subcommand):
        """Register the CLI arguments of the channel-mapping processor."""
        parser = subparsers.add_parser(subcommand, help="channel mapping")
        parser.add_argument(
            "--channels",
            help="lists of weigths for each channel",
            nargs="+",
            required=True,
        )
        parser.add_argument(
            "--recordings",
            help="list of recordings to process, separated by commas; only values of 'recording_filename' present in the metadata are supported.",
            default=None,
            nargs="+",
        )
class AudioProcessingPipeline(Pipeline):
    """Pipeline dispatching the audio processors registered in ``pipelines``
    and exporting the processing parameters alongside the output."""

    def __init__(self):
        pass

    def run(
        self,
        path: str,
        name: str,
        processor: str,
        threads: int = 1,
        func=None,
        **kwargs,
    ):
        """Run ``processor`` over the dataset located at ``path``.

        :param path: path to the dataset
        :param name: name of the export profile
        :param processor: subcommand of the processor to run (a key of
            ``pipelines``)
        :param threads: number of parallel jobs
        :param func: unused; kept for interface compatibility
        :param kwargs: extra arguments forwarded to the processor
        :returns: tuple (path to the output recordings.csv, path to the
            exported parameters yml file)
        :raises NotImplementedError: when ``processor`` is not registered
        """
        # capture the call arguments before any new locals are created
        parameters = locals()
        parameters = [
            {key: parameters[key]}
            for key in parameters
            if key not in ["self", "kwargs"]
        ]
        parameters.extend([{key: kwargs[key]} for key in kwargs])

        date = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

        self.project = ChildProject.projects.ChildProject(path)
        self.project.read()

        if processor not in pipelines:
            raise NotImplementedError(f"invalid pipeline '{processor}'")

        proc = pipelines[processor](self.project, name, threads=threads, **kwargs)
        proc.process(f"parameters_{date}.yml")

        print("exported audio to {}".format(proc.output_directory()))

        parameters_path = os.path.join(
            proc.output_directory(), f"parameters_{date}.yml"
        )

        # context manager guarantees the parameters file is flushed and
        # closed (the previous bare open() leaked the file handle)
        with open(parameters_path, "w+") as parameters_file:
            dump(
                {
                    "parameters": parameters,
                    "package_version": ChildProject.__version__,
                    "date": date,
                },
                parameters_file,
            )
        print("exported processor parameters to {}".format(parameters_path))

        return os.path.join(proc.output_directory(), "recordings.csv"), parameters_path

    @staticmethod
    def setup_parser(parser):
        """Register the pipeline-level CLI arguments and one subparser per
        registered processor."""
        parser.add_argument("path", help="path to the dataset")
        parser.add_argument("name", help="name of the export profile")

        subparsers = parser.add_subparsers(help="processor", dest="processor")
        for pipeline in pipelines:
            pipelines[pipeline].add_parser(subparsers, pipeline)

        parser.add_argument(
            "--threads",
            help="amount of threads running conversions in parallel (0 = uses all available cores)",
            required=False,
            default=1,
            type=int,
        )
        parser.add_argument(
            "--input-profile",
            help="profile of input recordings (process raw recordings by default)",
            default=None,
        )