Skip to content

Commit

Permalink
Implement streamed session autoresets
Browse files Browse the repository at this point in the history
Redesigns the way autoresets work a bunch.

The server now sends a unique payload along with the autoreset request
message. This is used by clients to recognize that the server supports
this new style of autoreset and to correlate autoresets in the unlikely
case of them overtaking each other.

The autoreset now doesn't immediately start upon the first client
responding. Instead, clients send along information about their
operating system, recent latency measurements, reset capabilities
(currently only the new "stream" capability) and if the user marked
their network connection as "poor". The server collects these (under a
time limit) and then determines the best client to perform the
autoreset. At the time of writing, this goes in the order of:

* Supporting streamed resets
* Better operating systems (Android or the browser are worse)
* Better network quality (according to client settings)
* Lower average ping
* Earlier autoreset response

Whoever client gets the cut gets to perform the autoreset. If they don't
support streamed autoresets, it proceeds as before.

If they do support streamed autoresets, they are instructed to perform
one of those. The client will send a stream-reset-start message, upon
which the server will enqueue a soft reset and a start marker for the
client, using the autoreset payload as the correlator. The client in
turn uses this message with the matching correlator to send themselves
an internal message through the paint engine that will give them a clean
canvas state to build a reset image from.

The reset image is packed into a new ResetStream meta message. This
message contains gzip-compressed messages for the new reset image, which
the server will decompress into a new, parallel image. Each of these
message is acknowledged by the server before the client sends the next
one, as to not their upstream entirely.

Once the client is done sending their reset image, they tell the server
that they're done and, as a sanity check, how many messages they expect
to have been decompressed. The reset stream is then put into a pending
state, to be resolved once all clients are caught up to the messages
that have been added since after the reset stream was started (which, in
most cases, will already be the case.)

Once that occurs, the reset is resolved: all messages that have been
added since the reset stream was started are packed on top of that new
reset image and the session history is replaced with this new state.
History positions of clients are shifted forward into the new area,
block caches are shifted over to the new offsets etc. Server-side
recordings (the ones done by the --record flag, not the .archived ones)
are not touched, they will continue recording.

The resulting state *should* be identical to what clients are already
seeing, so they receive no indication that the session has been reset
other than the session size in the corner shrinking to a smaller value.
The client that does the streaming will get a message in the
bottom-right corner telling them that they're compressing the session
and uploading it, to give them an indication as to why Drawpile is using
more resources or their network is more saturated.

When an autoreset fails, the request state is reset and another round
will be attempted after a delay. This also makes some ancillary changes
along the way, like sending session size updates at a more reasonable
time after a reset, replacing the int and uint for history positions and
sizes with long long and size_t, as well as checking journal writes for
failures. Setting the network quality to "poor" also no longer results
in autoresets to be disabled entirely, instead they either transmit a
bad network score under the new system or just delay the autoreset
response by a bunch unter the old one.

This relates to #1247.
  • Loading branch information
askmeaboutlo0m committed Aug 30, 2024
1 parent 4b2f509 commit 0d7196f
Show file tree
Hide file tree
Showing 65 changed files with 3,742 additions and 479 deletions.
7 changes: 0 additions & 7 deletions src/desktop/dialogs/settingsdialog/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,6 @@ void Network::initNetwork(
form, tr("Connection quality:"), true,
{{tr("Good"), 1}, {tr("Poor"), 0}});
settings.bindServerAutoReset(autoReset);
auto *autoResetNote = utils::formNote(
tr("If all operators in a session set connection quality to Poor, "
"auto-reset will not work and the server will stop processing "
"updates until the session is manually reset."),
QSizePolicy::Label, QIcon::fromTheme("dialog-warning"));
form->addRow(nullptr, autoResetNote);
settings.bindServerAutoReset(autoResetNote, &QWidget::setHidden);

auto *timeout = new QSpinBox(this);
timeout->setAlignment(Qt::AlignLeft);
Expand Down
18 changes: 15 additions & 3 deletions src/desktop/mainwindow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -373,8 +373,15 @@ MainWindow::MainWindow(bool restoreWindowPosition, bool singleSession)
m_dockToolSettings, &docks::ToolSettings::resetColors);

// Network client <-> UI connections
connect(m_doc, &Document::catchupProgress, this, &MainWindow::updateCatchupProgress);
connect(m_doc, &Document::catchupProgress, m_netstatus, &widgets::NetStatus::setCatchupProgress);
connect(
m_doc, &Document::catchupProgress, this,
&MainWindow::updateCatchupProgress);
connect(
m_doc, &Document::catchupProgress, m_netstatus,
&widgets::NetStatus::setCatchupProgress);
connect(
m_doc, &Document::streamResetProgress, this,
&MainWindow::updateStreamResetProgress);

connect(
m_doc->client(), &net::Client::serverStatusUpdate, m_viewStatusBar,
Expand Down Expand Up @@ -1855,6 +1862,11 @@ void MainWindow::updateCatchupProgress(int percent)
m_canvasView->setCatchupProgress(percent, false);
}

void MainWindow::updateStreamResetProgress(int percent)
{
m_canvasView->setStreamResetProgress(percent);
}

void MainWindow::savePreResetImageAs()
{
if(m_preResetCanvasState.isNull()) {
Expand Down Expand Up @@ -2392,7 +2404,7 @@ void MainWindow::leave()
}
});

if(m_doc->client()->uploadQueueBytes() > 0) {
if(m_doc->client()->uploadQueueBytes() > 0 || m_doc->isStreamingReset()) {
leavebox->setIcon(QMessageBox::Warning);
leavebox->setInformativeText(tr("There is still unsent data! Please wait until transmission completes!"));
}
Expand Down
1 change: 1 addition & 0 deletions src/desktop/mainwindow.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ private slots:

void showResetNoticeDialog(const drawdance::CanvasState &canvasState);
void updateCatchupProgress(int percent);
void updateStreamResetProgress(int percent);
void showCompatibilityModeWarning();

void onOperatorModeChange(bool op);
Expand Down
46 changes: 46 additions & 0 deletions src/desktop/scene/canvasscene.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "desktop/scene/transformitem.h"
#include "desktop/scene/usermarkeritem.h"
#include "desktop/utils/widgetutils.h"
#include "desktop/view/canvasscene.h"
#include "libclient/canvas/canvasmodel.h"
#include "libclient/canvas/layerlist.h"
#include "libclient/canvas/paintengine.h"
Expand Down Expand Up @@ -41,6 +42,7 @@ CanvasScene::CanvasScene(QObject *parent)
, m_lockNotice(nullptr)
, m_toolNotice(nullptr)
, m_catchup(nullptr)
, m_streamResetNotice(nullptr)
, m_showAnnotationBorders(false)
, m_showAnnotations(true)
, m_showUserMarkers(true)
Expand Down Expand Up @@ -188,6 +190,9 @@ void CanvasScene::setSceneBounds(const QRectF &sceneBounds)
if(m_catchup) {
setCatchupPosition();
}
if(m_streamResetNotice) {
setStreamResetNoticePosition();
}
for(ToggleItem *ti : m_toggleItems) {
ti->updateSceneBounds(sceneBounds);
}
Expand Down Expand Up @@ -306,6 +311,30 @@ void CanvasScene::setCatchupProgress(int percent)
setCatchupPosition();
}

void CanvasScene::setStreamResetProgress(int percent)
{
if(percent > 100) {
if(m_streamResetNotice) {
m_streamResetNotice->setText(
view::CanvasScene::getStreamResetProgressText(percent));
if(m_streamResetNotice->persist() < 0.0) {
m_streamResetNotice->setPersist(NOTICE_PERSIST);
}
}
} else {
if(m_streamResetNotice) {
m_streamResetNotice->setText(
view::CanvasScene::getStreamResetProgressText(percent));
m_streamResetNotice->setPersist(-1.0);
} else {
m_streamResetNotice = new NoticeItem(
view::CanvasScene::getStreamResetProgressText(percent));
addItem(m_streamResetNotice);
}
setStreamResetNoticePosition();
}
}

void CanvasScene::onUserJoined(int id, const QString &name)
{
Q_UNUSED(name);
Expand Down Expand Up @@ -534,6 +563,11 @@ void CanvasScene::advanceAnimations()
delete m_catchup;
m_catchup = nullptr;
}

if(m_streamResetNotice && !m_streamResetNotice->animationStep(STEP)) {
delete m_streamResetNotice;
m_streamResetNotice = nullptr;
}
}

void CanvasScene::laserTrail(int userId, int persistence, const QColor &color)
Expand Down Expand Up @@ -743,6 +777,18 @@ void CanvasScene::setCatchupPosition()
catchupBounds.height() + NOTICE_OFFSET));
}

void CanvasScene::setStreamResetNoticePosition()
{
qreal catchupOffset =
m_catchup ? m_catchup->boundingRect().height() + NOTICE_OFFSET : 0.0;
QRectF streamResetNoticeBounds = m_streamResetNotice->boundingRect();
m_streamResetNotice->updatePosition(
sceneRect().bottomRight() -
QPointF(
streamResetNoticeBounds.width() + NOTICE_OFFSET,
streamResetNoticeBounds.height() + NOTICE_OFFSET + catchupOffset));
}

