Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KUX-1527: Investigate race condition in broadpeak plugin causing 7011… #37

Merged
merged 1 commit into from
Dec 5, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.kaltura.playkit.PlayerEvent;
import com.kaltura.tvplayer.PKMediaEntryInterceptor;

import java.util.HashMap;
import java.util.Map;

import tv.broadpeak.smartlib.SmartLib;
Expand All @@ -30,7 +31,7 @@ public class BroadpeakPlugin extends PKPlugin implements PKMediaEntryInterceptor
private static final PKLog log = PKLog.get("BroadpeakPlugin");

private MessageBus messageBus;
private StreamingSession session;
private final Map<String, StreamingSession> sessionsMap = new HashMap<>();
private Player player;
private BroadpeakConfig config;
private Context context;
Expand Down Expand Up @@ -86,14 +87,14 @@ protected void onLoad(final Player player, Object config, final MessageBus messa
if (PKError.Severity.Fatal.equals(event.error.severity)) {
log.e("PlayerEvent Fatal Error " + event.error.message);
// Stop the session in case of Playback Error
stopStreamingSession();
stopCurrentStreamingSession();
}
});

this.messageBus.addListener(this, PlayerEvent.stopped, event -> {
log.d("PlayerEvent stopped: calling stopStreamingSession");
// Stop the session in case of Playback stop
stopStreamingSession();
stopCurrentStreamingSession();
});
}

Expand Down Expand Up @@ -148,7 +149,7 @@ protected void onUpdateConfig(Object config) {

private void restartSmartLib(BroadpeakConfig bpConfig) {
log.d("Releasing SmartLib and initializing with updated configs");
stopStreamingSession();
stopCurrentStreamingSession();
SmartLib.getInstance().release();
if (bpConfig != null) {
SmartLib.getInstance().init(context,
Expand All @@ -173,18 +174,29 @@ protected void onApplicationResumed() {
@Override
protected void onDestroy() {
// Stop the session
stopStreamingSession();
stopCurrentStreamingSession();
// Release SmartLib
SmartLib.getInstance().release();
if (messageBus != null) {
messageBus.removeListeners(this);
}
}

private void stopStreamingSession() {
log.d("stopStreamingSession");
if (session != null) {
session.stopStreamingSession();
private void stopCurrentStreamingSession() {
log.d("stopCurrentStreamingSession");
if (player != null && player.getMediaSource() != null) {
stopStreamingSession(player.getMediaSource().getUrl());
}
}

private void stopStreamingSession(String sessionKey) {
log.d("stopStreamingSession called with sessionKey=[" + sessionKey + "]");
if (sessionKey != null && sessionsMap.containsKey(sessionKey)) {
StreamingSession session = sessionsMap.get(sessionKey);
if (session != null) {
session.stopStreamingSession();
}
sessionsMap.remove(sessionKey);
}
}

Expand All @@ -200,9 +212,7 @@ public void apply(PKMediaEntry mediaEntry, PKMediaEntryInterceptor.Listener list
log.d("Apply Entry " + mediaEntry.getName() + " - " + mediaEntry.getId() + " url: " + mediaEntry.getSources().get(0).getUrl());

// Stop the session for fresh media entry
if (session != null) {
stopStreamingSession();
}
stopCurrentStreamingSession();

PKMediaSource source = mediaEntry.getSources().get(0);
if (PKMediaFormat.udp.equals(source.getMediaFormat())) {
Expand All @@ -213,13 +223,14 @@ public void apply(PKMediaEntry mediaEntry, PKMediaEntryInterceptor.Listener list
return;
}
// Start the session and get the final stream URL
session = SmartLib.getInstance().createStreamingSession();
StreamingSession session = SmartLib.getInstance().createStreamingSession();
if (session == null) {
sendBroadpeakErrorEvent(errorCode, errorMessage);
return;
}
sessionsMap.put(source.getUrl(), session);

addSessionConfig();
addSessionConfig(session);
session.attachPlayer(player, messageBus);

StreamingSessionResult result = session.getURL(source.getUrl());
Expand All @@ -235,12 +246,12 @@ public void apply(PKMediaEntry mediaEntry, PKMediaEntryInterceptor.Listener list
errorCode = result.getErrorCode();
errorMessage = result.getErrorMessage();
}
stopStreamingSession();
stopStreamingSession(source.getUrl());
// send event to MessageBus
sendBroadpeakErrorEvent(errorCode, errorMessage);
}
} else {
stopStreamingSession();
stopCurrentStreamingSession();
errorMessage = BroadpeakError.InvalidMediaEntry.errorMessage;
errorCode = BroadpeakError.InvalidMediaEntry.errorCode;
sendBroadpeakErrorEvent(errorCode, errorMessage);
Expand All @@ -249,7 +260,7 @@ public void apply(PKMediaEntry mediaEntry, PKMediaEntryInterceptor.Listener list
listener.onComplete();
}

private void addSessionConfig() {
private void addSessionConfig(StreamingSession session) {
if (!TextUtils.isEmpty(config.getAdCustomReference())) {
session.setAdCustomReference(config.getAdCustomReference());
}
Expand Down