Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscriptions impl, server side #1997

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open

Conversation

lcodes
Copy link
Contributor

@lcodes lcodes commented Nov 15, 2024

Description of Changes

Moving subscriptions from one batch-subscribe call to one call per subscribe with matching unsubscribe call.

API and ABI breaking changes

Updates to the websocket API and introduction of new messages.

Expected complexity level and risk

3: changes the way subscriptions are stored and used.

Testing

In progress

@bfops
Copy link
Collaborator

bfops commented Nov 19, 2024

(Seems to be failing tests?)

@@ -121,7 +121,7 @@ struct SubscriptionTable {
}

pub async fn exec(config: Config, args: &ArgMatches) -> Result<(), anyhow::Error> {
let queries = args.get_many::<String>("query").unwrap();
let query = args.get_one::<String>("query").unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case and because it is not part of the subscription proposal, I would consider not changing the CLI, as there is no way to add another query once you have added the first. Rather, you can issue N subscribe commands. We could also do this as future work if you prefer and make an issue for that.

Comment on lines 89 to +92
/// Register SQL queries on which to receive updates.
Subscribe(Subscribe),
/// Unregister SQL queries which are receiving updates.
Unsubscribe(Unsubscribe),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Register SQL queries on which to receive updates.
Subscribe(Subscribe),
/// Unregister SQL queries which are receiving updates.
Unsubscribe(Unsubscribe),
/// Register a SQL query on which to receive an initial and subsequent updates for.
/// The subscribed set is mutable, that is, this `Subscribe` message
/// can be followed by more, or `Unsubscribe` messages.
Subscribe(Subscribe),
/// Unregister a SQL query which the client is receiving updates for.
Unsubscribe(Unsubscribe),

Comment on lines +236 to +237
/// After connecting, to inform client of its identity.
IdentityToken(IdentityToken),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(This will conflict with the ids-not-names stuff.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the ids-not-names stuff?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment on lines +47 to +48
let ptr = &self.data as *const [u8; 32] as *const u128;
QueryId { hash: unsafe { u256::from_words(*ptr, *ptr.wrapping_add(1)) } }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need both a QueryHash and QueryId, why not reuse QueryHash = QueryId?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, why not use u256::from_le_bytes(self.data) or some such?

Comment on lines +52 to +57
impl Into<QueryHash> for QueryId {
fn into(self) -> QueryHash {
let ptr = &self.hash.0 as *const [u128; 2] as *const [u8; 32];
QueryHash { data: unsafe { *ptr } }
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here...

Comment on lines +325 to +369
FormatSwitch::Bsatn(table_rows) => FormatSwitch::Bsatn(ws::SubscribeApplied {
total_host_execution_duration_micros,
request_id,
query_id,
rows: ws::SubscribeRows {
table_id: result.table_id,
table_name: result.table_name,
table_rows
},
}.into()),
FormatSwitch::Json(table_rows) => FormatSwitch::Json(ws::SubscribeApplied {
total_host_execution_duration_micros,
request_id,
query_id,
rows: ws::SubscribeRows {
table_id: result.table_id,
table_name: result.table_name,
table_rows
},
}.into()),
}
},
SubscriptionResult::Unsubscribe(result) => {
protocol.assert_matches_format_switch(&result.table_rows);
match result.table_rows {
FormatSwitch::Bsatn(table_rows) => FormatSwitch::Bsatn(ws::UnsubscribeApplied {
total_host_execution_duration_micros,
request_id,
query_id,
rows: ws::SubscribeRows {
table_id: result.table_id,
table_name: result.table_name,
table_rows
},
}.into()),
FormatSwitch::Json(table_rows) => FormatSwitch::Json(ws::UnsubscribeApplied {
total_host_execution_duration_micros,
request_id,
query_id,
rows: ws::SubscribeRows {
table_id: result.table_id,
table_name: result.table_name,
table_rows
},
}.into()),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code duplication here is rather unfortunate :(

Comment on lines +103 to +110
let mut remove_table_query = |table_id: TableId| {
if let Entry::Occupied(mut entry) = self.tables.entry(table_id) {
let hashes = entry.get_mut();
if hashes.remove(&query) && hashes.is_empty() {
entry.remove();
}
}
};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This and the version below from which it was copied looks like it could be extracted to a method.

}
}

if let Some(ids) = self.subscribers.get_mut(&query) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could use ?

Comment on lines +31 to +38
let mut compiled = compile_sql(relational_db, auth, tx, &input)?;
match compiled.len() {
1 => {},
0 => return Err(SubscriptionError::Empty.into()),
_ => return Err(SubscriptionError::Multiple.into()),
}

Err(SubscriptionError::SideEffect(match compiled.pop().unwrap() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let mut compiled = compile_sql(relational_db, auth, tx, &input)?;
match compiled.len() {
1 => {},
0 => return Err(SubscriptionError::Empty.into()),
_ => return Err(SubscriptionError::Multiple.into()),
}
Err(SubscriptionError::SideEffect(match compiled.pop().unwrap() {
let mut compiled = compile_sql(relational_db, auth, tx, &input)?;
let single = match (compiled.pop(), complied.pop()) {
(None, None) => return Err(SubscriptionError::Empty.into()),
(Some(_), Some(_)) => return Err(SubscriptionError::Multiple.into()),
(Some(x), None) => x,
};
Err(SubscriptionError::SideEffect(match single {

@@ -208,7 +223,7 @@ impl ExecutionUnit {
sql: &str,
slow_query_threshold: Option<Duration>,
compression: Compression,
) -> Option<TableUpdate<F>> {
) -> TableUpdate<F> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change? I noticed the filter_map -> map

@gefjon
Copy link
Contributor

gefjon commented Nov 20, 2024

@jsdt Note there's an edge case where republishing a module with a hot-swap can remove an index, which can break an indexed semijoin subscription query, since we refuse incremental semijoins which are not indexed on both of the joined columns. The intended semantics here is that, after a hotswap, we recompile all active subscribed queries. If any of them fails to compile, like in the removed-index case, the host sends an error message to the client and then ends the subscription, but the websocket connection is not closed. Any queries that successfully compile stay in place.

Please find out if this PR implements that behavior. If it does not, please create a ticket to fix it in a follow-up.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants