forked from nbauernfeind/deephaven-core
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(csharp/ExcelAddIn): Make connections in the background (deephave…
…n#6011) The "Test Connection" and "Set Connection" button could delay your progress because they were serialized on a single thread. This can get annoying if you type in the wrong credentials, as computer might take several seconds to figure that out. Even if you correct your credentials in the meantime, you would have to wait for the original connection request to resolve. This PR changes the code so that it does that work on a background thread. Doing so also exposed a bug in my organization of Observables, so they have been reorganized a little bit. In particular I was trying to be clever by having one object observe two different things and the logic wasn't quite right.
- Loading branch information
Showing
11 changed files
with
296 additions
and
205 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,129 @@ | ||
using Deephaven.DeephavenClient; | ||
using Deephaven.ExcelAddIn.Models; | ||
using Deephaven.ExcelAddIn.Util; | ||
using Deephaven.DeephavenClient.ExcelAddIn.Util; | ||
using Deephaven.DheClient.Session; | ||
|
||
namespace Deephaven.ExcelAddIn.Providers; | ||
|
||
internal class ClientProvider( | ||
WorkerThread workerThread, | ||
TableTriple descriptor) : IObserver<StatusOr<SessionBase>>, IObservable<StatusOr<Client>>, IDisposable { | ||
|
||
private readonly ObserverContainer<StatusOr<Client>> _observers = new(); | ||
private StatusOr<Client> _client = StatusOr<Client>.OfStatus("[No Client]"); | ||
private DndClient? _ownedDndClient = null; | ||
|
||
public IDisposable Subscribe(IObserver<StatusOr<Client>> observer) { | ||
// We need to run this on our worker thread because we want to protect | ||
// access to our dictionary. | ||
workerThread.Invoke(() => { | ||
_observers.Add(observer, out _); | ||
observer.OnNext(_client); | ||
}); | ||
|
||
return ActionAsDisposable.Create(() => { | ||
workerThread.Invoke(() => { | ||
_observers.Remove(observer, out _); | ||
}); | ||
}); | ||
} | ||
|
||
public void Dispose() { | ||
DisposeClientState(); | ||
} | ||
|
||
public void OnNext(StatusOr<SessionBase> session) { | ||
// Get onto the worker thread if we're not already on it. | ||
if (workerThread.InvokeIfRequired(() => OnNext(session))) { | ||
return; | ||
} | ||
|
||
try { | ||
// Dispose whatever state we had before. | ||
DisposeClientState(); | ||
|
||
// If the new state is just a status message, make that our status and transmit to our observers | ||
if (!session.GetValueOrStatus(out var sb, out var status)) { | ||
_observers.SetAndSendStatus(ref _client, status); | ||
return; | ||
} | ||
|
||
var pqId = descriptor.PersistentQueryId; | ||
|
||
// New state is a Core or CorePlus Session. | ||
_ = sb.Visit(coreSession => { | ||
if (pqId != null) { | ||
_observers.SetAndSendStatus(ref _client, "[PQ Id Not Valid for Community Core]"); | ||
return Unit.Instance; | ||
} | ||
|
||
// It's a Core session so we have our Client. | ||
_observers.SetAndSendValue(ref _client, coreSession.Client); | ||
return Unit.Instance; // Essentially a "void" value that is ignored. | ||
}, corePlusSession => { | ||
// It's a CorePlus session so subscribe us to its PQ observer for the appropriate PQ ID | ||
// If no PQ id was provided, that's a problem | ||
if (pqId == null) { | ||
_observers.SetAndSendStatus(ref _client, "[PQ Id is Required]"); | ||
return Unit.Instance; | ||
} | ||
|
||
// Connect to the PQ on a separate thread | ||
Utility.RunInBackground(() => ConnectToPq(corePlusSession.SessionManager, pqId)); | ||
return Unit.Instance; | ||
}); | ||
} catch (Exception ex) { | ||
_observers.SetAndSendStatus(ref _client, ex.Message); | ||
} | ||
} | ||
|
||
/// <summary> | ||
/// This is executed on a separate thread because it might take a while. | ||
/// </summary> | ||
/// <param name="sessionManager"></param> | ||
/// <param name="pqId"></param> | ||
private void ConnectToPq(SessionManager sessionManager, PersistentQueryId pqId) { | ||
StatusOr<Client> result; | ||
DndClient? dndClient = null; | ||
try { | ||
dndClient = sessionManager.ConnectToPqByName(pqId.Id, false); | ||
result = StatusOr<Client>.OfValue(dndClient); | ||
} catch (Exception ex) { | ||
result = StatusOr<Client>.OfStatus(ex.Message); | ||
} | ||
|
||
// commit the results, but on the worker thread | ||
workerThread.Invoke(() => { | ||
// This should normally be null, but maybe there's a race. | ||
var oldDndClient = Utility.Exchange(ref _ownedDndClient, dndClient); | ||
_observers.SetAndSend(ref _client, result); | ||
|
||
// Yet another thread | ||
if (oldDndClient != null) { | ||
Utility.RunInBackground(() => Utility.IgnoreExceptions(() => oldDndClient.Dispose())); | ||
} | ||
}); | ||
} | ||
|
||
private void DisposeClientState() { | ||
// Get onto the worker thread if we're not already on it. | ||
if (workerThread.InvokeIfRequired(DisposeClientState)) { | ||
return; | ||
} | ||
|
||
var oldClient = Utility.Exchange(ref _ownedDndClient, null); | ||
if (oldClient != null) { | ||
_observers.SetAndSendStatus(ref _client, "Disposing client"); | ||
oldClient.Dispose(); | ||
} | ||
} | ||
|
||
public void OnCompleted() { | ||
throw new NotImplementedException(); | ||
} | ||
|
||
public void OnError(Exception error) { | ||
throw new NotImplementedException(); | ||
} | ||
} |
Oops, something went wrong.