From 183b2cd02f05e57b2f218ebd1d31b1a5532b4c2e Mon Sep 17 00:00:00 2001 From: dominik Date: Thu, 30 Nov 2023 10:50:27 +0200 Subject: [PATCH] KUX-1527: Investigate race condition in broadpeak plugin causing 7011 errors --- .../plugins/broadpeak/BroadpeakPlugin.java | 45 ++++++++++++------- 1 file changed, 28 insertions(+), 17 deletions(-) diff --git a/broadpeakplugin/src/main/java/com/kaltura/playkit/plugins/broadpeak/BroadpeakPlugin.java b/broadpeakplugin/src/main/java/com/kaltura/playkit/plugins/broadpeak/BroadpeakPlugin.java index 13117af..6f81559 100644 --- a/broadpeakplugin/src/main/java/com/kaltura/playkit/plugins/broadpeak/BroadpeakPlugin.java +++ b/broadpeakplugin/src/main/java/com/kaltura/playkit/plugins/broadpeak/BroadpeakPlugin.java @@ -17,6 +17,7 @@ import com.kaltura.playkit.PlayerEvent; import com.kaltura.tvplayer.PKMediaEntryInterceptor; +import java.util.HashMap; import java.util.Map; import tv.broadpeak.smartlib.SmartLib; @@ -30,7 +31,7 @@ public class BroadpeakPlugin extends PKPlugin implements PKMediaEntryInterceptor private static final PKLog log = PKLog.get("BroadpeakPlugin"); private MessageBus messageBus; - private StreamingSession session; + private final Map sessionsMap = new HashMap<>(); private Player player; private BroadpeakConfig config; private Context context; @@ -86,14 +87,14 @@ protected void onLoad(final Player player, Object config, final MessageBus messa if (PKError.Severity.Fatal.equals(event.error.severity)) { log.e("PlayerEvent Fatal Error " + event.error.message); // Stop the session in case of Playback Error - stopStreamingSession(); + stopCurrentStreamingSession(); } }); this.messageBus.addListener(this, PlayerEvent.stopped, event -> { log.d("PlayerEvent stopped: calling stopStreamingSession"); // Stop the session in case of Playback stop - stopStreamingSession(); + stopCurrentStreamingSession(); }); } @@ -148,7 +149,7 @@ protected void onUpdateConfig(Object config) { private void restartSmartLib(BroadpeakConfig bpConfig) { log.d("Releasing SmartLib and initializing with updated configs"); - stopStreamingSession(); + stopCurrentStreamingSession(); SmartLib.getInstance().release(); if (bpConfig != null) { SmartLib.getInstance().init(context, @@ -173,7 +174,7 @@ protected void onApplicationResumed() { @Override protected void onDestroy() { // Stop the session - stopStreamingSession(); + stopCurrentStreamingSession(); // Release SmartLib SmartLib.getInstance().release(); if (messageBus != null) { @@ -181,10 +182,21 @@ protected void onDestroy() { } } - private void stopStreamingSession() { - log.d("stopStreamingSession"); - if (session != null) { - session.stopStreamingSession(); + private void stopCurrentStreamingSession() { + log.d("stopCurrentStreamingSession"); + if (player != null && player.getMediaSource() != null) { + stopStreamingSession(player.getMediaSource().getUrl()); + } + } + + private void stopStreamingSession(String sessionKey) { + log.d("stopStreamingSession called with sessionKey=[" + sessionKey + "]"); + if (sessionKey != null && sessionsMap.containsKey(sessionKey)) { + StreamingSession session = sessionsMap.get(sessionKey); + if (session != null) { + session.stopStreamingSession(); + } + sessionsMap.remove(sessionKey); } } @@ -200,9 +212,7 @@ public void apply(PKMediaEntry mediaEntry, PKMediaEntryInterceptor.Listener list log.d("Apply Entry " + mediaEntry.getName() + " - " + mediaEntry.getId() + " url: " + mediaEntry.getSources().get(0).getUrl()); // Stop the session for fresh media entry - if (session != null) { - stopStreamingSession(); - } + stopCurrentStreamingSession(); PKMediaSource source = mediaEntry.getSources().get(0); if (PKMediaFormat.udp.equals(source.getMediaFormat())) { @@ -213,13 +223,14 @@ public void apply(PKMediaEntry mediaEntry, PKMediaEntryInterceptor.Listener list return; } // Start the session and get the final stream URL - session = SmartLib.getInstance().createStreamingSession(); + StreamingSession session = SmartLib.getInstance().createStreamingSession(); if (session == null) { sendBroadpeakErrorEvent(errorCode, errorMessage); return; } + sessionsMap.put(source.getUrl(), session); - addSessionConfig(); + addSessionConfig(session); session.attachPlayer(player, messageBus); StreamingSessionResult result = session.getURL(source.getUrl()); @@ -235,12 +246,12 @@ public void apply(PKMediaEntry mediaEntry, PKMediaEntryInterceptor.Listener list errorCode = result.getErrorCode(); errorMessage = result.getErrorMessage(); } - stopStreamingSession(); + stopStreamingSession(source.getUrl()); // send event to MessageBus sendBroadpeakErrorEvent(errorCode, errorMessage); } } else { - stopStreamingSession(); + stopCurrentStreamingSession(); errorMessage = BroadpeakError.InvalidMediaEntry.errorMessage; errorCode = BroadpeakError.InvalidMediaEntry.errorCode; sendBroadpeakErrorEvent(errorCode, errorMessage); @@ -249,7 +260,7 @@ public void apply(PKMediaEntry mediaEntry, PKMediaEntryInterceptor.Listener list listener.onComplete(); } - private void addSessionConfig() { + private void addSessionConfig(StreamingSession session) { if (!TextUtils.isEmpty(config.getAdCustomReference())) { session.setAdCustomReference(config.getAdCustomReference()); }