Skip to content

Commit

Permalink
Add isDeleted flag to watched change events
Browse files Browse the repository at this point in the history
  • Loading branch information
cachapa committed Sep 16, 2023
1 parent a0f7f1d commit 91c3dd9
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 7 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 5.1.2

- Add `isDeleted` flag to watched change events

## 5.1.1

- Remove timezone drift from `Hlc.toString`
Expand Down
10 changes: 6 additions & 4 deletions lib/src/map_crdt/map_crdt.dart
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import 'record.dart';
/// datasets. It is incredibly inefficient.
class MapCrdt extends MapCrdtBase {
final Map<String, Map<String, Record>> _recordMaps;
final Map<String, StreamController<({String key, dynamic value})>>
final Map<String,
StreamController<({String key, dynamic value, bool isDeleted})>>
_changeControllers;

@override
Expand Down Expand Up @@ -37,13 +38,14 @@ class MapCrdt extends MapCrdtBase {
void putRecords(Map<String, Map<String, Record>> dataset) {
dataset.forEach((table, records) {
_recordMaps[table]!.addAll(records);
records.forEach((key, value) =>
_changeControllers[table]!.add((key: key, value: value.value)));
records.forEach((key, record) => _changeControllers[table]!
.add((key: key, value: record.value, isDeleted: record.isDeleted)));
});
}

@override
Stream<({String key, dynamic value})> watch(String table, {String? key}) {
Stream<({String key, dynamic value, bool isDeleted})> watch(String table,
{String? key}) {
if (!tables.contains(table)) throw 'Unknown table: $table';
return key == null
? _changeControllers[table]!.stream
Expand Down
4 changes: 3 additions & 1 deletion lib/src/map_crdt/map_crdt_base.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import '../hlc.dart';
import '../types.dart';
import 'record.dart';

typedef WatchEvent = ({String key, dynamic value, bool isDeleted});

/// A CRDT backed by a simple in-memory hashmap.
/// Useful for testing, or for applications which only require small, ephemeral
/// datasets. It is incredibly inefficient.
Expand Down Expand Up @@ -97,7 +99,7 @@ abstract class MapCrdtBase extends Crdt {
/// Returns a stream of changes.
/// Use the optional [key] parameter to filter events or leave it empty to get
/// all changes.
Stream<({String key, dynamic value})> watch(String table, {String? key});
Stream<WatchEvent> watch(String table, {String? key});

@override
CrdtChangeset getChangeset({
Expand Down
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: crdt
description: Dart implementation of Conflict-free Replicated Data Types (CRDTs).
version: 5.1.1
version: 5.1.2
homepage: https://github.com/cachapa/crdt
repository: https://github.com/cachapa/crdt
issue_tracker: https://github.com/cachapa/crdt/issues
Expand Down
10 changes: 9 additions & 1 deletion test/map_crdt_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -364,10 +364,18 @@ void main() {

test('Single change', () async {
// ignore: unawaited_futures
expectLater(crdt.watch('table'), emits((key: 'x', value: 1)));
expectLater(
crdt.watch('table'), emits((key: 'x', value: 1, isDeleted: false)));
await crdt.put('table', 'x', 1);
});

test('Deleted', () async {
// ignore: unawaited_futures
expectLater(
crdt.watch('table'), emits((key: 'x', value: 1, isDeleted: true)));
await crdt.put('table', 'x', 1, true);
});

test('Enforce table existence', () {
expect(
() => crdt.watch('not_table'),
Expand Down

0 comments on commit 91c3dd9

Please sign in to comment.