import argparse
import datetime
import itertools
import json
import math
import multiprocessing as mp
import os
import pandas as pd
import shutil
import subprocess
import sys
import array
import traceback
from typing import List
from yaml import dump
import matplotlib.pyplot as plt
from matplotlib.ticker import ScalarFormatter
from numpy import log10
from pydub import AudioSegment
from pydub.utils import get_array_type
from parselmouth import Sound
from parselmouth import SpectralAnalysisWindowShape
import ChildProject
from ChildProject.pipelines.pipeline import Pipeline
from ChildProject.tables import assert_dataframe, assert_columns_presence
from typing import Tuple
import time
[docs]def pad_interval(
onset: int, offset: int, chunks_length: int, chunks_min_amount: int = 1
) -> Tuple[int, int]:
length = offset - onset
target_length = chunks_length * max(
chunks_min_amount, math.ceil(length / chunks_length)
)
onset -= (target_length - length) / 2
offset += (target_length - length) / 2
return int(onset), int(offset)
[docs]class Chunk:
def __init__(
self, recording_filename, onset, offset, segment_onset, segment_offset
):
self.recording_filename = recording_filename
self.onset = onset
self.offset = offset
self.segment_onset = segment_onset
self.segment_offset = segment_offset
[docs] def getbasename(self, extension):
return "{}_{}_{}.{}".format(
os.path.splitext(self.recording_filename.replace("/", "_"))[0],
self.onset,
self.offset,
extension,
)
[docs]class ZooniversePipeline(Pipeline):
def __init__(self):
self.chunks = []
[docs] def get_credentials(self, login: str = "", pwd: str = ""):
"""returns input credentials if provided or attempts to read them
from the environment variables.
:param login: input login, defaults to ''
:type login: str, optional
:param pwd: input password, defaults to ''
:type pwd: str, optional
:return: (login, pwd)
:rtype: (str, str)
"""
if login and pwd:
self.zooniverse_login = login
self.zooniverse_pwd = pwd
return (self.zooniverse_login, self.zooniverse_pwd)
if os.getenv("ZOONIVERSE_LOGIN"):
self.zooniverse_login = os.getenv("ZOONIVERSE_LOGIN")
else:
raise Exception(
"no login specified, and no 'ZOONIVERSE_LOGIN' environment variable found"
)
if os.getenv("ZOONIVERSE_PWD"):
self.zooniverse_pwd = os.getenv("ZOONIVERSE_PWD")
else:
raise Exception(
"no password specified, and no 'ZOONIVERSE_PWD' environment variable found"
)
return (self.zooniverse_login, self.zooniverse_pwd)
def _split_recording(self, segments: pd.DataFrame) -> list:
#from raw sound data and sampling rate, build the spectrogram as a matplotlib figure and return it
def _create_spectrogram(data,sr):
snd = Sound(data,sampling_frequency=sr)
# this parameters were chosen to output a spectrogram useful for zooniverse applications (short sounds from babies) we did not feel the need to have flexibility on them
spectrogram = snd.to_spectrogram(window_length=0.0075,maximum_frequency=8000, time_step= 0.0001 ,frequency_step = 0.1,window_shape= SpectralAnalysisWindowShape.GAUSSIAN)
fig = plt.figure(figsize=(12, 6.75)) #size of the image, we chose 1200x675 pixels for a better display on zooniverse
gs = fig.add_gridspec(2, hspace=0, height_ratios=[1, 3]) #2 plots (spectrogram 3x bigger than oscillogram)
axs = gs.subplots(sharex=True)
#scpectrogram plot
dynamic_range=65
X, Y = spectrogram.x_grid(), spectrogram.y_grid()
sg_db = 10 * log10(spectrogram.values)
axs[1].pcolormesh(X, Y, sg_db, vmin=sg_db.max() - dynamic_range, cmap='Greys')
axs[1].set_ylim([spectrogram.ymin, spectrogram.ymax])
axs[1].set_xlabel("time [s]")
axs[1].set_ylabel("frequency [Hz]")
axs[1].tick_params( labelright=True)
axs[1].set_xlim([snd.xmin, snd.xmax])
#oscillogram plot
axs[0].plot(snd.xs(), snd.values.T, linewidth=0.5)
axs[0].set_xlim([snd.xmin, snd.xmax])
axs[0].set_ylabel("amplitude")
#remove overlapping labels
ticks = axs[0].yaxis.get_major_ticks()
if len(ticks) : ticks[0].label1.set_visible(False)
if len(ticks) > 1 : ticks[1].label1.set_visible(False)
fig.tight_layout()
return fig
segments = segments.to_dict(orient="records")
chunks = []
recording = segments[0]["recording_filename"]
source = self.project.get_recording_path(recording, self.profile)
audio = AudioSegment.from_file(source)
print("extracting chunks from {}...".format(source))
for segment in segments:
original_onset = int(segment["segment_onset"])
original_offset = int(segment["segment_offset"])
onset = original_onset
offset = original_offset
if self.chunks_length > 0:
onset, offset = pad_interval(
onset, offset, self.chunks_length, self.chunks_min_amount
)
if onset < 0:
print("skipping chunk with negative onset ({})".format(onset))
continue
intervals = [
(a, a + self.chunks_length)
for a in range(onset, offset, self.chunks_length)
]
else:
intervals = [(onset, offset)]
for (onset, offset) in intervals:
chunk = Chunk(
segment["recording_filename"],
onset,
offset,
original_onset,
original_offset,
)
chunk_audio = audio[chunk.onset : chunk.offset].fade_in(10).fade_out(10)
wav = os.path.join(self.destination, "chunks", chunk.getbasename("wav"))
mp3 = os.path.join(self.destination, "chunks", chunk.getbasename("mp3"))
if os.path.exists(wav) and os.path.getsize(wav) > 0:
print("{} already exists, exportation skipped.".format(wav))
else:
chunk_audio.export(wav, format="wav")
if os.path.exists(mp3) and os.path.getsize(mp3) > 0:
print("{} already exists, exportation skipped.".format(mp3))
else:
chunk_audio.export(mp3, format="mp3")
if self.spectro:
png = os.path.join(self.destination, "chunks", chunk.getbasename("png"))
#convert pydub sound data into raw data that the parselmouth library can use
bit_depth = chunk_audio.sample_width * 8
array_type = get_array_type(bit_depth)
sound = array.array(array_type, chunk_audio._data)
sr = chunk_audio.frame_rate
fig = _create_spectrogram(sound,sr) #create the plot figure
if os.path.exists(png) and os.path.getsize(png) > 0:
print("{} already exists, exportation skipped.".format(png))
else:
fig.savefig(png)
plt.close(fig)
chunks.append(chunk)
return chunks
[docs] def upload_chunks(
self,
chunks: str,
project_id: int,
set_name: str,
zooniverse_login="",
zooniverse_pwd="",
amount: int = 1000,
ignore_errors: bool = False,
**kwargs
):
"""Uploads ``amount`` audio chunks from the CSV dataframe `chunks` to a zooniverse project.
:param chunks: path to the chunk CSV dataframe
:type chunks: [type]
:param project_id: zooniverse project id
:type project_id: int
:param set_name: name of the subject set
:type set_name: str
:param zooniverse_login: zooniverse login. If not specified, the program attempts to get it from the environment variable ``ZOONIVERSE_LOGIN`` instead, defaults to ''
:type zooniverse_login: str, optional
:param zooniverse_pwd: zooniverse password. If not specified, the program attempts to get it from the environment variable ``ZOONIVERSE_PWD`` instead, defaults to ''
:type zooniverse_pwd: str, optional
:param amount: amount of chunks to upload, defaults to 0
:type amount: int, optional
"""
self.chunks_file = chunks
self.get_credentials(zooniverse_login, zooniverse_pwd)
metadata_location = os.path.join(self.chunks_file)
try:
self.chunks = pd.read_csv(metadata_location, index_col="index")
except:
raise Exception(
"cannot read chunk metadata from {}.".format(metadata_location)
)
assert_dataframe("chunks", self.chunks)
assert_columns_presence(
"chunks",
self.chunks,
{"recording_filename", "onset", "offset", "uploaded", "mp3"},
)
from panoptes_client import Panoptes, Project, Subject, SubjectSet
Panoptes.connect(username=self.zooniverse_login, password=self.zooniverse_pwd)
zooniverse_project = Project(project_id)
subjects_metadata = []
uploaded = 0
subject_set = None
for ss in zooniverse_project.links.subject_sets:
if ss.display_name == set_name:
subject_set = ss
if subject_set is None:
subject_set = SubjectSet()
subject_set.links.project = zooniverse_project
subject_set.display_name = set_name
subject_set.save()
subjects = []
chunks_to_upload = self.chunks[self.chunks["uploaded"] == False].head(amount)
chunks_to_upload = chunks_to_upload.to_dict(orient="index")
if len(chunks_to_upload) == 0:
print("nothing left to upload.")
return
for chunk_index in chunks_to_upload:
chunk = chunks_to_upload[chunk_index]
print(
"uploading chunk {} ({},{})".format(
chunk["recording_filename"], chunk["onset"], chunk["offset"]
)
)
subject = Subject()
subject.links.project = zooniverse_project
subject.add_location(
os.path.join(os.path.dirname(self.chunks_file), "chunks", chunk["mp3"])
)
subject.metadata["date_extracted"] = chunk["date_extracted"]
try:
subject.save()
except Exception as e:
print(
"failed to save chunk {}. an exception has occured:\n{}".format(
chunk_index, str(e)
)
)
print(traceback.format_exc())
if args.ignore_errors:
continue
else:
print("subject upload halting here.")
break
subjects.append(subject)
chunk["index"] = chunk_index
chunk["zooniverse_id"] = str(subject.id)
chunk["project_id"] = str(project_id)
chunk["subject_set"] = str(subject_set.display_name)
chunk["uploaded"] = True
subjects_metadata.append(chunk)
if len(subjects) == 0:
return
subject_set.add(subjects)
self.chunks.update(pd.DataFrame(subjects_metadata).set_index("index"))
self.chunks.to_csv(self.chunks_file)
[docs] def retrieve_classifications(
self,
destination: str,
project_id: int,
zooniverse_login: str = "",
zooniverse_pwd: str = "",
chunks: List[str] = [],
**kwargs
):
"""Retrieve classifications from Zooniverse as a CSV dataframe.
They will be matched with the original chunks metadata if the path one
or more chunk metadata files is provided.
:param destination: output CSV dataframe destination
:type destination: str
:param project_id: zooniverse project id
:type project_id: int
:param zooniverse_login: zooniverse login. If not specified, the program attempts to get it from the environment variable ``ZOONIVERSE_LOGIN`` instead, defaults to ''
:type zooniverse_login: str, optional
:param zooniverse_pwd: zooniverse password. If not specified, the program attempts to get it from the environment variable ``ZOONIVERSE_PWD`` instead, defaults to ''
:type zooniverse_pwd: str, optional
:param chunks: the list of chunk metadata files to match the classifications to. If provided, only the classifications that have a match will be returned.
:type chunks: List[str], optional
"""
self.get_credentials(zooniverse_login, zooniverse_pwd)
from panoptes_client import Panoptes, Project, Classification
Panoptes.connect(username=self.zooniverse_login, password=self.zooniverse_pwd)
project = Project(project_id)
answers_translation_table = []
for workflow in project.links.workflows:
workflow_id = workflow.id
for task_id in workflow.tasks:
n = 0
for answer in workflow.tasks[task_id]["answers"]:
answers_translation_table.append(
{
"workflow_id": str(workflow_id),
"task_id": str(task_id),
"answer_id": str(n),
"answer": answer["label"],
}
)
n += 1
answers_translation_table = pd.DataFrame(answers_translation_table)
classifications = []
for c in Classification.where(
scope="project", page_size=1000, project_id=project_id
):
classifications.append(c.raw)
classifications = pd.DataFrame(classifications)
classifications["user_id"] = classifications["links"].apply(lambda s: s["user"])
classifications["subject_id"] = (
classifications["links"].apply(lambda s: s["subjects"][0]).astype(int)
)
classifications["workflow_id"] = classifications["links"].apply(
lambda s: s["workflow"]
)
classifications["tasks"] = classifications["annotations"].apply(
lambda s: [(str(r["task"]), str(r["value"])) for r in s]
)
classifications = classifications.explode("tasks")
classifications["task_id"] = classifications["tasks"].str[0]
classifications["answer_id"] = classifications["tasks"].str[1]
classifications.drop(columns=["tasks"], inplace=True)
classifications = classifications[
["id", "user_id", "subject_id", "task_id", "answer_id", "workflow_id"]
]
classifications = classifications.merge(
answers_translation_table,
left_on=["workflow_id", "task_id", "answer_id"],
right_on=["workflow_id", "task_id", "answer_id"],
)
if chunks:
chunks = pd.concat([pd.read_csv(f) for f in chunks])
classifications = classifications.merge(
chunks, left_on="subject_id", right_on="zooniverse_id"
)
classifications.set_index("id").to_csv(destination)
[docs] def run(self, action, **kwargs):
if action == "extract-chunks":
return self.extract_chunks(**kwargs)
elif action == "upload-chunks":
return self.upload_chunks(**kwargs)
elif action == "retrieve-classifications":
return self.retrieve_classifications(**kwargs)
[docs] @staticmethod
def setup_parser(parser):
subparsers = parser.add_subparsers(help="action", dest="action")
parser_extraction = subparsers.add_parser(
"extract-chunks",
help="extract chunks to <destination>, and exports the metadata inside of this directory",
)
parser_extraction.add_argument("path", help="path to the dataset")
parser_extraction.add_argument(
"--keyword", help="export keyword", required=True
)
parser_extraction.add_argument(
"--chunks-length",
help="chunk length (in milliseconds). if <= 0, the segments will not be split into chunks (default value: 0)",
type=int,
default=0,
)
parser_extraction.add_argument(
"--chunks-min-amount",
help="minimum amount of chunks to extract from a segment (default value: 1)",
default=1,
)
parser_extraction.add_argument(
"--spectrogram",
help="the extraction generates a png spectrogram (default False)",
action="store_true",
)
parser_extraction.add_argument(
"--segments", help="path to the input segments dataframe", required=True
)
parser_extraction.add_argument(
"--destination", help="destination", required=True
)
parser_extraction.add_argument(
"--profile",
help="Recording profile to extract the audio clips from. If not specified, raw recordings will be used",
default="",
)
parser_extraction.add_argument(
"--threads", help="how many threads to run on", default=0, type=int
)
parser_upload = subparsers.add_parser(
"upload-chunks", help="upload chunks and updates chunk state"
)
parser_upload.add_argument(
"--chunks", help="path to the chunk CSV dataframe", required=True
)
parser_upload.add_argument(
"--project-id", help="zooniverse project id", required=True, type=int
)
parser_upload.add_argument(
"--set-name", help="subject set display name", required=True
)
parser_upload.add_argument(
"--amount",
help="amount of chunks to upload",
required=False,
type=int,
default=1000,
)
parser_upload.add_argument(
"--zooniverse-login",
help="zooniverse login. If not specified, the program attempts to get it from the environment variable ZOONIVERSE_LOGIN instead",
default="",
)
parser_upload.add_argument(
"--zooniverse-pwd",
help="zooniverse password. If not specified, the program attempts to get it from the environment variable ZOONIVERSE_PWD instead",
default="",
)
parser_upload.add_argument(
"--ignore-errors",
help="keep uploading even when a subject fails to upload for some reason",
action="store_true",
)
parser_retrieve = subparsers.add_parser(
"retrieve-classifications",
help="retrieve classifications and save them as <destination>",
)
parser_retrieve.add_argument(
"--destination", help="output CSV dataframe destination", required=True
)
parser_retrieve.add_argument(
"--project-id", help="zooniverse project id", required=True, type=int
)
parser_retrieve.add_argument(
"--zooniverse-login",
help="zooniverse login. If not specified, the program attempts to get it from the environment variable ZOONIVERSE_LOGIN instead",
default="",
)
parser_retrieve.add_argument(
"--zooniverse-pwd",
help="zooniverse password. If not specified, the program attempts to get it from the environment variable ZOONIVERSE_PWD instead",
default="",
)
parser_retrieve.add_argument(
"--chunks", help="list of chunks", nargs="+", required=True
)