void CanvasScene::setShowOwnUserMarker(bool showOwnUserMarker)
{
if(showOwnUserMarker != m_showOwnUserMarker) {
Expand Down
4 changes: 4 additions & 0 deletions src/desktop/scene/canvasscene.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ public slots:

void setCatchupProgress(int percent);

void setStreamResetProgress(int percent);

signals:
//! Canvas size has just changed
void canvasResized(int xoffset, int yoffset, const QSize &oldSize);
Expand Down Expand Up @@ -180,6 +182,7 @@ private slots:
void setLockNoticePosition();
void setToolNoticePosition(bool initial);
void setCatchupPosition();
void setStreamResetNoticePosition();

//! The actual canvas model
canvas::CanvasModel *m_model;
Expand All @@ -206,6 +209,7 @@ private slots:
NoticeItem *m_lockNotice;
NoticeItem *m_toolNotice;
CatchupItem *m_catchup;
NoticeItem *m_streamResetNotice;
QVector<ToggleItem *> m_toggleItems;

OutlineItem *m_outlineItem;
Expand Down
2 changes: 1 addition & 1 deletion src/desktop/scene/noticeitem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ bool NoticeItem::setPersist(qreal seconds)
bool changed =
seconds != m_persist && (seconds < FADEOUT || m_persist < FADEOUT);
m_persist = seconds;
setOpacity(qBound(0.0, m_persist / FADEOUT, 1.0));
setOpacity(m_persist < 0.0 ? 1.0 : qBound(0.0, m_persist / FADEOUT, 1.0));
if(changed) {
refresh();
}
Expand Down
1 change: 1 addition & 0 deletions src/desktop/scene/noticeitem.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class NoticeItem final : public BaseItem {

bool setText(const QString &text);

qreal persist() const { return m_persist; }
bool setPersist(qreal seconds);

bool setOpacity(qreal opacity);
Expand Down
7 changes: 7 additions & 0 deletions src/desktop/scene/scenewrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,13 @@ void SceneWrapper::setCatchupProgress(int percent, bool force)
}
}

void SceneWrapper::setStreamResetProgress(int percent)
{
if(m_scene) {
m_scene->setStreamResetProgress(percent);
}
}

void SceneWrapper::setSaveInProgress(bool saveInProgress)
{
m_view->setSaveInProgress(saveInProgress);
Expand Down
1 change: 1 addition & 0 deletions src/desktop/scene/scenewrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class SceneWrapper final : public QObject, public view::CanvasWrapper {
void setShowSelectionMask(bool showSelectionMask) override;

void setCatchupProgress(int percent, bool force) override;
void setStreamResetProgress(int percent) override;
void setSaveInProgress(bool saveInProgress) override;

void showDisconnectedWarning(
Expand Down
49 changes: 49 additions & 0 deletions src/desktop/view/canvasscene.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,35 @@ void CanvasScene::setCatchupProgress(int percent)
setCatchupPosition();
}

void CanvasScene::setStreamResetProgress(int percent)
{
if(percent > 100) {
if(m_streamResetNotice) {
m_streamResetNotice->setText(getStreamResetProgressText(percent));
if(m_streamResetNotice->persist() < 0.0) {
m_streamResetNotice->setPersist(NOTICE_PERSIST);
}
}
} else {
if(m_streamResetNotice) {
m_streamResetNotice->setText(getStreamResetProgressText(percent));
m_streamResetNotice->setPersist(-1.0);
} else {
m_streamResetNotice =
new NoticeItem(getStreamResetProgressText(percent));
addSceneItem(m_streamResetNotice);
}
setStreamResetNoticePosition();
}
}

QString CanvasScene::getStreamResetProgressText(int percent)
{
return percent < 0
? tr("Compressing canvas…")
: tr("Uploading canvas %1%").arg(qBound(0, percent, 100));
}

int CanvasScene::checkHover(const QPointF &scenePos, bool *outWasHovering)
{
ToggleItem::Action action = ToggleItem::Action::None;
Expand Down Expand Up @@ -539,6 +568,9 @@ void CanvasScene::onSceneRectChanged()
if(m_catchup) {
setCatchupPosition();
}
if(m_streamResetNotice) {
setStreamResetNoticePosition();
}
setTogglePositions();
}

Expand Down Expand Up @@ -773,6 +805,18 @@ void CanvasScene::setCatchupPosition()
catchupBounds.height() + NOTICE_OFFSET));
}

void CanvasScene::setStreamResetNoticePosition()
{
qreal catchupOffset =
m_catchup ? m_catchup->boundingRect().height() + NOTICE_OFFSET : 0.0;
QRectF streamResetNoticeBounds = m_streamResetNotice->boundingRect();
m_streamResetNotice->updatePosition(
sceneRect().bottomRight() -
QPointF(
streamResetNoticeBounds.width() + NOTICE_OFFSET,
streamResetNoticeBounds.height() + NOTICE_OFFSET + catchupOffset));
}

