Skip to content

Commit

Permalink
fix: add 10s rewind button
Browse files Browse the repository at this point in the history
  • Loading branch information
marcus-pousette committed Oct 22, 2024
1 parent 065ffca commit ca5a087
Show file tree
Hide file tree
Showing 4 changed files with 193 additions and 198 deletions.
44 changes: 43 additions & 1 deletion packages/live-streaming/frontend/src/__tests__/database.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,49 @@ describe("MediaStream", () => {
);

await waitForResolved(() =>
expect(maxTime).to.eq(chunks[chunks.length - 1].chunk.time)
expect(maxTime).to.eq(
chunks[chunks.length - 1].track.startTime +
chunks[chunks.length - 1].chunk.time
)
);
});

it("start before first chunk", async () => {
let framesPerTrack = 2;

const { mediaStreams, track1, viewerStreams } =
await createScenario({
delay: 1000,
first: {
start: 10,
size: framesPerTrack,
end: 1010,
type: "video",
},
});
let chunks: { track: Track<any>; chunk: Chunk }[] = [];

// start playing from track1 and then assume we will start playing from track2
const progress = 0;
let maxTime = 0;
iterator = await viewerStreams.iterate(progress, {
onProgress: (ev) => {
chunks.push(ev);
},
changeProcessor: (change) => change, // allow concurrent tracks
onMaxTimeChange: (newMaxTime) => {
maxTime = newMaxTime.maxTime;
},
});
const expecteChunkCount = framesPerTrack;
await waitForResolved(() =>
expect(chunks.length).to.eq(expecteChunkCount)
);
await waitForResolved(() =>
expect(maxTime).to.eq(
chunks[chunks.length - 1].track.startTime +
chunks[chunks.length - 1].chunk.time
)
);
});
});
Expand Down
104 changes: 55 additions & 49 deletions packages/live-streaming/frontend/src/media/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -211,14 +211,15 @@ export abstract class TrackSource {
}

