Skip to content

Commit

Permalink
Feature/mediaprovider load finish fix (kaltura#191)
Browse files Browse the repository at this point in the history
* checking 302 fix

* OttSessionProvider - adding session load on logout process in case session expired

* adding logs

* fixing unfinished threads

* adding logs to backend loaders

* OttSessionProvider - prevent null on session storing

* adding logs

* Ott  - fixing unfinishing load thread

* OttMediaProvider - updating logs

* Update README.md
  • Loading branch information
tehilar authored and noamtamim committed Mar 28, 2017
1 parent f931467 commit 792165d
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ public class MockParams {
public static final String MediaId2_File_Main_HD = "690395";//vild
public static final String MediaId2_File_Web_HD = "690403";//vild

public static final String Toystory_File_SD_Dash = "737631";//vild
public static final String Toystory_File_Main_HD_Dash = "737630";//vild
public static final String Toystory_File_Main_HD = "737629";//vild

public static final String PnxNotEntitledMedia = "482731";
public static final String PnxNoFilesFoundMedia = "482550";//no mediaFiles on asset

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@
import static com.kaltura.playkit.backend.MockParams.MediaId2;
import static com.kaltura.playkit.backend.MockParams.MediaId2_File_Main_HD;
import static com.kaltura.playkit.backend.MockParams.MediaId2_File_Main_SD;
import static com.kaltura.playkit.backend.MockParams.MediaId2_File_SD_Dash;
import static com.kaltura.playkit.backend.MockParams.MediaId2_File_Web_HD;
import static com.kaltura.playkit.backend.MockParams.MediaId5;
import static com.kaltura.playkit.backend.MockParams.PnxBaseUrl;
import static com.kaltura.playkit.backend.MockParams.PnxKS;
Expand All @@ -59,6 +57,9 @@
import static com.kaltura.playkit.backend.MockParams.PnxPassword;
import static com.kaltura.playkit.backend.MockParams.PnxUsername;
import static com.kaltura.playkit.backend.MockParams.ToystoryMediaId;
import static com.kaltura.playkit.backend.MockParams.Toystory_File_Main_HD;
import static com.kaltura.playkit.backend.MockParams.Toystory_File_Main_HD_Dash;
import static com.kaltura.playkit.backend.MockParams.Toystory_File_SD_Dash;
import static com.kaltura.playkit.backend.MockParams.WebHD;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -312,28 +313,28 @@ public void onComplete(PrimitiveResult response) {
latchCount--;
phoenixMediaProvider = new PhoenixMediaProvider()
.setSessionProvider(EmptySessionProvider)
.setAssetType(APIDefines.KalturaAssetType.Media).setAssetId(MediaId2)
.setFileIds(MediaId2_File_Main_SD, MediaId2_File_SD_Dash, MediaId2_File_Web_HD);
.setAssetType(APIDefines.KalturaAssetType.Media).setAssetId(ToystoryMediaId)
.setFileIds(Toystory_File_Main_HD_Dash, Toystory_File_SD_Dash, Toystory_File_Main_HD);
latchCount++;
phoenixMediaProvider.load(new OnMediaLoadCompletion() {
@Override
public void onComplete(ResultElement<PKMediaEntry> response) {
if (response.isSuccess()) {
assertTrue(response.getResponse() != null);
assertTrue(response.getResponse().getId().equals(MediaId2));
assertTrue(response.getResponse().getId().equals(ToystoryMediaId));
List<PKMediaSource> sources = response.getResponse().getSources();
assertNotNull(sources);
assertTrue(sources.size() == 3);
assertTrue(sources.size() == 1);
for (PKMediaSource source : sources) {
if (source.getId().equals(MediaId2_File_Main_SD)) {
assertTrue(source.getMediaFormat().equals(PKMediaFormat.wvm));
}
if (source.getId().equals(MediaId2_File_Web_HD)) {
assertTrue(source.getMediaFormat().equals(PKMediaFormat.hls));
if (source.getId().equals(Toystory_File_Main_HD_Dash)) {
assertTrue(source.getMediaFormat().equals(PKMediaFormat.dash));
}
if (source.getId().equals(MediaId2_File_SD_Dash)) {
/*if (source.getId().equals(Toystory_File_SD_Dash)) {
assertTrue(source.getMediaFormat().equals(PKMediaFormat.dash));
}
if (source.getId().equals(Toystory_File_Main_HD)) {
assertTrue(source.getMediaFormat().equals(PKMediaFormat.wvm));
}*/
}
assertTrue(response.getResponse().getMediaType().equals(PKMediaEntry.MediaEntryType.Unknown));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ public abstract class BECallableLoader extends CallableLoader {
protected RequestQueue requestQueue;
protected SessionProvider sessionProvider;

private boolean waitForCompletion = false;


protected BECallableLoader(String tag, RequestQueue requestsExecutor, SessionProvider sessionProvider, OnCompletion completion){
super(tag, completion);
Expand All @@ -45,28 +47,33 @@ protected void cancel() {
}

isCanceled = true;
PKLog.i(TAG, loadId+": i am canceled ");
PKLog.i(TAG, loadId+": i am canceled ...notifyCompletion");

notifyCompletion();
}

@Override
protected void load() throws InterruptedException {

PKLog.i(TAG, loadId + ": load: start on get ks ");
PKLog.v(TAG, loadId + ": load: start on get ks ");
waitForCompletion = true;

sessionProvider.getSessionToken(new OnCompletion<PrimitiveResult>() {
@Override
public void onComplete(PrimitiveResult response) {
if(isCanceled()){
notifyCompletion();
waitForCompletion = false;
return;
}

ErrorElement error = response.error != null ? response.error : validateKs(response.getResult());
if (error == null) {
try {
requestRemote(response.getResult());

PKLog.d(TAG, loadId + " remote load request finished...notifyCompletion");
notifyCompletion();
waitForCompletion = false;
} catch (InterruptedException e) {
interrupted();
}
Expand All @@ -76,14 +83,19 @@ public void onComplete(PrimitiveResult response) {
if (completion != null) {
completion.onComplete(Accessories.<PKMediaEntry>buildResult(null, error));
}

PKLog.d(TAG, loadId + "remote load error finished...notifyCompletion");
notifyCompletion();
waitForCompletion = false;
}
}
});

waitCompletion();

PKLog.i(TAG, loadId+": wait for completion released");
if(waitForCompletion) { // prevent lock thread on already completed load //TODO: replace latch locks
PKLog.v(TAG, loadId+": load: setting outer completion wait lock");
waitCompletion();
}
PKLog.d(TAG, loadId+": load: wait for completion released");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,23 @@ protected CallableLoader(String tag, OnCompletion completion) {
protected void notifyCompletion() {
if (waitCompletion != null) {
synchronized (syncObject) {
PKLog.i(TAG, loadId + ": notifyCompletion: countDown = " + waitCompletion.getCount());
PKLog.v(TAG, loadId + ": notifyCompletion: countDown = " + waitCompletion.getCount());
waitCompletion.countDown();
}
}
}

protected void waitCompletion(int countDownLatch) throws InterruptedException {
if(waitCompletion != null && waitCompletion.getCount() == countDownLatch){
return;
}

synchronized (syncObject) {
PKLog.i(TAG, loadId + ": waitCompletion: set new counDown" + (waitCompletion != null ? "already has counter " + waitCompletion.getCount() : ""));
PKLog.i(TAG, loadId + ": waitCompletion: set new counDown " + (waitCompletion != null ? "already has counter " + waitCompletion.getCount() : ""));
waitCompletion = new CountDownLatch(countDownLatch);
}
waitCompletion.await();

waitCompletion = null;
}

protected void waitCompletion() throws InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public class OttSessionProvider extends BaseSessionProvider {
public Boolean call() throws Exception {
if(refreshToken == null){
Log.d(TAG, "refreshToken is not available, can't activate refresh");
onSessionRefreshTaskResults(new PrimitiveResult(ErrorElement.SessionError.addMessage(" FAILED TO RECOVER SESSION!!")));
return false;
}

Expand All @@ -92,6 +93,17 @@ public Boolean call() throws Exception {
return true;
}
};

private void onSessionRefreshTaskResults(PrimitiveResult result) {
if(sessionRecoveryCallback != null){
sessionRecoveryCallback.onComplete(result);
sessionRecoveryCallback = null;
}

if(sessionRefreshListener != null ){
sessionRefreshListener.onComplete(result.error == null? result.getResult() : null);
}
}
//endregion


Expand Down Expand Up @@ -231,9 +243,12 @@ private void handleStartSession(ResponseElement response, OnCompletion<Primitive
ErrorElement error = null;

if (response != null && response.isSuccess()) {
PKLog.d(TAG, "handleStartSession: response success, checking inner responses");
List<BaseResult> responses = PhoenixParser.parse(response.getResponse()); // parses KalturaLoginResponse, KalturaSession

if (responses.get(0).error != null) { //!- failed to login
PKLog.d(TAG, "handleStartSession: first response failure: "+responses.get(0).error);

//?? clear session?
error = ErrorElement.SessionError;

Expand All @@ -243,13 +258,17 @@ private void handleStartSession(ResponseElement response, OnCompletion<Primitive
// session data is taken from second response since its common for both user/anonymous login
// and we need this response for the expiry.
if (responses.get(1).error == null) { // get session data success
PKLog.d(TAG, "handleStartSession: second response success");

KalturaSession session = (KalturaSession) responses.get(1);
setSession(session.getKs(), session.getExpiry(), session.getUserId()); // save new session

if (completion != null) {
completion.onComplete(new PrimitiveResult(session.getKs()));
}
} else {
PKLog.d(TAG, "handleStartSession: second response failure: "+responses.get(1).error);

error = ErrorElement.SessionError;
}

Expand Down Expand Up @@ -305,26 +324,39 @@ public void endSession(final OnCompletion<BaseResult> completion) {
return;
}

APIOkRequestsExecutor.getSingleton().queue(OttUserService.logout(apiBaseUrl, getSessionToken(), sessionUdid)
.completion(new OnRequestCompletion() {
@Override
public void onComplete(ResponseElement response) {
ErrorElement error = null;
if (response != null && response.isSuccess()) {
PKLog.d(TAG, "endSession: logout user session success. clearing session data.");
} else {
error = response.getError() != null ? response.getError() : ErrorElement.GeneralError.message("failed to end session");
PKLog.e(TAG, "endSession: session logout failed. clearing session data. " + error.getMessage());
}
OttSessionProvider.super.endSession();
sessionUdid = null;
if (completion != null) {
completion.onComplete(new BaseResult(error));
}
}
}).build());
// make sure the ks is valid for the request (refreshes it if needed)
getSessionToken(new OnCompletion<PrimitiveResult>() {
@Override
public void onComplete(PrimitiveResult response) {
if(response.error == null) { // in case the session checked for expiry and ready to use:

APIOkRequestsExecutor.getSingleton().queue(OttUserService.logout(apiBaseUrl, response.getResult(), sessionUdid)
.completion(new OnRequestCompletion() {
@Override
public void onComplete(ResponseElement response) {
ErrorElement error = null;
if (response != null && response.isSuccess()) {
PKLog.d(TAG, "endSession: logout user session success. clearing session data.");
} else {
error = response.getError() != null ? response.getError() : ErrorElement.GeneralError.message("failed to end session");
PKLog.e(TAG, "endSession: session logout failed. clearing session data. " + error.getMessage());
}
OttSessionProvider.super.endSession();
sessionUdid = null;
if (completion != null) {
completion.onComplete(new BaseResult(error));
}
}
}).build());

} else { // in case ks retrieval failed:
completion.onComplete(response);
}
}
});

} else {
Log.w(TAG, "endSession: but no active session available");
sessionUdid = null;
}
}
Expand Down Expand Up @@ -399,14 +431,7 @@ public void onComplete(ResponseElement response) {
refreshResult :
new PrimitiveResult(ErrorElement.SessionError.addMessage(" FAILED TO RECOVER SESSION!!"));

if(sessionRecoveryCallback != null){
sessionRecoveryCallback.onComplete(refreshedKsResult);
sessionRecoveryCallback = null;
}

if(sessionRefreshListener != null && refreshResult != null){
sessionRefreshListener.onComplete(refreshResult.getResult());
}
onSessionRefreshTaskResults(refreshedKsResult);
}
});
APIOkRequestsExecutor.getSingleton().queue(multiRequest.build());
Expand Down Expand Up @@ -460,7 +485,13 @@ private void updateRefreshDelta(long expiry) {
* @return
*/
public String encryptSession(){
StringBuilder data = new StringBuilder(getSessionToken()).append(" ~~ ")

String sessionToken = getSessionToken();
if(sessionToken == null){
return null;
}

StringBuilder data = new StringBuilder(sessionToken).append(" ~~ ")
.append(refreshToken).append(" ~~ ").append(sessionUdid);

return Base64.encodeToString(data.toString().getBytes(), Base64.NO_WRAP);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ public void onComplete(ResponseElement response) {
loadReq = null;

try {
onAssetGetResponse(response/*, requestBuilder instanceof MultiRequestBuilder*/);
onAssetGetResponse(response);

} catch (InterruptedException e) {
interrupted();
Expand All @@ -293,7 +293,9 @@ public void onComplete(ResponseElement response) {
loadReq = requestQueue.queue(requestBuilder.build());
PKLog.d(TAG, loadId + ": request queued for execution [" + loadReq + "]");
}

waitCompletion();
PKLog.v(TAG, loadId + ": requestRemote wait released");
}

private String getApiBaseUrl() {
Expand Down Expand Up @@ -378,6 +380,7 @@ case of error object in the response, will be parsed to BaseResult object (error
completion.onComplete(Accessories.buildResult(mediaEntry, error));
}

PKLog.w(TAG, loadId + " media load finished, callback passed...notifyCompletion");
notifyCompletion();

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ public void onFailure(Call call, IOException e) { //!! in case of request error
}
// handle failures: create response from exception
action.onComplete(new ExecutedRequest().error(e).success(false));
Log.v(TAG, "enqueued request finished with failure, results passed to callback");
}

@Override
Expand All @@ -177,6 +178,7 @@ public void onResponse(Call call, Response response) throws IOException {
// pass parsed response to action completion block
ResponseElement responseElement = onGotResponse(response, action);
action.onComplete(responseElement);
Log.v(TAG, "enqueued request finished with success, results passed to callback");
}
});
return (String) call.request().tag();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ private DataSource.Factory buildDataSourceFactory(boolean useBandwidthMeter) {
*/
private HttpDataSource.Factory buildHttpDataSourceFactory(boolean useBandwidthMeter) {
return new DefaultHttpDataSourceFactory(getUserAgent(context), useBandwidthMeter ? BANDWIDTH_METER : null, DefaultHttpDataSource.DEFAULT_CONNECT_TIMEOUT_MILLIS,
DefaultHttpDataSource.DEFAULT_READ_TIMEOUT_MILLIS, true);
DefaultHttpDataSource.DEFAULT_READ_TIMEOUT_MILLIS, false);
}

private static String getUserAgent(Context context) {
Expand Down

0 comments on commit 792165d

Please sign in to comment.