void CanvasScene::setTogglePositions()
{
for(ToggleItem *ti : m_toggleItems) {
Expand Down Expand Up @@ -814,6 +858,11 @@ void CanvasScene::advanceAnimations()
m_catchup = nullptr;
}

if(m_streamResetNotice && !m_streamResetNotice->animationStep(dt)) {
delete m_streamResetNotice;
m_streamResetNotice = nullptr;
}

m_animationElapsedTimer.restart();
}

Expand Down
4 changes: 4 additions & 0 deletions src/desktop/view/canvasscene.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ class CanvasScene final : public QGraphicsScene {

bool hasCatchup() const;
void setCatchupProgress(int percent);
void setStreamResetProgress(int percent);
static QString getStreamResetProgressText(int percent);

int checkHover(const QPointF &scenePos, bool *outWasHovering = nullptr);
void removeHover();
Expand Down Expand Up @@ -140,6 +142,7 @@ class CanvasScene final : public QGraphicsScene {
void setLockNoticePosition();
void setToolNoticePosition(bool initial);
void setCatchupPosition();
void setStreamResetNoticePosition();
void setTogglePositions();

void advanceAnimations();
Expand Down Expand Up @@ -183,6 +186,7 @@ class CanvasScene final : public QGraphicsScene {
NoticeItem *m_toolNotice = nullptr;

CatchupItem *m_catchup = nullptr;
NoticeItem *m_streamResetNotice = nullptr;

QVector<ToggleItem *> m_toggleItems;

Expand Down
1 change: 1 addition & 0 deletions src/desktop/view/canvaswrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ class CanvasWrapper {
virtual void setShowSelectionMask(bool showSelectionMask) = 0;

virtual void setCatchupProgress(int percent, bool force) = 0;
virtual void setStreamResetProgress(int percent) = 0;
virtual void setSaveInProgress(bool saveInProgress) = 0;

virtual void
Expand Down
5 changes: 5 additions & 0 deletions src/desktop/view/viewwrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@ void ViewWrapper::setCatchupProgress(int percent, bool force)
}
}

void ViewWrapper::setStreamResetProgress(int percent)
{
m_scene->setStreamResetProgress(percent);
}

void ViewWrapper::setSaveInProgress(bool saveInProgress)
{
m_controller->setSaveInProgress(saveInProgress);
Expand Down
1 change: 1 addition & 0 deletions src/desktop/view/viewwrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class ViewWrapper final : public QObject, public CanvasWrapper {
void setShowSelectionMask(bool showSelectionMask) override;

void setCatchupProgress(int percent, bool force) override;
void setStreamResetProgress(int percent) override;
void setSaveInProgress(bool saveInProgress) override;

void showDisconnectedWarning(
Expand Down
8 changes: 8 additions & 0 deletions src/drawdance/generators/protogen/protocol.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,14 @@ PrivateChat:
- oflags u8
- message utf8

ResetStream:
id: 39
comment: |
Streamed chunk of session reset messages. The client and server
will negotiate support and compression algorithm.
fields:
- data bytes


# Meta messages (opaque)

Expand Down
19 changes: 19 additions & 0 deletions src/drawdance/libengine/dpengine/canvas_history.c
Original file line number Diff line number Diff line change
Expand Up @@ -896,6 +896,25 @@ void DP_canvas_history_soft_reset(DP_CanvasHistory *ch, DP_DrawContext *dc,
}
}

DP_CanvasState *DP_canvas_history_stream_start_state_inc(DP_CanvasHistory *ch,
DP_DrawContext *dc)
{
DP_ASSERT(ch);
bool have_fork = have_local_fork(ch);
if (have_fork) {
search_and_replay_from(ch, dc, ch->fork.start - ch->offset, false);
}

DP_CanvasState *cs = DP_canvas_state_incref(ch->current_state);

if (have_fork) {
finish_replay(ch, replay_fork_dec(ch, DP_canvas_state_incref(cs), dc),
dc);
}

return cs;
}

int DP_canvas_history_undo_depth_limit(DP_CanvasHistory *ch)
{
DP_ASSERT(ch);
Expand Down
3 changes: 3 additions & 0 deletions src/drawdance/libengine/dpengine/canvas_history.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ void DP_canvas_history_soft_reset(DP_CanvasHistory *ch, DP_DrawContext *dc,
unsigned int context_id,
DP_CanvasHistorySoftResetFn fn, void *user);

DP_CanvasState *DP_canvas_history_stream_start_state_inc(DP_CanvasHistory *ch,
DP_DrawContext *dc);

int DP_canvas_history_undo_depth_limit(DP_CanvasHistory *ch);

void DP_canvas_history_undo_depth_limit_set(DP_CanvasHistory *ch,
Expand Down
Loading

0 comments on commit 0d7196f

Please sign in to comment.