async open(args: { sender: PublicSignKey } & Partial<Args>): Promise<void> {
console.log(
"LISTEN FROM",
args?.replicate,
shiftToU32(+new Date()),
"TO",
shiftToU32(+new Date()) + 24 * 60 * 60 * 1e3
);
console.log("REPLICATE FROM", shiftToU32(+new Date()));
/*
console.log(
"LISTEN FROM",
args?.replicate,
shiftToU32(+new Date()),
"TO",
shiftToU32(+new Date()) + 24 * 60 * 60 * 1e3
);
*/

await this.chunks.open({
type: Chunk,
Expand Down Expand Up @@ -653,7 +654,6 @@ export class MediaStreamDB extends Program<{}> {
async getLatest(
options?: SearchOptions<Track<AudioStreamDB | WebcodecsStreamDB>, any>
): Promise<Track<AudioStreamDB | WebcodecsStreamDB>[]> {
console.log("QUERY", options);
const tracks = await this.tracks.index.search(
new SearchRequest({
query: [
Expand Down Expand Up @@ -888,12 +888,12 @@ export class MediaStreamDB extends Program<{}> {
if (trackOptionIndex >= 0) {
currentTrackOptions.splice(trackOptionIndex, 1);
opts?.onTrackOptionsChange?.(currentTrackOptions);
console.log(
/* console.log(
"REMOVE TRACK OPTION",
track.startTime,
track.endTime,
ended
);
); */
}
}

Expand All @@ -919,7 +919,7 @@ export class MediaStreamDB extends Program<{}> {
}

if (startProgressBarMediaTime() === "live") {
console.log("PUSH LIVE", pendingFrames.length);
/* console.log("PUSH LIVE", pendingFrames.length); */
onProgressWrapped({
chunk: frame.chunk,
track: frame.track,
Expand Down Expand Up @@ -954,7 +954,7 @@ export class MediaStreamDB extends Program<{}> {
break;
}
} else {
console.log("DISCARD!", startAt);
/* console.log("DISCARD!", startAt); */
}
}
spliceSize++;
Expand All @@ -972,11 +972,11 @@ export class MediaStreamDB extends Program<{}> {
equals(x.id, track!.id)
);
if (!exists) {
console.log(
/* console.log(
"ADD TRACK OPTION",
track.startTime,
currentTrackOptions.length
);
); */
currentTrackOptions.push(track);

opts?.onTrackOptionsChange?.(currentTrackOptions);
Expand All @@ -1003,15 +1003,15 @@ export class MediaStreamDB extends Program<{}> {
remove?: Track;
isOption?: boolean;
}) => {
/* console.log(
"ADD TO QUEUE MAYBE ADD TRACK",
change.add && toBase64(change.add.id),
!!currentTracks.find((x) => equals(x.track.id, change.add!.id))
); */
/* console.log(
"ADD TO QUEUE MAYBE ADD TRACK",
change.add && toBase64(change.add.id),
!!currentTracks.find((x) => equals(x.track.id, change.add!.id))
); */

return openTrackQueue.add(async () => {
// remove existing track if we got a new track with same id that has a endtime set before the currentTime
/* console.log("MAYBE CHANGE TRACK", change.add?.startTime, change.add?.endTime, change.isOption); */
// console.log("MAYBE CHANGE TRACK", change.add?.startTime, change.add?.endTime, change.isOption);
try {
if (change.add && change.add.endTime != null) {
const mediaTimeForType = mediaTime();
Expand Down Expand Up @@ -1044,6 +1044,7 @@ export class MediaStreamDB extends Program<{}> {
add: change.add,
remove: change.remove,
});

if (filteredChange.add) {
await addTrack(filteredChange.add);
}
Expand Down Expand Up @@ -1076,13 +1077,13 @@ export class MediaStreamDB extends Program<{}> {
}
}

console.log(
/* console.log(
"ADD TRACK",
closed,
track.startTime,
latestPlayedFrame.get(track.source.mediaType),
currentTracks.length
);
); */

let close: () => void;
let open: () => void;
Expand Down Expand Up @@ -1140,7 +1141,7 @@ export class MediaStreamDB extends Program<{}> {
const createIterator = async () => {
const progressNumber = startProgressBarMediaTime();
if (typeof progressNumber == "number") {
console.log("CREATE TRACK ITERATOR", {
/* console.log("CREATE TRACK ITERATOR", {
progressNumber: progressNumber,
maxTime: this.maxTime,
trackStartTime: track.startTime,
Expand All @@ -1150,7 +1151,7 @@ export class MediaStreamDB extends Program<{}> {
0
),
prev: !!iterator,
});
}); */
return track.iterate(
Math.max(progressNumber - track.startTime, 0)
);
Expand All @@ -1160,7 +1161,7 @@ export class MediaStreamDB extends Program<{}> {

onMaxTimeChange = async (changeValue: number | undefined) => {
await iterator?.close();
console.log("MAXTIME CHANGE", changeValue, this.maxTime);
/* console.log("MAXTIME CHANGE", changeValue, this.maxTime); */
iterator = await createIterator();
};

Expand All @@ -1172,7 +1173,7 @@ export class MediaStreamDB extends Program<{}> {

/* THIS WRONG */
const loopCondition = () => {
console.log("LOOP CONDITION", {
/* console.log("LOOP CONDITION", {
latestPlayedFrame: latestPlayedFrame.get(
track.source.mediaType
),
Expand All @@ -1186,13 +1187,13 @@ export class MediaStreamDB extends Program<{}> {
(latestPlayedFrame.get(
track.source.mediaType
) || 0) -
(latestPendingFrame.get(
track.source.mediaType
) || 0) <
bufferTime &&
(latestPendingFrame.get(
track.source.mediaType
) || 0) <
bufferTime &&
!iterator?.done() &&
!closed,
});
}); */

return (
(latestPlayedFrame.get(track.source.mediaType) ||
Expand Down Expand Up @@ -1227,7 +1228,6 @@ export class MediaStreamDB extends Program<{}> {
if (!isLatest) {
continue;
}

pendingFrames.push({ chunk, track });
}
}
Expand Down Expand Up @@ -1257,7 +1257,7 @@ export class MediaStreamDB extends Program<{}> {
}
}
} catch (error) {
console.error("FAILED BUT WE CAUGHT IT!");
console.error("Failed to buffer", error);
throw error;
}

Expand Down Expand Up @@ -1302,13 +1302,13 @@ export class MediaStreamDB extends Program<{}> {
};

currentTracks.push(trackWithBuffer);
console.log(
/* console.log(
"ADDED TO CURRENT",
trackWithBuffer.track.startTime,
mediaTime(),
currentTracks.length,
"maxTime: " + this.maxTime
);
); */
opts?.onTracksChange?.(currentTracks.map((x) => x.track));
};

Expand All @@ -1330,14 +1330,17 @@ export class MediaStreamDB extends Program<{}> {
const currentTime = mediaTime();
if (currentTime === "live") {
await maybeChangeTrack({ add: track, isOption: true });
} else if (track.startTime <= currentTime) {
} else if (
track.startTime <= currentTime || // ready to play
(currentTracks.length === 0 && startPlayAt == null) // no tracks playing and not started playing yet
) {
if (track.endTime == null || track.endTime > currentTime) {
/* console.log(
"SCHEDULE TRACK",
track.startTime,
track.endTime,
currentTime
); */
/* console.log(
"SCHEDULE TRACK",
track.startTime,
track.endTime,
currentTime
); */
await maybeChangeTrack({ add: track, isOption: true });
} else {
tracksToRemove.push([
Expand Down Expand Up @@ -1387,7 +1390,7 @@ export class MediaStreamDB extends Program<{}> {

await this.getLatest().then(async (tracks) => {
// const openTracks = await Promise.all(tracks.map(async (x) => { const openTrack = await this.node.open(x); openTrack.source.chunks.log.waitForReplicator(this.owner); return openTrack }))
console.log("LATEST", tracks);
/* console.log("LATEST", tracks); */
return listener(
new CustomEvent("change", {
detail: { added: tracks, removed: [] },
Expand Down Expand Up @@ -1474,7 +1477,7 @@ export class MediaStreamDB extends Program<{}> {
fetchedOnce = true;

for (const track of current) {
console.log("ADD OPTION", track.startTime);
/* console.log("ADD OPTION", track.startTime); */
addTrackAsOption(track);
}

Expand Down Expand Up @@ -1519,7 +1522,7 @@ export class MediaStreamDB extends Program<{}> {
};

close = () => {
console.log("CLOSE TRACKS");
/* console.log("CLOSE TRACKS"); */
pause();
pendingFrames = [];
stopMaxTimeSync?.();
Expand All @@ -1539,10 +1542,13 @@ export class MediaStreamDB extends Program<{}> {
playbackTime = 0;
}

return (
const time =
playbackTime +
(startPlayAt != null ? nowMicroSeconds() - startPlayAt! : 0)
);
(startPlayAt != null
? nowMicroSeconds() - startPlayAt!
: 0);

return time;
};
}

Expand Down
Loading

0 comments on commit ca5a087

Please sign in to comment.