Skip to content

Commit

Permalink
KUX-1527: Investigate race condition in broadpeak plugin causing 7011…
Browse files Browse the repository at this point in the history
… errors
  • Loading branch information
dominik11165 committed Nov 30, 2023
1 parent 9532b54 commit 183b2cd
Showing 1 changed file with 28 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.kaltura.playkit.PlayerEvent;
import com.kaltura.tvplayer.PKMediaEntryInterceptor;

import java.util.HashMap;
import java.util.Map;

import tv.broadpeak.smartlib.SmartLib;
Expand All @@ -30,7 +31,7 @@ public class BroadpeakPlugin extends PKPlugin implements PKMediaEntryInterceptor
private static final PKLog log = PKLog.get("BroadpeakPlugin");

private MessageBus messageBus;
private StreamingSession session;
private final Map<String, StreamingSession> sessionsMap = new HashMap<>();
private Player player;
private BroadpeakConfig config;
private Context context;
Expand Down Expand Up @@ -86,14 +87,14 @@ protected void onLoad(final Player player, Object config, final MessageBus messa
if (PKError.Severity.Fatal.equals(event.error.severity)) {
log.e("PlayerEvent Fatal Error " + event.error.message);
// Stop the session in case of Playback Error
stopStreamingSession();
stopCurrentStreamingSession();
}
});

this.messageBus.addListener(this, PlayerEvent.stopped, event -> {
log.d("PlayerEvent stopped: calling stopStreamingSession");
// Stop the session in case of Playback stop
stopStreamingSession();
stopCurrentStreamingSession();
});
}

Expand Down Expand Up @@ -148,7 +149,7 @@ protected void onUpdateConfig(Object config) {

private void restartSmartLib(BroadpeakConfig bpConfig) {
log.d("Releasing SmartLib and initializing with updated configs");
stopStreamingSession();
stopCurrentStreamingSession();
SmartLib.getInstance().release();
if (bpConfig != null) {
SmartLib.getInstance().init(context,
Expand All @@ -173,18 +174,29 @@ protected void onApplicationResumed() {
@Override
protected void onDestroy() {
// Stop the session
stopStreamingSession();
stopCurrentStreamingSession();
// Release SmartLib
SmartLib.getInstance().release();
if (messageBus != null) {
messageBus.removeListeners(this);
}
}

private void stopStreamingSession() {
log.d("stopStreamingSession");
if (session != null) {
session.stopStreamingSession();
private void stopCurrentStreamingSession() {
log.d("stopCurrentStreamingSession");
if (player != null && player.getMediaSource() != null) {
stopStreamingSession(player.getMediaSource().getUrl());
}
}

private void stopStreamingSession(String sessionKey) {
log.d("stopStreamingSession called with sessionKey=[" + sessionKey + "]");
if (sessionKey != null && sessionsMap.containsKey(sessionKey)) {
StreamingSession session = sessionsMap.get(sessionKey);
if (session != null) {
session.stopStreamingSession();
}
sessionsMap.remove(sessionKey);
}
}

Expand All @@ -200,9 +212,7 @@ public void apply(PKMediaEntry mediaEntry, PKMediaEntryInterceptor.Listener list
log.d("Apply Entry " + mediaEntry.getName() + " - " + mediaEntry.getId() + " url: " + mediaEntry.getSources().get(0).getUrl());

// Stop the session for fresh media entry
if (session != null) {
stopStreamingSession();
}
stopCurrentStreamingSession();

PKMediaSource source = mediaEntry.getSources().get(0);
if (PKMediaFormat.udp.equals(source.getMediaFormat())) {
Expand All @@ -213,13 +223,14 @@ public void apply(PKMediaEntry mediaEntry, PKMediaEntryInterceptor.Listener list
return;
}
// Start the session and get the final stream URL
session = SmartLib.getInstance().createStreamingSession();
StreamingSession session = SmartLib.getInstance().createStreamingSession();
if (session == null) {
sendBroadpeakErrorEvent(errorCode, errorMessage);
return;
}
sessionsMap.put(source.getUrl(), session);

addSessionConfig();
addSessionConfig(session);
session.attachPlayer(player, messageBus);

StreamingSessionResult result = session.getURL(source.getUrl());
Expand All @@ -235,12 +246,12 @@ public void apply(PKMediaEntry mediaEntry, PKMediaEntryInterceptor.Listener list
errorCode = result.getErrorCode();
errorMessage = result.getErrorMessage();
}
stopStreamingSession();
stopStreamingSession(source.getUrl());
// send event to MessageBus
sendBroadpeakErrorEvent(errorCode, errorMessage);
}
} else {
stopStreamingSession();
stopCurrentStreamingSession();
errorMessage = BroadpeakError.InvalidMediaEntry.errorMessage;
errorCode = BroadpeakError.InvalidMediaEntry.errorCode;
sendBroadpeakErrorEvent(errorCode, errorMessage);
Expand All @@ -249,7 +260,7 @@ public void apply(PKMediaEntry mediaEntry, PKMediaEntryInterceptor.Listener list
listener.onComplete();
}

private void addSessionConfig() {
private void addSessionConfig(StreamingSession session) {
if (!TextUtils.isEmpty(config.getAdCustomReference())) {
session.setAdCustomReference(config.getAdCustomReference());
}
Expand Down

0 comments on commit 183b2cd

Please sign in to comment.