From a279d6517f5f8daa668d6db8ae583d69e22a8d0b Mon Sep 17 00:00:00 2001 From: Max Holland Date: Mon, 4 Dec 2023 15:41:19 +0000 Subject: [PATCH] Use errgroup to write segment to disk in background --- transcode/transcode.go | 114 ++++++++++++++--------------------------- video/media.go | 6 --- 2 files changed, 39 insertions(+), 81 deletions(-) diff --git a/transcode/transcode.go b/transcode/transcode.go index 6e42c822e..8f7637b76 100644 --- a/transcode/transcode.go +++ b/transcode/transcode.go @@ -5,6 +5,7 @@ import ( "bytes" "context" "fmt" + "golang.org/x/sync/errgroup" "net/url" "os" "path/filepath" @@ -22,8 +23,7 @@ import ( ) const ( - UploadTimeout = 5 * time.Minute - SegmentChannelSize = 10 + UploadTimeout = 5 * time.Minute ) type TranscodeSegmentRequest struct { @@ -54,12 +54,6 @@ type TranscodeSegmentRequest struct { IsClip bool } -type TranscodedSegmentInfo struct { - RequestID string - RenditionName string - SegmentIndex int -} - func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName string, inputInfo video.InputVideo, broadcaster clients.BroadcasterClient) ([]video.OutputVideo, int, error) { log.AddContext(transcodeRequest.RequestID, "source_manifest", transcodeRequest.SourceManifestURL, "stream_name", streamName) log.Log(transcodeRequest.RequestID, "RunTranscodeProcess (v2) Beginning") @@ -155,16 +149,23 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st } } - // Create a buffered channel where transcoded segments are sent to be written to disk - segmentChannel := make(chan TranscodedSegmentInfo, SegmentChannelSize) - - // Create a waitgroup to synchronize when the disk writing goroutine finishes - var wg sync.WaitGroup + var TransmuxStorageDir string + if transcodeRequest.GenerateMP4 { + var err error + // Create folder to hold transmux-ed files in local storage temporarily + TransmuxStorageDir, err = os.MkdirTemp(os.TempDir(), "transmux_stage_"+transcodeRequest.RequestID+"_") + if err != nil && !os.IsExist(err) { + log.Log(transcodeRequest.RequestID, "failed to create temp dir for transmuxing", "dir", TransmuxStorageDir, "err", err) + return outputs, segmentsCount, err + } + defer os.RemoveAll(TransmuxStorageDir) + } + segFileWriter := newSegmentFileWriter(TransmuxStorageDir) // Setup parallel transcode sessions var jobs *ParallelTranscoding jobs = NewParallelTranscoding(sourceSegmentURLs, func(segment segmentInfo) error { - err := transcodeSegment(segment, streamName, manifestID, transcodeRequest, transcodeProfiles, hlsTargetURL, transcodedStats, &renditionList, broadcaster, segmentChannel) + err := transcodeSegment(segment, streamName, manifestID, transcodeRequest, transcodeProfiles, hlsTargetURL, transcodedStats, &renditionList, broadcaster, segFileWriter) segmentsCount++ if err != nil { return err @@ -177,39 +178,6 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st return nil }) - var TransmuxStorageDir string - if transcodeRequest.GenerateMP4 { - var err error - // Create folder to hold transmux-ed files in local storage temporarily - TransmuxStorageDir, err = os.MkdirTemp(os.TempDir(), "transmux_stage_"+transcodeRequest.RequestID+"_") - if err != nil && !os.IsExist(err) { - log.Log(transcodeRequest.RequestID, "failed to create temp dir for transmuxing", "dir", TransmuxStorageDir, "err", err) - return outputs, segmentsCount, err - } - defer os.RemoveAll(TransmuxStorageDir) - - // Start the disk-writing (consumer) goroutine - wg.Add(1) - go func(transmuxTopLevelDir string, renditionList *video.TRenditionList) { - var segmentBatch []TranscodedSegmentInfo - defer wg.Done() - - // Keep checking for new segments in the buffered channel - for segInfo := range segmentChannel { - segmentBatch = append(segmentBatch, segInfo) - // Begin writing to disk if at-least 50% of buffered channel is full - if len(segmentBatch) >= SegmentChannelSize/2 { - writeSegmentsToDisk(transmuxTopLevelDir, renditionList, segmentBatch) - segmentBatch = nil - } - } - // Handle any remaining segments after the channel is closed - if len(segmentBatch) > 0 { - writeSegmentsToDisk(transmuxTopLevelDir, renditionList, segmentBatch) - } - }(TransmuxStorageDir, &renditionList) - } - // Start the transcoding (producer) goroutines jobs.Start() if err = jobs.Wait(); err != nil { @@ -217,11 +185,8 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st return outputs, segmentsCount, err } - // If the disk-writing gorouine was started, then close the segment channel to - // signal that no more segments will be sent. This will be a no-op if MP4s are not requested. - close(segmentChannel) // Wait for disk-writing goroutine to finish. This will be a no-op if MP4s are not requested. - wg.Wait() + segFileWriter.wait() // Build the manifests and push them to storage manifestURL, err := clients.GenerateAndUploadManifests(sourceManifest, hlsTargetURL.String(), transcodedStats, transcodeRequest.IsClip) @@ -400,27 +365,35 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st return outputs, segmentsCount, nil } -func writeSegmentsToDisk(transmuxTopLevelDir string, renditionList *video.TRenditionList, segmentBatch []TranscodedSegmentInfo) (int64, error) { - for _, segInfo := range segmentBatch { +type segmentFileWriter struct { + group errgroup.Group + transmuxTopLevelDir string +} + +func newSegmentFileWriter(transmuxTopLevelDir string) *segmentFileWriter { + s := &segmentFileWriter{transmuxTopLevelDir: transmuxTopLevelDir} + s.group.SetLimit(5) + return s +} - // All accesses to renditionList and segmentList is protected by a mutex behind the scenes - segmentList := renditionList.GetSegmentList(segInfo.RenditionName) - segmentData := segmentList.GetSegment(segInfo.SegmentIndex) - segmentFilename := filepath.Join(transmuxTopLevelDir, segInfo.RequestID+"_"+segInfo.RenditionName+"_"+strconv.Itoa(segInfo.SegmentIndex)+".ts") +func (s *segmentFileWriter) wait() error { + return s.group.Wait() +} + +func (s *segmentFileWriter) writeSegment(requestID string, renditionName string, segIndex int, segmentData []byte) { + s.group.Go(func() error { + segmentFilename := filepath.Join(s.transmuxTopLevelDir, requestID+"_"+renditionName+"_"+strconv.Itoa(segIndex)+".ts") segmentFile, err := os.Create(segmentFilename) if err != nil { - return 0, fmt.Errorf("error creating .ts file to write transcoded segment data err: %w", err) + return fmt.Errorf("error creating .ts file to write transcoded segment data err: %w", err) } defer segmentFile.Close() _, err = segmentFile.Write(segmentData) if err != nil { - return 0, fmt.Errorf("error writing segment err: %w", err) + return fmt.Errorf("error writing segment err: %w", err) } - // "Delete" buffered segment data from memory in hopes the garbage-collector releases it - segmentList.RemoveSegmentData(segInfo.SegmentIndex) - - } - return 0, nil + return nil + }) } func uploadMp4Files(basePath *url.URL, mp4OutputFiles []string, prefix string) ([]video.OutputVideoFile, error) { @@ -496,7 +469,7 @@ func transcodeSegment( transcodedStats []*video.RenditionStats, renditionList *video.TRenditionList, broadcaster clients.BroadcasterClient, - segmentChannel chan<- TranscodedSegmentInfo, + segFilewriter *segmentFileWriter, ) error { start := time.Now() @@ -580,16 +553,7 @@ func transcodeSegment( // exists in the renditionList which contains only profiles for which mp4s will // be generated i.e. all profiles for mp4 inputs and only highest quality // rendition for hls inputs like recordings. - segmentsList.AddSegmentData(segment.Index, transcodedSegment.MediaData) - - // send this transcoded segment to the segment channel so that it can be written - // to disk in parallel - segmentChannel <- TranscodedSegmentInfo{ - RequestID: transcodeRequest.RequestID, - RenditionName: transcodedSegment.Name, // Use actual rendition name - SegmentIndex: segment.Index, // Use actual segment index - } - + segFilewriter.writeSegment(transcodeRequest.RequestID, transcodedSegment.Name, segment.Index, transcodedSegment.MediaData) } } diff --git a/video/media.go b/video/media.go index 1eba7173c..71d1c7c4d 100644 --- a/video/media.go +++ b/video/media.go @@ -39,12 +39,6 @@ func (s *TSegmentList) AddSegmentData(segIdx int, data []byte) { s.mu.Unlock() } -func (s *TSegmentList) RemoveSegmentData(segIdx int) { - s.mu.Lock() - s.SegmentDataTable[segIdx] = []byte{} - s.mu.Unlock() -} - func (s *TSegmentList) GetSegment(segIdx int) []byte { s.mu.Lock() defer s.mu.Unlock()