Skip to content

Commit

Permalink
All model calls return the worker instance that processed the task
Browse files Browse the repository at this point in the history
database errors now log SQL triggering error

Promise.all used in model.js for syncing indices and values; dramatically increases GPU speed (2x boost: 1170x on a 3090!)

worker instance no longer passed down from processNextFile, it's set in feedChunks to model

predictWorkers list now checks isReady in addition to isAvailable, fixing retrieval of the list on launch
all predictworker messages from worker include worker instance
Added waitForWorker to fix an issue where having no ready/available workers returned undefined.
  • Loading branch information
Mattk70 committed Feb 6, 2024
1 parent 9d29eb1 commit f063941
Show file tree
Hide file tree
Showing 4 changed files with 244 additions and 226 deletions.
15 changes: 11 additions & 4 deletions js/BirdNet2.4.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ const NOT_BIRDS = [
"Alouatta pigra_Mexican Black Howler Monkey",
"Tamias striatus_Eastern Chipmunk",
"Tamiasciurus hudsonicus_Red Squirrel"];

const MYSTERIES = ['Unknown Sp._Unknown Sp.'];
const GRAYLIST = [];
const GOLDEN_LIST = []
Expand All @@ -92,9 +93,11 @@ const CONFIG = {

onmessage = async (e) => {
const modelRequest = e.data.message;
const worker = e.data.worker;
let response;
try {
switch (modelRequest) {

case "load": {
const version = e.data.model;
DEBUG && console.log("load request to worker");
Expand Down Expand Up @@ -144,7 +147,8 @@ onmessage = async (e) => {
sampleRate: myModel.config.sampleRate,
chunkLength: myModel.chunkLength,
backend: tf.getBackend(),
labels: labels
labels: labels,
worker: worker
});
});
break;
Expand Down Expand Up @@ -199,7 +203,8 @@ onmessage = async (e) => {
channels: 1,
image: image,
file: specFile,
filepath: filepath
filepath: filepath,
worker: worker
};
postMessage(response);
break;
Expand All @@ -218,7 +223,8 @@ onmessage = async (e) => {
lat: myModel.lat,
lon: myModel.lon,
week: myModel.week,
updateResults: false
updateResults: false,
worker: worker
});
break;
}
Expand Down Expand Up @@ -456,6 +462,7 @@ class Model {
const finalPrediction = newPrediction || prediction;
//new
const { indices, values } = tf.topk(finalPrediction, 5, true)
// const [topIndices, topValues] = await Promise.all([indices.arraySync(), values.arraySync()]).catch((err => console.log(err)));
const topIndices = indices.arraySync();
const topValues = values.arraySync();
indices.dispose();
Expand Down Expand Up @@ -513,7 +520,7 @@ class Model {
};

async predictChunk(audioBuffer, start, fileStart, file, threshold, confidence) {
DEBUG && console.log('predictCunk begin', tf.memory().numTensors);
DEBUG && console.log('predictCunk begin', tf.memory());
audioBuffer = tf.tensor1d(audioBuffer);

// check if we need to pad
Expand Down
8 changes: 4 additions & 4 deletions js/database.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const sqlite3 = DEBUG ? require('sqlite3').verbose() : require('sqlite3');
// Promise wrapper around sqlite3's callback-based Database#run.
// Resolves with the run callback's `this` context (a Statement-like object
// exposing lastID/changes). On failure it logs the error together with the
// SQL that triggered it, then rejects with the Error itself.
sqlite3.Database.prototype.runAsync = function (sql, ...params) {
    return new Promise((resolve, reject) => {
        this.run(sql, params, function (err) {
            if (err) {
                // Log SQL for debugging, but reject with the Error —
                // console.log returns undefined, so rejecting with its
                // return value would discard the error for callers.
                console.log(err, sql);
                return reject(err);
            }
            resolve(this);
        });
    });
};
Expand All @@ -15,7 +15,7 @@ sqlite3.Database.prototype.runAsync = function (sql, ...params) {
// Promise wrapper around Database#all: resolves with the full array of
// result rows. On failure it logs the error plus the offending SQL, then
// rejects with the Error itself (not console.log's undefined return value).
sqlite3.Database.prototype.allAsync = function (sql, ...params) {
    return new Promise((resolve, reject) => {
        this.all(sql, params, (err, rows) => {
            if (err) {
                console.log(err, sql);
                return reject(err);
            }
            resolve(rows);
        });
    });
};
Expand All @@ -25,7 +25,7 @@ sqlite3.Statement.prototype.allAsync = function (...params) {
if (DEBUG) console.log('SQL\n', this.sql, '\nParams\n', params)
return new Promise((resolve, reject) => {
this.all(params, (err, rows) => {
if (err) return reject(console.log(err));
if (err) return reject(console.log(err, sql));
if (DEBUG) console.log('\nRows:', rows)
resolve(rows);
});
Expand All @@ -35,7 +35,7 @@ sqlite3.Statement.prototype.allAsync = function (...params) {
// Promise wrapper around Database#get: resolves with the first result row
// (or undefined when no row matches). On failure it logs the error with the
// SQL that caused it, then rejects with the Error itself.
sqlite3.Database.prototype.getAsync = function (sql, ...params) {
    return new Promise((resolve, reject) => {
        this.get(sql, params, (err, row) => {
            if (err) {
                console.log(err, sql);
                return reject(err);
            }
            resolve(row);
        });
    });
};
Expand Down
252 changes: 123 additions & 129 deletions js/model.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,141 +21,135 @@ const CONFIG = {

onmessage = async (e) => {
const modelRequest = e.data.message;
const worker = e.data.worker;
let response;
try {
switch (modelRequest) {
case "load": {const version = e.data.model;
if (DEBUG) {
console.log("load request to worker");
}
const { height: height, width: width, labels: labels, location: location } = JSON.parse(fs.readFileSync(path.join(__dirname, `../${version}_model_config.json`), "utf8"));
const appPath = "../" + location + "/";
const list = e.data.list;
const batch = e.data.batchSize;
const backend = e.data.backend;
// labels.push(...MYSTERIES);
// postMessage({
// message: "labels",
// labels: labels
// });
if (DEBUG) {
console.log(`model received load instruction. Using list: ${list}, batch size ${batch}`);
}
tf.setBackend(backend).then(async () => {
if (backend === "webgl") {
tf.env().set("WEBGL_FORCE_F16_TEXTURES", true);
tf.env().set("WEBGL_PACK", true);
tf.env().set("WEBGL_EXP_CONV", true);
tf.env().set("TOPK_K_CPU_HANDOFF_THRESHOLD", 128);
tf.env().set("TOPK_LAST_DIM_CPU_HANDOFF_SIZE_THRESHOLD", 0);
}
tf.enableProdMode();
if (DEBUG) {
console.log(tf.env());
console.log(tf.env().getFlags());
}
myModel = new Model(appPath, list, version);
myModel.height = height;
myModel.width = width;
myModel.list = e.data.list;
myModel.lat = parseFloat(e.data.lat);
myModel.lon = parseFloat(e.data.lon);
myModel.week = parseInt(e.data.week);
myModel.speciesThreshold = parseFloat(e.data.threshold);
myModel.labels = labels;
await myModel.loadModel();
// postMessage({
// message: "update-list",
// blocked: BLOCKED_IDS,
// lat: myModel.lat,
// lon: myModel.lon,
// week: myModel.week,
// updateResults: false
// });
myModel.warmUp(batch);
BACKEND = tf.getBackend();
postMessage({
message: "model-ready",
sampleRate: myModel.config.sampleRate,
chunkLength: myModel.chunkLength,
backend: tf.getBackend(),
labels: labels
});
});
break;
}
case "predict": {if (myModel.model_loaded) {
const { chunks: chunks, start: start, fileStart: fileStart, file: file, snr: snr, confidence: confidence, worker: worker, context: context, resetResults: resetResults } = e.data;
myModel.useContext = context;
myModel.selection = !resetResults;
const [result,filename,startPosition] = await myModel.predictChunk(chunks, start, fileStart, file, snr, confidence / 1000);
response = {
message: "prediction",
file: filename,
result: result,
fileStart: startPosition,
worker: worker,
selection: myModel.selection
};
postMessage(response);
myModel.result = [];
}
break;
}
case "get-spectrogram": {const buffer = e.data.buffer;
if (buffer.length < myModel.chunkLength) {
return;
}
const specFile = e.data.file;
const filepath = e.data.filepath;
const spec_height = e.data.height;
const spec_width = e.data.width;
let image;
const signal = tf.tensor1d(buffer, "float32");
const bufferTensor = myModel.normalise_audio(signal);
signal.dispose();
const imageTensor = tf.tidy(() => {
return myModel.makeSpectrogram(bufferTensor);
});
image = tf.tidy(() => {
let spec = myModel.fixUpSpecBatch(tf.expandDims(imageTensor, 0), spec_height, spec_width);
const spec_max = tf.max(spec);
return spec.mul(255).div(spec_max).dataSync();
});
bufferTensor.dispose();
imageTensor.dispose();
response = {
message: "spectrogram",
width: myModel.inputShape[2],
height: myModel.inputShape[1],
channels: myModel.inputShape[3],
image: image,
file: specFile,
filepath: filepath
};
postMessage(response);
break;
}
case "load": {
const version = e.data.model;
if (DEBUG) {
console.log("load request to worker");
}
const { height: height, width: width, labels: labels, location: location } = JSON.parse(fs.readFileSync(path.join(__dirname, `../${version}_model_config.json`), "utf8"));
const appPath = "../" + location + "/";
const list = e.data.list;
const batch = e.data.batchSize;
const backend = e.data.backend;
if (DEBUG) {
console.log(`model received load instruction. Using list: ${list}, batch size ${batch}`);
}
tf.setBackend(backend).then(async () => {
if (backend === "webgl") {
tf.env().set("WEBGL_FORCE_F16_TEXTURES", true);
tf.env().set("WEBGL_PACK", true);
tf.env().set("WEBGL_EXP_CONV", true);
tf.env().set("TOPK_K_CPU_HANDOFF_THRESHOLD", 128);
tf.env().set("TOPK_LAST_DIM_CPU_HANDOFF_SIZE_THRESHOLD", 0);
}
tf.enableProdMode();
if (DEBUG) {
console.log(tf.env());
console.log(tf.env().getFlags());
}
myModel = new Model(appPath, list, version);
myModel.height = height;
myModel.width = width;
myModel.list = e.data.list;
myModel.lat = parseFloat(e.data.lat);
myModel.lon = parseFloat(e.data.lon);
myModel.week = parseInt(e.data.week);
myModel.speciesThreshold = parseFloat(e.data.threshold);
myModel.labels = labels;
await myModel.loadModel();
myModel.warmUp(batch);
BACKEND = tf.getBackend();
postMessage({
message: "model-ready",
sampleRate: myModel.config.sampleRate,
chunkLength: myModel.chunkLength,
backend: tf.getBackend(),
labels: labels,
worker: worker
});
});
break;
}
case "predict": {
if (myModel.model_loaded) {
const { chunks: chunks, start: start, fileStart: fileStart, file: file, snr: snr, confidence: confidence, context: context, resetResults: resetResults } = e.data;
myModel.useContext = context;
myModel.selection = !resetResults;
const [result,filename,startPosition] = await myModel.predictChunk(chunks, start, fileStart, file, snr, confidence / 1000);
response = {
message: "prediction",
file: filename,
result: result,
fileStart: startPosition,
worker: worker,
selection: myModel.selection
};
postMessage(response);
myModel.result = [];
}
break;
}
case "get-spectrogram": {
const buffer = e.data.buffer;
if (buffer.length < myModel.chunkLength) {
return;
}
const specFile = e.data.file;
const filepath = e.data.filepath;
const spec_height = e.data.height;
const spec_width = e.data.width;
let image;
const signal = tf.tensor1d(buffer, "float32");
const bufferTensor = myModel.normalise_audio(signal);
signal.dispose();
const imageTensor = tf.tidy(() => {
return myModel.makeSpectrogram(bufferTensor);
});
image = tf.tidy(() => {
let spec = myModel.fixUpSpecBatch(tf.expandDims(imageTensor, 0), spec_height, spec_width);
const spec_max = tf.max(spec);
return spec.mul(255).div(spec_max).dataSync();
});
bufferTensor.dispose();
imageTensor.dispose();
response = {
message: "spectrogram",
width: myModel.inputShape[2],
height: myModel.inputShape[1],
channels: myModel.inputShape[3],
image: image,
file: specFile,
filepath: filepath,
worker: worker
};
postMessage(response);
break;
}
case "list": {
myModel.list = e.data.list;
myModel.lat = parseFloat(e.data.lat);
myModel.lon = parseFloat(e.data.lon);
myModel.week = parseInt(e.data.week) || myModel.week;
myModel.speciesThreshold = parseFloat(e.data.threshold);
if (DEBUG) {
console.log(`Setting list to ${myModel.list}`);
}
await myModel.setList();
postMessage({
message: "update-list",
blocked: BLOCKED_IDS,
lat: myModel.lat,
lon: myModel.lon,
week: myModel.week,
updateResults: true
});
break;
}
if (DEBUG) {
console.log(`Setting list to ${myModel.list}`);
}
await myModel.setList();
postMessage({
message: "update-list",
blocked: BLOCKED_IDS,
lat: myModel.lat,
lon: myModel.lon,
week: myModel.week,
updateResults: true,
worker: worker
});
break;
}
}
}
// If worker was respawned
Expand Down Expand Up @@ -386,9 +380,9 @@ class Model {
const finalPrediction = newPrediction || prediction;
const { indices, values } = tf.topk(finalPrediction, 5, true)
//const adjusted_values = tf.div(1, tf.add(1, tf.exp(tf.mul(tf.neg(10), values.sub(0.6)))));

const topIndices = indices.arraySync();
const topValues = values.arraySync();
const [topIndices, topValues] = await Promise.all([indices.array(), values.array()]).catch((err => console.log(err)));
// const topIndices = indices.arraySync();
// const topValues = await values.arraySync();
indices.dispose();
values.dispose();

Expand Down
Loading

0 comments on commit f063941

Please sign in to comment.