diff --git a/CHANGELOG.md b/CHANGELOG.md index 2a91e0c..8d2e3e0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,14 @@ +## 5.0.0 +This version introduces a major refactor which results in multiple breaking changes. This was done with the intention to make this package the basis for a family of CRDT libraries. + +Another motivation was to make this package compatible with [crdt_sync](https://github.com/cachapa/crdt_sync), thereby abstracting the communication protocol and network management for real-time remote synchronization. + +Changes: +- Simplified API +- Removed insert and get operations to make package more storage-agnostic +- Made most methods optionally async +- Reimplemented CrdtMap as a zero-dependency implementation + ## 4.0.3 - Update to Dart 3 diff --git a/README.md b/README.md index 1ab4995..5f73481 100644 --- a/README.md +++ b/README.md @@ -1,48 +1,56 @@ Dart implementation of Conflict-free Replicated Data Types (CRDTs). -This project is heavily influenced by James Long's talk [CRTDs for Mortals](https://www.dotconferences.com/2019/12/james-long-crdts-for-mortals) and includes a Dart-native implementation of Hybrid Local Clocks (HLC) based on the paper [Logical Physical Clocks and Consistent Snapshots in Globally Distributed Databases](https://cse.buffalo.edu/tech-reports/2014-04.pdf). +This project is heavily influenced by James Long's talk [CRTDs for Mortals](https://www.dotconferences.com/2019/12/james-long-crdts-for-mortals) and includes a Dart-native implementation of Hybrid Local Clocks (HLC) based on the paper [Logical Physical Clocks and Consistent Snapshots in Globally Distributed Databases](https://cse.buffalo.edu/tech-reports/2014-04.pdf) (pdf). -It has [zero external dependencies](https://github.com/cachapa/crdt/blob/master/pubspec.yaml), so it should run everywhere where Dart runs. +It has [minimal external dependencies](https://github.com/cachapa/crdt/blob/master/pubspec.yaml), so it should run anywhere where Dart runs, which is pretty much everywhere. -See [sql_crdt](https://github.com/cachapa/sql_crdt) for an implementation of CRDTs backed by an SQL database. +The `Crdt` class implements CRDT conflict resolution and serves as a storage-agnostic interface for specific implementations. Als included with this package is `MapCrdt`, an ephemeral implementation using Dart HashMaps. + +Other implementations include (so far): +- [hive_crdt](https://github.com/cachapa/hive_crdt), a no-sql implementation using [Hive](https://pub.dev/packages/hive) as persistent storage. +- [sql_crdt](https://github.com/cachapa/sql_crdt), an abstract implementation for using relational databases as a data storage backend. +- [sqlite_crdt](https://github.com/cachapa/sqlite_crdt), an implementation using Sqlite for storage, useful for mobile or small projects. +- [postgres_crdt](https://github.com/cachapa/postgres_crdt), a `sql_crdt` that benefits from PostgreSQL's performance and scalability intended for backend applications. + +Moreover, there's [crdt_sync](https://github.com/cachapa/crdt_sync), a turnkey approach for real-time network synchronization of `Crdt` nodes. ## Usage -The `Crdt` class works as a layer on top of a map. The simplest way to experiment is to initialise it with an empty map: +The simplest way to experiment with this package is to use the provided `MapCrdt` implementation: ```dart -import 'package:crdt/crdt.dart'; +import 'package:crdt/map_crdt.dart'; void main() { - var crdt = MapCrdt('node_id'); - - // Insert a record - crdt.put('a', 1); - // Read the record - print('Record: ${crdt.get('a')}'); - - // Export the CRDT as Json - final json = crdt.toJson(); - // Send to remote node - final remoteJson = sendToRemote(json); - // Merge remote CRDT with local - crdt.mergeJson(remoteJson); - // Verify updated record - print('Record after merging: ${crdt.get('a')}'); -} + var crdt1 = MapCrdt(['table']); + var crdt2 = MapCrdt(['table']); + + print('Inserting 2 records in crdt1…'); + crdt1.put('table', 'a', 1); + crdt1.put('table', 'b', 1); + + print('crdt1: ${crdt1.get('table')}'); -// Mock sending the CRDT to a remote node and getting an updated one back -String sendToRemote(String json) { - final hlc = Hlc.now('another_nodeId'); - return '{"a":{"hlc":"$hlc","value":2}}'; + print('\nInserting a conflicting record in crdt2…'); + crdt2.put('table', 'a', 2); + + print('crdt2: ${crdt2.get('table')}'); + + print('\nMerging crdt2 into crdt1…'); + crdt1.merge(crdt2.getChangeset()); + + print('crdt1: ${crdt1.get('table')}'); } ``` -You'll probably want to implement some sort of persistent storage by subclassing the `Crdt` class. An example using [Hive](https://pub.dev/packages/hive) is provided in [hive_crdt](https://github.com/cachapa/hive_crdt). +## Implementations + +`crdt` is currently helping build local-first experiences for: -## Example +- [Libra](https://libra-app.eu) a weigh management app with 1M+ installs. +- [tudo](https://github.com/cachapa/tudo) an open-source simple to-do app + backend. -A [simple example](https://github.com/cachapa/crdt/blob/master/example/crdt_example.dart) is provided with this project. +Are you using this package in your project? Let me know! ## Features and bugs diff --git a/example/crdt_example.dart b/example/crdt_example.dart index 34a56bc..15b5250 100644 --- a/example/crdt_example.dart +++ b/example/crdt_example.dart @@ -1,25 +1,22 @@ -import 'package:crdt/crdt.dart'; +import 'package:crdt/src/map_crdt/map_crdt.dart'; void main() { - var crdt = MapCrdt('node_id'); + var crdt1 = MapCrdt(['table']); + var crdt2 = MapCrdt(['table']); - // Insert a record - crdt.put('a', 1); - // Read the record - print('Record: ${crdt.get('a')}'); + print('Inserting 2 records in crdt1…'); + crdt1.put('table', 'a', 1); + crdt1.put('table', 'b', 1); - // Export the CRDT as Json - final json = crdt.toJson(); - // Send to remote node - final remoteJson = sendToRemote(json); - // Merge remote CRDT with local - crdt.mergeJson(remoteJson); - // Verify updated record - print('Record after merging: ${crdt.get('a')}'); -} + print('crdt1: ${crdt1.getMap('table')}'); + + print('\nInserting a conflicting record in crdt2…'); + crdt2.put('table', 'a', 2); + + print('crdt2: ${crdt2.getMap('table')}'); + + print('\nMerging crdt2 into crdt1…'); + crdt1.merge(crdt2.getChangeset()); -// Mock sending the CRDT to a remote node and getting an updated one back -String sendToRemote(String json) { - final hlc = Hlc.now('another_nodeId'); - return '{"a":{"hlc":"$hlc","value":2}}'; + print('crdt1: ${crdt1.getMap('table')}'); } diff --git a/lib/crdt.dart b/lib/crdt.dart index c276d38..10a9b19 100644 --- a/lib/crdt.dart +++ b/lib/crdt.dart @@ -1,7 +1,5 @@ library crdt; export 'src/crdt.dart'; -export 'src/crdt_json.dart'; export 'src/hlc.dart'; -export 'src/map_crdt.dart'; -export 'src/record.dart'; +export 'src/types.dart'; diff --git a/lib/map_crdt.dart b/lib/map_crdt.dart index 19a3efb..e417793 100644 --- a/lib/map_crdt.dart +++ b/lib/map_crdt.dart @@ -1,53 +1,5 @@ -import 'dart:async'; +library map_crdt; -import 'crdt.dart'; -import 'hlc.dart'; -import 'record.dart'; - -/// A CRDT backed by a in-memory map. -/// Useful for testing, or for applications which only require temporary datasets. -class MapCrdt extends Crdt { - final _map = >{}; - final _controller = StreamController>.broadcast(); - - @override - final String nodeId; - - MapCrdt(this.nodeId, [Map> seed = const {}]) { - _map.addAll(seed); - } - - @override - bool containsKey(K key) => _map.containsKey(key); - - @override - Record? getRecord(K key) => _map[key]; - - @override - void putRecord(K key, Record value) { - _map[key] = value; - _controller.add(MapEntry(key, value.value)); - } - - @override - void putRecords(Map> recordMap) { - _map.addAll(recordMap); - recordMap - .map((key, value) => MapEntry(key, value.value)) - .entries - .forEach(_controller.add); - } - - @override - Map> recordMap({Hlc? modifiedSince}) => - Map>.from(_map) - ..removeWhere((_, record) => - record.modified.logicalTime < (modifiedSince?.logicalTime ?? 0)); - - @override - Stream> watch({K? key}) => - _controller.stream.where((event) => key == null || key == event.key); - - @override - void purge() => _map.clear(); -} +export 'src/map_crdt/map_crdt.dart'; +export 'src/map_crdt/map_crdt_base.dart'; +export 'src/map_crdt/record.dart'; diff --git a/lib/src/crdt.dart b/lib/src/crdt.dart index 4db311d..060f90a 100644 --- a/lib/src/crdt.dart +++ b/lib/src/crdt.dart @@ -1,170 +1,102 @@ -import 'dart:math'; +import 'dart:async'; + +import 'package:meta/meta.dart'; +import 'package:uuid/uuid.dart'; -import 'crdt_json.dart'; import 'hlc.dart'; -import 'record.dart'; +import 'types.dart'; + +String generateNodeId() => Uuid().v4(); -abstract class Crdt { - /// Represents the latest logical time seen in the stored data +abstract mixin class Crdt { late Hlc _canonicalTime; + /// Represents the latest logical time seen in the stored data. Hlc get canonicalTime => _canonicalTime; - String get nodeId; - - /// Returns [true] if CRDT has any non-deleted records. - bool get isEmpty => map.isEmpty; - - /// Get size of dataset excluding deleted records. - int get length => map.length; - - /// Returns a simple key-value map without HLCs or deleted records. - /// See [recordMap]. - Map get map => - (recordMap()..removeWhere((_, record) => record.isDeleted)) - .map((key, record) => MapEntry(key, record.value)); - - List get keys => map.keys.toList(); - - List get values => map.values.toList(); - - Crdt() { - refreshCanonicalTime(); - } - - /// Gets a stored value. Returns [null] if value doesn't exist. - V? get(K key) => getRecord(key)?.value; - - /// Inserts or updates a value in the CRDT and increments the canonical time. - void put(K key, V? value) { - _canonicalTime = _canonicalTime.increment(); - final record = Record(_canonicalTime, value, _canonicalTime); - putRecord(key, record); + /// Updates the canonical time. + /// Should *never* be called from outside implementations. + @protected + set canonicalTime(Hlc value) => _canonicalTime = value; + + final _tableChangesController = + StreamController<({Hlc hlc, Iterable tables})>.broadcast(); + + /// Get this CRDT's node id + String get nodeId => canonicalTime.nodeId; + + /// Emits a list of the tables affected by changes in the database and the + /// timestamp at which they happened. + /// Useful for guaranteeing atomic merges across multiple tables. + Stream<({Hlc hlc, Iterable tables})> get onTablesChanged => + _tableChangesController.stream; + + /// Returns the last modified timestamp, optionally filtering for or against a + /// specific node id. + /// Useful to get "modified since" timestamps for synchronization. + /// Returns [Hlc.zero] if no timestamp is found. + FutureOr getLastModified({String? onlyNodeId, String? exceptNodeId}); + + /// Get a [Changeset] using the provided [changesetQueries]. + /// + /// Set the filtering parameters to to generate subsets: + /// [onlyTables] only records from the specified tables. Leave empty for all. + /// [onlyNodeId] only records set by the specified node. + /// [exceptNodeId] only records not set by the specified node. + /// [modifiedOn] records whose modified at this exact [Hlc]. + /// [modifiedAfter] records modified after the specified [Hlc]. + FutureOr getChangeset({ + Iterable? onlyTables, + String? onlyNodeId, + String? exceptNodeId, + Hlc? modifiedOn, + Hlc? modifiedAfter, + }); + + /// Checks if changeset is valid. This method is intended for implementations + /// and shouldn't generally be called from outside. + /// + /// Returns the highest hlc in the changeset or the canonical time, if higher. + @protected + Hlc validateChangeset(CrdtChangeset changeset) { + var hlc = canonicalTime; + // Iterate through all the incoming timestamps to: + // - Check for invalid entries (throws exception) + // - Update local canonical time if needed + changeset.forEach((table, records) { + for (final record in records) { + try { + hlc = hlc.merge(record['hlc'] as Hlc); + } catch (e) { + throw MergeError(e, table, record); + } + } + }); + return hlc; } - /// Inserts or updates all values in the CRDT and increments the canonical time accordingly. - void putAll(Map values) { - // Avoid touching the canonical time if no data is inserted - if (values.isEmpty) return; + /// Merge [changeset] with the local dataset. + FutureOr merge(CrdtChangeset changeset); - _canonicalTime = _canonicalTime.increment(); - final records = values.map>((key, value) => - MapEntry(key, Record(_canonicalTime, value, _canonicalTime))); - putRecords(records); - } + /// Notifies listeners and updates the canonical time. + @protected + void onDatasetChanged(Iterable affectedTables, Hlc hlc) { + assert(hlc >= canonicalTime); - /// Marks the record as deleted. - /// Note: this doesn't actually delete the record since the deletion needs to be propagated when merging with other CRDTs. - void delete(K key) => put(key, null); - - /// Checks if a record is marked as deleted - /// Returns null if record does not exist - bool? isDeleted(K key) => getRecord(key)?.isDeleted; - - /// Marks all records as deleted. - /// Note: by default this doesn't actually delete the records since the deletion needs to be propagated when merging with other CRDTs. - /// Set [purge] to true to clear the records. Useful for testing or to reset a store. - void clear({bool purge = false}) { - if (purge) { - this.purge(); - } else { - putAll(map.map((key, _) => MapEntry(key, null))); - } - } + // Bump canonical time if the new timestamp is higher + if (hlc > canonicalTime) canonicalTime = hlc; - /// Merges two CRDTs and updates record and canonical clocks accordingly. - /// See also [mergeJson()]. - void merge(Map> remoteRecords) { - final localRecords = recordMap(); - - final updatedRecords = (remoteRecords - ..removeWhere((key, value) { - _canonicalTime = _canonicalTime.merge(value.hlc); - return localRecords[key] != null && - localRecords[key]!.hlc >= value.hlc; - })) - .map((key, value) => - MapEntry(key, Record(value.hlc, value.value, _canonicalTime))); - - // Store updated records - putRecords(updatedRecords); - - // Increment canonical time - _canonicalTime = _canonicalTime.increment(); - } - - /// Merges two CRDTs and updates record and canonical clocks accordingly. - /// Use [keyDecoder] to convert non-string keys. - /// Use [valueDecoder] to convert non-native value types. - /// See also [merge()]. - void mergeJson(String json, - {KeyDecoder? keyDecoder, ValueDecoder? valueDecoder}) { - final map = CrdtJson.decode( - json, - _canonicalTime, - keyDecoder: keyDecoder, - valueDecoder: valueDecoder, - ); - merge(map); + _tableChangesController.add((hlc: hlc, tables: affectedTables)); } +} - /// Iterates through the CRDT to find the highest HLC timestamp. - /// Used to seed the Canonical Time. - /// Should be overridden if the implementation can do it more efficiently. - void refreshCanonicalTime() { - final map = recordMap(); - _canonicalTime = Hlc.fromLogicalTime( - map.isEmpty - ? 0 - : map.values.map((record) => record.hlc.logicalTime).reduce(max), - nodeId); - } +class MergeError { + final T error; + final String table; + final Map record; - /// Outputs the contents of this CRDT in Json format. - /// Use [modifiedSince] to encode only the most recently modified records. - /// Use [keyEncoder] to convert non-string keys. - /// Use [valueEncoder] to convert non-native value types. - String toJson( - {Hlc? modifiedSince, - KeyEncoder? keyEncoder, - ValueEncoder? valueEncoder}) => - CrdtJson.encode( - recordMap(modifiedSince: modifiedSince), - keyEncoder: keyEncoder, - valueEncoder: valueEncoder, - ); + MergeError(this.error, this.table, this.record); @override - String toString() => recordMap().toString(); - - //=== Abstract methods ===// - - bool containsKey(K key); - - /// Gets record containing value and HLC. - Record? getRecord(K key); - - /// Stores record without updating the HLC. - /// Meant for subclassing, clients should use [put()] instead. - /// Make sure to call [refreshCanonicalTime()] if using this method directly. - void putRecord(K key, Record value); - - /// Stores records without updating the HLC. - /// Meant for subclassing, clients should use [putAll()] instead. - /// Make sure to call [refreshCanonicalTime()] if using this method directly. - void putRecords(Map> recordMap); - - /// Retrieves CRDT map including HLCs. Useful for merging with other CRDTs. - /// Use [modifiedSince] to get only the most recently modified records. - /// See also [toJson()]. - Map> recordMap({Hlc? modifiedSince}); - - /// Watch for changes to this CRDT. - /// Use [key] to monitor a specific key. - Stream> watch({K key}); - - /// Clear all records. Records will be removed rather than being marked as deleted. - /// Useful for testing or to reset a store. - /// See also [clear]. - void purge(); + String toString() => '$error\n$table: $record'; } diff --git a/lib/src/crdt_json.dart b/lib/src/crdt_json.dart deleted file mode 100644 index 0b24fb4..0000000 --- a/lib/src/crdt_json.dart +++ /dev/null @@ -1,37 +0,0 @@ -import 'dart:convert'; - -import 'package:crdt/crdt.dart'; - -class CrdtJson { - CrdtJson._(); - - static String encode(Map> map, - {KeyEncoder? keyEncoder, ValueEncoder? valueEncoder}) => - jsonEncode( - map.map( - (key, value) => MapEntry( - keyEncoder == null ? key.toString() : keyEncoder(key), - value.toJson(key, valueEncoder: valueEncoder), - ), - ), - ); - - static Map> decode(String json, Hlc canonicalTime, - {KeyDecoder? keyDecoder, - ValueDecoder? valueDecoder, - NodeIdDecoder? nodeIdDecoder}) { - final now = Hlc.now(canonicalTime.nodeId); - final modified = canonicalTime >= now ? canonicalTime : now; - return (jsonDecode(json) as Map).map( - (key, value) => MapEntry( - keyDecoder == null ? key as K : keyDecoder(key), - Record.fromJson( - key, - value, - modified, - valueDecoder: valueDecoder, - ), - ), - ); - } -} diff --git a/lib/src/map_crdt/map_crdt.dart b/lib/src/map_crdt/map_crdt.dart new file mode 100644 index 0000000..a6f9ce3 --- /dev/null +++ b/lib/src/map_crdt/map_crdt.dart @@ -0,0 +1,31 @@ +import 'map_crdt_base.dart'; +import 'record.dart'; + +/// A CRDT backed by a simple in-memory hashmap. +/// Useful for testing, or for applications which only require small, ephemeral +/// datasets. It is incredibly inefficient. +class MapCrdt extends MapCrdtBase { + final Map> _recordMaps; + + @override + bool get isEmpty => _recordMaps.values.fold(true, (p, e) => p && e.isEmpty); + + @override + bool get isNotEmpty => !isEmpty; + + /// Instantiate a MapCrdt object with empty [tables]. + MapCrdt(Iterable tables) + : _recordMaps = {for (final table in tables.toSet()) table: {}}, + assert(tables.isNotEmpty), + super(tables); + + @override + Record? getRecord(String table, String key) => _recordMaps[table]![key]; + + @override + Map getRecords(String table) => Map.of(_recordMaps[table]!); + + @override + void putRecords(Map> dataset) => + dataset.forEach((table, records) => _recordMaps[table]!.addAll(records)); +} diff --git a/lib/src/map_crdt/map_crdt_base.dart b/lib/src/map_crdt/map_crdt_base.dart new file mode 100644 index 0000000..32d11b0 --- /dev/null +++ b/lib/src/map_crdt/map_crdt_base.dart @@ -0,0 +1,206 @@ +import 'dart:async'; + +import 'package:meta/meta.dart'; + +import '../crdt.dart'; +import '../hlc.dart'; +import '../types.dart'; +import 'record.dart'; + +/// A CRDT backed by a simple in-memory hashmap. +/// Useful for testing, or for applications which only require small, ephemeral +/// datasets. It is incredibly inefficient. +abstract class MapCrdtBase extends Crdt { + /// Names of all tables contained in this dataset. + final Set tables; + + /// Whether this dataset is empty. + bool get isEmpty; + + /// Whether this dataset has at least one record. + bool get isNotEmpty; + + MapCrdtBase(Iterable tables) : tables = tables.toSet() { + final nodeId = isEmpty + ? generateNodeId() + : tables + .map(getRecords) + .firstWhere((e) => e.isNotEmpty) + .values + .first + .modified + .nodeId; + // Seed canonical time with a node id, needed for [getLastModified] + canonicalTime = Hlc.zero(nodeId); + canonicalTime = getLastModified(); + } + + @protected + Record? getRecord(String table, String key); + + @protected + Map getRecords(String table); + + @protected + FutureOr putRecords(Map> dataset); + + /// Get a value from the local dataset. + dynamic get(String table, String key) { + if (!tables.contains(table)) throw 'Unknown table: $table'; + final value = getRecord(table, key); + return value == null || value.isDeleted ? null : value.value; + } + + /// Get a table map from the local dataset. + Map getMap(String table) { + if (!tables.contains(table)) throw 'Unknown table: $table'; + return (getRecords(table)..removeWhere((_, record) => record.isDeleted)) + .map((key, record) => MapEntry(key, record.value)); + } + + /// Insert a single value into this dataset. + /// + /// Use [putAll] if inserting multiple values to avoid incrementing the + /// canonical time unnecessarily. + // TODO Find a way to make this return [void] for sync implementations + Future put(String table, String key, dynamic value, + [bool isDeleted = false]) => + putAll({ + table: {key: value} + }, isDeleted); + + /// Insert multiple values into this dataset. + // TODO Find a way to make this return [void] for sync implementations + Future putAll(Map> dataset, + [bool isDeleted = false]) async { + // Ensure all incoming tables exist in local dataset + final badTables = dataset.keys.toSet().difference(tables); + if (badTables.isNotEmpty) { + throw 'Unknown table(s): ${badTables.join(', ')}'; + } + + // Ignore empty records + dataset.removeWhere((_, records) => records.isEmpty); + + // Generate records with incremented canonical time + final hlc = canonicalTime.increment(); + final records = dataset.map((table, values) => MapEntry( + table, + values.map((key, value) => + MapEntry(key, Record(value, isDeleted, hlc, hlc))))); + + // Store records + await putRecords(records); + onDatasetChanged(records.keys, hlc); + } + + @override + CrdtChangeset getChangeset({ + Iterable? onlyTables, + String? onlyNodeId, + String? exceptNodeId, + Hlc? modifiedOn, + Hlc? modifiedAfter, + }) { + assert(onlyNodeId == null || exceptNodeId == null); + assert(modifiedOn == null || modifiedAfter == null); + + // Modified times use the local node id + modifiedOn = modifiedOn?.apply(nodeId: nodeId); + modifiedAfter = modifiedAfter?.apply(nodeId: nodeId); + + // Ensure all incoming tables exist in local dataset + onlyTables ??= tables; + final badTables = onlyTables.toSet().difference(tables); + if (badTables.isNotEmpty) { + throw 'Unknown table(s): ${badTables.join(', ')}'; + } + + // Get records for the specified tables + final changeset = { + for (final table in onlyTables) table: getRecords(table) + }; + + // Apply remaining filters + for (final records in changeset.values) { + records.removeWhere((_, value) => + (onlyNodeId != null && value.hlc.nodeId != onlyNodeId) || + (exceptNodeId != null && value.hlc.nodeId == exceptNodeId) || + (modifiedOn != null && value.modified != modifiedOn) || + (modifiedAfter != null && value.modified <= modifiedAfter)); + } + + // Remove empty table changesets + changeset.removeWhere((_, records) => records.isEmpty); + + return changeset.map((table, records) => MapEntry( + table, + records + .map((key, record) => MapEntry(key, { + 'key': key, + ...record.toJson(), + })) + .values + .toList())); + } + + @override + Hlc getLastModified({String? onlyNodeId, String? exceptNodeId}) { + assert(onlyNodeId == null || exceptNodeId == null); + + final hlc = tables + .map((e) => getRecords(e).entries.map((e) => e.value)) + // Flatten records into single iterable + .fold([], (p, e) => p..addAll(e)) + // Apply filters + .where((e) => + (onlyNodeId == null && exceptNodeId == null) || + (onlyNodeId != null && e.hlc.nodeId == onlyNodeId) || + (exceptNodeId != null && e.hlc.nodeId != exceptNodeId)) + // Get only modified times + .map((e) => e.modified) + // Get highest time + .fold(Hlc.zero(nodeId), (p, e) => p > e ? p : e); + + return hlc; + } + + // TODO Find a way to make this return [void] for sync implementations + @override + Future merge(CrdtChangeset changeset) async { + if (changeset.recordCount == 0) return; + + // Ensure all incoming tables exist in local dataset + final badTables = changeset.keys.toSet().difference(tables); + if (badTables.isNotEmpty) { + throw 'Unknown table(s): ${badTables.join(', ')}'; + } + + // Ignore empty records + changeset.removeWhere((_, records) => records.isEmpty); + + // Validate changeset and get new canonical time + final hlc = validateChangeset(changeset); + + final newRecords = >{}; + for (final entry in changeset.entries) { + final table = entry.key; + for (final record in entry.value) { + final existing = getRecord(table, record['key'] as String); + if (existing == null || record['hlc'] as Hlc > existing.hlc) { + newRecords[table] ??= {}; + newRecords[table]![record['key'] as String] = Record( + record['value'], + record['is_deleted'] as bool, + record['hlc'] as Hlc, + hlc, + ); + } + } + } + + // Write new records + await putRecords(newRecords); + onDatasetChanged(changeset.keys, hlc); + } +} diff --git a/lib/src/map_crdt/record.dart b/lib/src/map_crdt/record.dart new file mode 100644 index 0000000..95ca8f0 --- /dev/null +++ b/lib/src/map_crdt/record.dart @@ -0,0 +1,31 @@ +import '../hlc.dart'; + +/// Stores a value associated with a given HLC +class Record { + final V? value; + final bool isDeleted; + final Hlc hlc; + final Hlc modified; + + Record(this.value, this.isDeleted, this.hlc, this.modified); + + /// Convenience method to implicitly copy the record type + Record copyWith(V? value, bool isDeleted, Hlc hlc, Hlc modified) => + Record(value, isDeleted, hlc, modified); + + Map toJson() => { + 'value': value, + 'is_deleted': isDeleted, + 'hlc': hlc, + }; + + @override + bool operator ==(other) => + other is Record && hlc == other.hlc && value == other.value; + + @override + int get hashCode => Object.hash(hlc, value, modified); + + @override + String toString() => toJson().toString(); +} diff --git a/lib/src/record.dart b/lib/src/record.dart deleted file mode 100644 index 38b03ce..0000000 --- a/lib/src/record.dart +++ /dev/null @@ -1,42 +0,0 @@ -import 'hlc.dart'; - -typedef KeyEncoder = String Function(K key); -typedef ValueEncoder = dynamic Function(K key, V? value); - -typedef KeyDecoder = K Function(String key); -typedef ValueDecoder = V Function(String key, dynamic value); - -typedef NodeIdDecoder = dynamic Function(String nodeId); - -/// Stores a value associated with a given HLC -class Record { - final Hlc hlc; - final V? value; - final Hlc modified; - - bool get isDeleted => value == null; - - Record(this.hlc, this.value, this.modified); - - Record.fromJson(dynamic key, Map map, this.modified, - {ValueDecoder? valueDecoder}) - : hlc = Hlc.parse(map['hlc']), - value = valueDecoder == null || map['value'] == null - ? map['value'] - : valueDecoder(key, map['value']); - - Map toJson(K key, {ValueEncoder? valueEncoder}) => { - 'hlc': hlc.toJson(), - 'value': valueEncoder == null ? value : valueEncoder(key, value), - }; - - @override - bool operator ==(other) => - other is Record && hlc == other.hlc && value == other.value; - - @override - int get hashCode => Object.hash(hlc, value, modified); - - @override - String toString() => toJson('').toString(); -} diff --git a/lib/src/types.dart b/lib/src/types.dart new file mode 100644 index 0000000..6e40ce1 --- /dev/null +++ b/lib/src/types.dart @@ -0,0 +1,8 @@ +typedef CrdtRecord = Map; +typedef CrdtTableChangeset = List; +typedef CrdtChangeset = Map; + +extension CrdtChangesetX on CrdtChangeset { + /// Convenience method to get number of records in a changeset + int get recordCount => values.fold(0, (prev, e) => prev + e.length); +} diff --git a/pubspec.yaml b/pubspec.yaml index 2c45f88..5c498a7 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,6 +1,6 @@ name: crdt description: Dart implementation of Conflict-free Replicated Data Types (CRDTs). -version: 4.0.3 +version: 5.0.0 homepage: https://github.com/cachapa/crdt repository: https://github.com/cachapa/crdt issue_tracker: https://github.com/cachapa/crdt/issues @@ -9,7 +9,8 @@ environment: sdk: '>=3.0.0 <4.0.0' dependencies: - # no dependencies, no problems + meta: ^1.10.0 + uuid: ^4.0.0 dev_dependencies: lints: any diff --git a/test/crdt_test.dart b/test/crdt_test.dart deleted file mode 100644 index f6e3aaf..0000000 --- a/test/crdt_test.dart +++ /dev/null @@ -1,132 +0,0 @@ -import 'package:crdt/crdt.dart'; -import 'package:test/test.dart'; - -// Make dart test happy -void main() {} - -void crdtTests>(String nodeId, - {T Function()? syncSetup, - Future Function()? asyncSetup, - void Function(T crdt)? syncTearDown, - Future Function(T crdt)? asyncTearDown}) { - group('Basic', () { - late T crdt; - - setUp(() async { - crdt = syncSetup != null ? syncSetup() : await asyncSetup!(); - }); - - test('Node ID', () { - expect(crdt.nodeId, nodeId); - }); - - test('Empty', () { - expect(crdt.isEmpty, isTrue); - expect(crdt.length, 0); - expect(crdt.map, {}); - expect(crdt.keys, []); - expect(crdt.values, []); - }); - - test('One record', () { - crdt.put('x', 1); - - expect(crdt.isEmpty, isFalse); - expect(crdt.length, 1); - expect(crdt.map, {'x': 1}); - expect(crdt.keys, ['x']); - expect(crdt.values, [1]); - }); - - test('Empty after deleted record', () { - crdt.put('x', 1); - crdt.delete('x'); - - expect(crdt.isEmpty, isTrue); - expect(crdt.length, 0); - expect(crdt.map, {}); - expect(crdt.keys, []); - expect(crdt.values, []); - }); - - test('Put', () { - crdt.put('x', 1); - expect(crdt.get('x'), 1); - }); - - test('Update existing', () { - crdt.put('x', 1); - crdt.put('x', 2); - expect(crdt.get('x'), 2); - }); - - test('Put many', () { - crdt.putAll({'x': 2, 'y': 3}); - expect(crdt.get('x'), 2); - expect(crdt.get('y'), 3); - }); - - test('Delete value', () { - crdt.put('x', 1); - crdt.put('y', 2); - crdt.delete('x'); - expect(crdt.isDeleted('x'), isTrue); - expect(crdt.isDeleted('y'), isFalse); - expect(crdt.get('x'), null); - expect(crdt.get('y'), 2); - }); - - test('Clear', () { - crdt.put('x', 1); - crdt.put('y', 2); - crdt.clear(); - expect(crdt.isDeleted('x'), isTrue); - expect(crdt.isDeleted('y'), isTrue); - expect(crdt.get('x'), null); - expect(crdt.get('y'), null); - }); - - tearDown(() async { - if (syncTearDown != null) syncTearDown(crdt); - if (asyncTearDown != null) await asyncTearDown(crdt); - }); - }); - - group('Watch', () { - late T crdt; - - setUp(() async { - crdt = syncSetup != null ? syncSetup() : await asyncSetup!(); - }); - - test('All changes', () async { - final streamTest = expectLater( - crdt.watch(), - emitsInAnyOrder([ - (MapEntry event) => - event.key == 'x' && event.value == 1, - (MapEntry event) => - event.key == 'y' && event.value == 2, - ])); - crdt.put('x', 1); - crdt.put('y', 2); - await streamTest; - }); - - test('Key', () async { - final streamTest = expectLater( - crdt.watch(key: 'y'), - emits( - (event) => event.key == 'y' && event.value == 2, - )); - crdt.put('x', 1); - crdt.put('y', 2); - await streamTest; - }); - - tearDown(() async { - if (syncTearDown != null) syncTearDown(crdt); - if (asyncTearDown != null) await asyncTearDown(crdt); - }); - }); -} diff --git a/test/map_crdt_test.dart b/test/map_crdt_test.dart index 2afdb1f..25d3465 100644 --- a/test/map_crdt_test.dart +++ b/test/map_crdt_test.dart @@ -1,282 +1,371 @@ -import 'dart:io'; +import 'dart:async'; import 'package:crdt/crdt.dart'; +import 'package:crdt/map_crdt.dart'; import 'package:test/test.dart'; -import 'crdt_test.dart'; +Future get _delay => Future.delayed(Duration(milliseconds: 1)); -const _millis = 1000000000000; -const _isoTime = '2001-09-09T01:46:40.000Z'; +typedef TestCrdt = MapCrdt; -void main() { - final hlcNow = Hlc.now('abc'); +Future createCrdt(String collection, String table1, + [String? table2]) async => + MapCrdt([table1, table2].nonNulls); - crdtTests>('abc', syncSetup: () => MapCrdt('abc')); +Future deleteCrdt(TestCrdt crdt) async {} - group('Seed', () { - late Crdt crdt; +void main() { + late TestCrdt crdt; - setUp(() { - crdt = MapCrdt('abc', {'x': Record(hlcNow, 1, hlcNow)}); + group('Empty', () { + setUp(() async { + crdt = await createCrdt('crdt', 'table'); }); - test('Seed item', () { - expect(crdt.get('x'), 1); + tearDown(() async { + await deleteCrdt(crdt); }); - test('Seed and put', () { - crdt.put('x', 2); - expect(crdt.get('x'), 2); + test('Node ID', () { + print(crdt.isEmpty); + // expect(Uuid.isValidUUID(fromString: crdt.nodeId), true); }); - }); - group('Merge', () { - late Crdt crdt; + test('Empty', () { + expect(crdt.canonicalTime, Hlc.zero(crdt.nodeId)); + expect(crdt.isEmpty, true); + }); + }); - setUp(() { - crdt = MapCrdt('abc'); + group('Insert', () { + setUp(() async { + crdt = await createCrdt('crdt', 'table'); }); - test('Merge older', () { - crdt.put('x', 2); - crdt.merge({'x': Record(Hlc(_millis - 1, 0, 'xyz'), 1, hlcNow)}); - expect(crdt.get('x'), 2); + tearDown(() async { + await deleteCrdt(crdt); }); - test('Merge very old', () { - crdt.put('x', 2); - crdt.merge({'x': Record(Hlc(0, 0, 'xyz'), 1, hlcNow)}); - expect(crdt.get('x'), 2); + test('Single', () async { + await crdt.put('table', 'x', 1); + expect(crdt.isEmpty, false); + expect(crdt.getChangeset().recordCount, 1); + expect(crdt.get('table', 'x'), 1); }); - test('Merge newer', () async { - crdt.put('x', 1); - await Future.delayed(Duration(milliseconds: 1)); - crdt.merge({'x': Record(Hlc.now('xyz'), 2, hlcNow)}); - expect(crdt.get('x'), 2); + test('Null', () async { + await crdt.put('table', 'x', null); + expect(crdt.getChangeset().recordCount, 1); + expect(crdt.get('table', 'x'), null); }); - test('Disambiguate using node id', () { - crdt.merge({'x': Record(Hlc(_millis, 0, 'nodeA'), 1, hlcNow)}); - crdt.merge({'x': Record(Hlc(_millis, 0, 'nodeB'), 2, hlcNow)}); - expect(crdt.get('x'), 2); + test('Update', () async { + await crdt.put('table', 'x', 1); + await crdt.put('table', 'x', 2); + expect(crdt.getChangeset().recordCount, 1); + expect(crdt.get('table', 'x'), 2); }); - test('Merge same', () { - crdt.put('x', 2); - final remoteTs = crdt.getRecord('x')!.hlc; - crdt.merge({'x': Record(remoteTs, 1, hlcNow)}); - expect(crdt.get('x'), 2); + test('Multiple', () async { + await crdt.putAll({ + 'table': {'x': 1, 'y': 2} + }); + expect(crdt.getChangeset().recordCount, 2); + expect(crdt.getMap('table'), {'x': 1, 'y': 2}); }); - test('Merge older, newer counter', () { - crdt.put('x', 2); - crdt.merge({'x': Record(Hlc(_millis - 1, 2, 'xyz'), 1, hlcNow)}); - expect(crdt.get('x'), 2); + test('Enforce table existence', () { + expect(() async => await crdt.put('not_test', 'x', 1), + throwsA('Unknown table(s): not_test')); }); + }); - test('Merge same, newer counter', () { - crdt.put('x', 1); - final remoteTs = Hlc(crdt.getRecord('x')!.hlc.millis, 2, 'xyz'); - crdt.merge({'x': Record(remoteTs, 2, hlcNow)}); - expect(crdt.get('x'), 2); + group('Delete', () { + setUp(() async { + crdt = await createCrdt('crdt', 'table'); + await crdt.put('table', 'x', 1); }); - test('Merge new item', () { - final map = {'x': Record(Hlc.now('xyz'), 2, hlcNow)}; - crdt.merge(map); - expect(crdt.recordMap(), map); + tearDown(() async { + await deleteCrdt(crdt); }); - test('Merge deleted item', () async { - crdt.put('x', 1); - await Future.delayed(Duration(milliseconds: 1)); - crdt.merge({'x': Record(Hlc.now('xyz'), null, hlcNow)}); - expect(crdt.isDeleted('x'), isTrue); + test('Set deleted', () async { + await crdt.put('table', 'x', 1, true); + expect(crdt.isEmpty, false); + expect(crdt.getChangeset().recordCount, 1); + expect(crdt.getMap('table').length, 0); + expect(crdt.get('table', 'x'), null); }); - test('Update HLC on merge', () { - crdt.put('x', 1); - crdt.merge({'y': Record(Hlc(_millis - 1, 0, 'xyz'), 2, hlcNow)}); - expect(crdt.values, [1, 2]); + test('Undelete', () async { + await crdt.put('table', 'x', 1, true); + await crdt.put('table', 'x', 1, false); + expect(crdt.isEmpty, false); + expect(crdt.getChangeset().recordCount, 1); + expect(crdt.getMap('table').length, 1); + expect(crdt.get('table', 'x'), 1); }); }); - group('Serialization', () { - test('To map', () { - final crdt = MapCrdt('abc', { - 'x': Record(Hlc(_millis, 0, 'abc'), 1, hlcNow), - }); - expect(crdt.recordMap(), - {'x': Record(Hlc(_millis, 0, 'abc'), 1, hlcNow)}); + group('Merge', () { + late TestCrdt crdt1; + + setUp(() async { + crdt = await createCrdt('crdt', 'table'); + crdt1 = await createCrdt('crdt', 'table'); }); - test('jsonEncodeStringKey', () { - final crdt = MapCrdt('abc', { - 'x': Record(Hlc(_millis, 0, 'abc'), 1, hlcNow), - }); - expect(crdt.toJson(), '{"x":{"hlc":"$_isoTime-0000-abc","value":1}}'); + tearDown(() async { + await deleteCrdt(crdt); + await deleteCrdt(crdt1); }); - test('jsonEncodeIntKey', () { - final crdt = MapCrdt('abc', { - 1: Record(Hlc(_millis, 0, 'abc'), 1, hlcNow), - }); - expect(crdt.toJson(), '{"1":{"hlc":"$_isoTime-0000-abc","value":1}}'); + test('Into empty', () async { + await crdt1.put('table', 'x', 2); + await crdt.merge(crdt1.getChangeset()); + expect(crdt.get('table', 'x'), 2); }); - test('jsonEncodeDateTimeKey', () { - final crdt = MapCrdt('abc', { - DateTime(2000, 01, 01, 01, 20): - Record(Hlc(_millis, 0, 'abc'), 1, hlcNow), - }); - expect(crdt.toJson(), - '{"2000-01-01 01:20:00.000":{"hlc":"$_isoTime-0000-abc","value":1}}'); + test('Empty changeset', () async { + await crdt1.put('table', 'x', 2); + await crdt.merge(crdt1.getChangeset()); + expect(crdt.get('table', 'x'), 2); + }); + + test('Older', () async { + await crdt1.put('table', 'x', 2); + await _delay; + await crdt.put('table', 'x', 1); + await crdt.merge(crdt1.getChangeset()); + expect(crdt.get('table', 'x'), 1); }); - test('jsonEncodeCustomClassValue', () { - final crdt = MapCrdt('abc', { - 'x': Record(Hlc(_millis, 0, 'abc'), TestClass('test'), hlcNow), + test('Newer', () async { + await crdt.put('table', 'x', 1); + await _delay; + await crdt1.put('table', 'x', 2); + await crdt.merge(crdt1.getChangeset()); + expect(crdt.get('table', 'x'), 2); + }); + + test('Lower node id', () async { + await crdt.put('table', 'x', 1); + final changeset = crdt.getChangeset(); + changeset['table']!.first.addAll({ + 'hlc': (changeset['table']!.first['hlc'] as Hlc) + .apply(nodeId: '00000000-0000-0000-0000-000000000000'), + 'value': 2, }); - expect(crdt.toJson(), - '{"x":{"hlc":"$_isoTime-0000-abc","value":{"test":"test"}}}'); - }); - - test('jsonDecodeStringKey', () { - final crdt = MapCrdt('abc'); - final map = CrdtJson.decode( - '{"x":{"hlc":"$_isoTime-0000-abc","value":1}}', hlcNow); - crdt.putRecords(map); - expect(crdt.recordMap(), - {'x': Record(Hlc(_millis, 0, 'abc'), 1, hlcNow)}); - }); - - test('jsonDecodeIntKey', () { - final crdt = MapCrdt('abc'); - final map = CrdtJson.decode( - '{"1":{"hlc":"$_isoTime-0000-abc","value":1}}', hlcNow, - keyDecoder: (key) => int.parse(key)); - crdt.putRecords(map); - expect(crdt.recordMap(), {1: Record(Hlc(_millis, 0, 'abc'), 1, hlcNow)}); - }); - - test('jsonDecodeDateTimeKey', () { - final crdt = MapCrdt('abc'); - final map = CrdtJson.decode( - '{"2000-01-01 01:20:00.000":{"hlc":"$_isoTime-0000-abc","value":1}}', - hlcNow, - keyDecoder: (key) => DateTime.parse(key)); - crdt.putRecords(map); - expect(crdt.recordMap(), { - DateTime(2000, 01, 01, 01, 20): - Record(Hlc(_millis, 0, 'abc'), 1, hlcNow) + await crdt.merge(changeset); + expect(crdt.get('table', 'x'), 1); + }); + + test('Higher node id', () async { + await crdt.put('table', 'x', 1); + final changeset = crdt.getChangeset(); + changeset['table']!.first.addAll({ + 'hlc': (changeset['table']!.first['hlc'] as Hlc) + .apply(nodeId: 'ffffffff-ffff-ffff-ffff-ffffffffffff'), + 'value': 2, }); + await crdt.merge(changeset); + expect(crdt.get('table', 'x'), 2); + }); + + test('Enforce table existence', () async { + final other = await createCrdt('other', 'not_table'); + await other.put('not_table', 'x', 1); + expect(() => crdt.merge(other.getChangeset()), + throwsA('Unknown table(s): not_table')); + await deleteCrdt(other); }); - test('jsonDecodeCustomClassValue', () { - final crdt = MapCrdt('abc'); - final map = CrdtJson.decode( - '{"x":{"hlc":"$_isoTime-0000-abc","value":{"test":"test"}}}', hlcNow, - valueDecoder: (key, value) => TestClass.fromJson(value)); - crdt.putRecords(map); - expect(crdt.recordMap(), - {'x': Record(Hlc(_millis, 0, 'abc'), TestClass('test'), hlcNow)}); + test('Update canonical time after merge', () async { + await crdt1.put('table', 'x', 2); + await crdt.merge(crdt1.getChangeset()); + expect( + crdt.canonicalTime, crdt1.canonicalTime.apply(nodeId: crdt.nodeId)); }); }); - group('Delta subsets', () { - late Crdt crdt; - final hlc1 = Hlc(_millis, 0, 'abc'); - final hlc2 = Hlc(_millis + 1, 0, 'abc'); - final hlc3 = Hlc(_millis + 2, 0, 'abc'); + group('Changesets', () { + late TestCrdt crdt1; + late TestCrdt crdt2; - setUp(() { - crdt = MapCrdt('abc', { - 'x': Record(hlc1, 1, hlc1), - 'y': Record(hlc2, 2, hlc2), - }); + setUp(() async { + crdt = await createCrdt('crdt', 'table'); + crdt1 = await createCrdt('crdt1', 'table'); + crdt2 = await createCrdt('crdt2', 'table'); + + await crdt.put('table', 'x', 1); + await _delay; + await crdt1.put('table', 'y', 1); + await _delay; + await crdt2.put('table', 'z', 1); + + await crdt.merge(crdt1.getChangeset()); + await crdt.merge(crdt2.getChangeset()); }); - test('null modifiedSince', () { - final map = crdt.recordMap(); - expect(map.length, 2); + tearDown(() async { + await deleteCrdt(crdt); + await deleteCrdt(crdt1); + await deleteCrdt(crdt2); }); - test('modifiedSince hlc1', () { - final map = crdt.recordMap(modifiedSince: hlc1); - expect(map.length, 2); + test('Tables', () async { + final crdt3 = await createCrdt('table', 'another_table'); + await crdt3.put('another_table', 'a', 1); + final changeset = crdt3.getChangeset(onlyTables: ['another_table']); + expect(changeset.keys, ['another_table']); + await deleteCrdt(crdt3); }); - test('modifiedSince hlc2', () { - final map = crdt.recordMap(modifiedSince: hlc2); - expect(map.length, 1); + test('After HLC', () { + print(crdt1.canonicalTime); + expect(crdt.getChangeset(modifiedAfter: crdt1.canonicalTime), + crdt2.getChangeset()); }); - test('modifiedSince hlc3', () { - final map = crdt.recordMap(modifiedSince: hlc3); - expect(map.length, 0); + test('Empty changeset', () { + print(crdt2.canonicalTime); + expect(crdt.getChangeset(modifiedAfter: crdt2.canonicalTime), {}); + }); + + test('At HLC', () { + final changeset = crdt.getChangeset(modifiedOn: crdt1.canonicalTime); + expect(changeset, crdt1.getChangeset()); + }); + + test('Only node id', () { + final changeset = crdt.getChangeset(onlyNodeId: crdt1.nodeId); + expect(changeset, crdt1.getChangeset()); + }); + + test('Except node id', () { + final originalChangeset = crdt1.getChangeset(); + crdt1.merge(crdt2.getChangeset()); + final changeset = crdt1.getChangeset(exceptNodeId: crdt2.nodeId); + expect(changeset, originalChangeset); }); }); - group('Delta sync', () { - late Crdt crdtA; - late Crdt crdtB; - late Crdt crdtC; + group('Last modified', () { + late TestCrdt crdt1; + late TestCrdt crdt2; - setUp(() { - crdtA = MapCrdt('a'); - crdtB = MapCrdt('b'); - crdtC = MapCrdt('c'); + setUp(() async { + crdt = await createCrdt('crdt', 'table'); + crdt1 = await createCrdt('crdt1', 'table'); + crdt2 = await createCrdt('crdt2', 'table'); - crdtA.put('x', 1); - sleep(Duration(milliseconds: 100)); - crdtB.put('x', 2); + await crdt.put('table', 'x', 1); + await _delay; + await crdt1.put('table', 'y', 1); + await _delay; + await crdt2.put('table', 'z', 1); + + await crdt.merge(crdt1.getChangeset()); + await crdt.merge(crdt2.getChangeset()); + }); + + tearDown(() async { + await deleteCrdt(crdt); + await deleteCrdt(crdt1); + await deleteCrdt(crdt2); }); - test('Merge in order', () { - _sync(crdtA, crdtC); - _sync(crdtB, crdtC); + test('Everything', () { + expect(crdt.getLastModified(), + crdt2.canonicalTime.apply(nodeId: crdt.nodeId)); + }); - expect(crdtA.get('x'), 1); // node A still contains the old value - expect(crdtB.get('x'), 2); - expect(crdtC.get('x'), 2); + test('Only node id', () { + expect(crdt.getLastModified(onlyNodeId: crdt1.nodeId), + crdt1.canonicalTime.apply(nodeId: crdt.nodeId)); }); - test('Merge in reverse order', () { - _sync(crdtB, crdtC); - _sync(crdtA, crdtC); - _sync(crdtB, crdtC); + test('Except node id', () async { + // Move canonical time forward in crdt + await _delay; + await crdt.put('table', 'a', 1); + expect(crdt.getLastModified(exceptNodeId: crdt.nodeId), + crdt2.canonicalTime.apply(nodeId: crdt.nodeId)); + }); - expect(crdtA.get('x'), 2); - expect(crdtB.get('x'), 2); - expect(crdtC.get('x'), 2); + test('Assert exclusive parameters', () { + expect( + () => crdt.getLastModified( + onlyNodeId: crdt.nodeId, exceptNodeId: crdt.nodeId), + throwsA(isA())); }); }); -} -void _sync(Crdt local, Crdt remote) { - final time = local.canonicalTime; - final l = local.recordMap(); - remote.merge(l); - final r = remote.recordMap(modifiedSince: time); - local.merge(r); -} + group('Tables changed stream', () { + setUp(() async { + crdt = await createCrdt('crdt', 'table_1', 'table_2'); + }); -class TestClass { - final String test; + tearDown(() async { + await deleteCrdt(crdt); + }); - TestClass(this.test); + test('Single change', () { + expectLater( + crdt.onTablesChanged.map((e) => e.tables), emits(['table_1'])); + crdt.put('table_1', 'x', 1); + }); - static TestClass fromJson(dynamic map) => TestClass(map['test']); + test('Multiple changes to same table', () { + expectLater( + crdt.onTablesChanged.map((e) => e.tables), emits(['table_1'])); + crdt.putAll({ + 'table_1': { + 'x': 1, + 'y': 2, + } + }); + }); - Map toJson() => {'test': test}; + test('Multiple tables', () { + expectLater(crdt.onTablesChanged.map((e) => e.tables), + emits(['table_1', 'table_2'])); + crdt.putAll({ + 'table_1': {'x': 1}, + 'table_2': {'y': 2}, + }); + }); - @override - bool operator ==(other) => other is TestClass && test == other.test; + test('Do not notify empty changes', () { + expectLater( + crdt.onTablesChanged.map((e) => e.tables), emits(['table_1'])); + crdt.putAll({ + 'table_1': {'x': 1}, + 'table_2': {}, + }); + }); - @override - int get hashCode => test.hashCode; + test('Merge', () async { + final crdt1 = await createCrdt('crdt1', 'table_1', 'table_2'); + await crdt1.put('table_1', 'x', 1); + // ignore: unawaited_futures + expectLater( + crdt.onTablesChanged.map((e) => e.tables), emits(['table_1'])); + await crdt.merge(crdt1.getChangeset()); + await deleteCrdt(crdt1); + }); + }); - @override - String toString() => test; + // group('Watch', () { + // setUp(() { + // crdt = MapCrdt(nodeId: 'crdt')..createTable('table'); + // }); + // + // test('Single change', () { + // expectLater( + // crdt.watch.map((e) => e.tables), emits(['table_1'])); + // crdt.put('table_1', 'x', 1); + // }); + // }); }