Skip to content
This repository has been archived by the owner on Jun 11, 2021. It is now read-only.

Commit

Permalink
[cloud_firestore][wip] migrate to present
Browse files Browse the repository at this point in the history
  • Loading branch information
long1eu committed Jan 24, 2021
1 parent 1d6f449 commit ed5b632
Show file tree
Hide file tree
Showing 220 changed files with 9,536 additions and 6,367 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ abstract class CredentialsProvider {
/// Requests token for the current user. Use [invalidateToken] to
/// force-refresh the token. Returns future that will be completed with the
/// current token.
// todo: make this a method
Future<String> get token;

/// Marks the last retrieved token as invalid, making the next [token] request
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,15 @@ import 'dart:async';
import 'package:_firebase_internal_vm/_firebase_internal_vm.dart';
import 'package:cloud_firestore_vm/src/firebase/firestore/auth/credentials_provider.dart';
import 'package:cloud_firestore_vm/src/firebase/firestore/auth/user.dart';
import 'package:cloud_firestore_vm/src/firebase/firestore/firestore_error.dart';
import 'package:firebase_core_vm/firebase_core_vm.dart';
import 'package:rxdart/rxdart.dart';

/// [FirebaseAuthCredentialsProvider] uses Firebase Auth via [FirebaseApp] to
/// get an auth token.
class FirebaseAuthCredentialsProvider extends CredentialsProvider {
FirebaseAuthCredentialsProvider(this.authProvider)
: _onUserChange = BehaviorSubject<User>.seeded(authProvider.uid != null
? User(authProvider.uid)
: User.unauthenticated);
: _onUserChange =
BehaviorSubject<User>.seeded(authProvider.uid != null ? User(authProvider.uid) : User.unauthenticated);

/// Stream that will receive credential changes (sign-in / sign-out, token
/// changes).
Expand Down Expand Up @@ -47,15 +45,14 @@ class FirebaseAuthCredentialsProvider extends CredentialsProvider {
// the request is outstanding.
final int savedCounter = _tokenCounter;

final GetTokenResult result =
await authProvider.getAccessToken(forceRefresh: doForceRefresh);
final GetTokenResult result = await authProvider.getAccessToken(forceRefresh: doForceRefresh);

// Cancel the request since the token changed while the request was
// outstanding so the response is potentially for a previous user (which
// user, we can't be sure).
if (savedCounter != _tokenCounter) {
throw FirestoreError('getToken aborted due to token change',
FirestoreErrorCode.aborted);
Log.d('FirebaseAuthCredentialsProvider', 'getToken aborted due to token change');
return token;
}

return result?.token;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import 'dart:typed_data';
import 'package:cloud_firestore_vm/src/firebase/firestore/util/util.dart';
import 'package:collection/collection.dart';

/// Immutable class representing an array of bytes in Cloud Firestore.
class Blob implements Comparable<Blob> {
Blob(Uint8List bytes) : bytes = Uint8List.fromList(bytes);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,29 @@ import 'package:_firebase_internal_vm/_firebase_internal_vm.dart';
import 'package:cloud_firestore_vm/src/firebase/firestore/model/document.dart';
import 'package:cloud_firestore_vm/src/firebase/firestore/model/document_key.dart';
import 'package:cloud_firestore_vm/src/firebase/firestore/model/field_path.dart';
import 'package:cloud_firestore_vm/src/firebase/firestore/model/value/field_value.dart';
import 'package:cloud_firestore_vm/src/firebase/firestore/model/values.dart';
import 'package:cloud_firestore_vm/src/firebase/firestore/util/assert.dart';
import 'package:cloud_firestore_vm/src/proto/google/firestore/v1/index.dart' show Value;
import 'package:collection/collection.dart';

import 'order_by.dart';

/// Represents a bound of a query.
///
/// The bound is specified with the given components representing a position and
/// whether it's just before or just after the position (relative to whatever
/// the query order is). The position represents a logical index position for a
/// query. It's a prefix of values for the (potentially implicit) order by
/// clauses of a query. Bound provides a function to determine whether a
/// document comes before or after a bound. This is influenced by whether the
/// position is just before or just after the provided values.
/// The bound is specified with the given components representing a position and whether it's just before or just after
/// the position (relative to whatever the query order is).
///
/// The position represents a logical index position for a query. It's a prefix of values for the (potentially implicit)
/// order by clauses of a query. Bound provides a function to determine whether a document comes before or after a
/// bound. This is influenced by whether the position is just before or just after the provided values.
class Bound {
const Bound({this.position, this.before});

/// Whether this bound is just before or just after the provided position
final bool before;

/// The index position of this bound
final List<FieldValue> position;
final List<Value> position;

String canonicalString() {
// TODO(long1eu): Make this collision robust.
Expand All @@ -38,31 +38,33 @@ class Bound {
} else {
builder.write('a:');
}
position.forEach(builder.write);
bool first = true;
for (Value indexComponent in position) {
if (!first) {
builder.write(',');
}
first = false;
builder.write(canonicalId(indexComponent));
}
return builder.toString();
}

/// Returns true if a document sorts before a bound using the provided sort
/// order.
bool sortsBeforeDocument(List<OrderBy> orderBy, Document document) {
hardAssert(position.length <= orderBy.length,
'Bound has more components than query\'s orderBy');
hardAssert(position.length <= orderBy.length, 'Bound has more components than query\'s orderBy');
int comparison = 0;
for (int i = 0; i < position.length; i++) {
final OrderBy orderByComponent = orderBy[i];
final FieldValue component = position[i];
final Value component = position[i];
if (orderByComponent.field == FieldPath.keyPath) {
final Object refValue = component.value;
hardAssert(refValue is DocumentKey,
'Bound has a non-key value where the key path is being used $component');

final DocumentKey documentKey = refValue;
comparison = documentKey.compareTo(document.key);
hardAssert(
isReferenceValue(component), 'Bound has a non-key value where the key path is being used $component');
comparison = DocumentKey.fromName(component.referenceValue).compareTo(document.key);
} else {
final FieldValue docValue = document.getField(orderByComponent.field);
hardAssert(docValue != null,
'Field should exist since document matched the orderBy already.');
comparison = component.compareTo(docValue);
final Value docValue = document.getField(orderByComponent.field);
hardAssert(docValue != null, 'Field should exist since document matched the orderBy already.');
comparison = compare(component, docValue);
}

if (orderByComponent.direction == OrderByDirection.descending) {
Expand All @@ -83,11 +85,10 @@ class Bound {
other is Bound &&
runtimeType == other.runtimeType &&
before == other.before &&
const ListEquality<FieldValue>().equals(position, other.position);
const ListEquality<Value>().equals(position, other.position);

@override
int get hashCode =>
before.hashCode ^ const ListEquality<FieldValue>().hash(position);
int get hashCode => before.hashCode ^ const ListEquality<Value>().hash(position);

@override
String toString() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// File created by
// Lung Razvan <long1eu>
// on 16/01/2021

import 'package:cloud_firestore_vm/src/firebase/firestore/auth/user.dart';
import 'package:cloud_firestore_vm/src/firebase/firestore/core/database_info.dart';
import 'package:cloud_firestore_vm/src/firebase/firestore/core/event_manager.dart';
import 'package:cloud_firestore_vm/src/firebase/firestore/core/sync_engine.dart';
import 'package:cloud_firestore_vm/src/firebase/firestore/firestore_settings.dart';
import 'package:cloud_firestore_vm/src/firebase/firestore/local/garbage_collection_scheduler.dart';
import 'package:cloud_firestore_vm/src/firebase/firestore/local/local_store.dart';
import 'package:cloud_firestore_vm/src/firebase/firestore/local/persistence/persistence.dart';
import 'package:cloud_firestore_vm/src/firebase/firestore/remote/datastore.dart';
import 'package:cloud_firestore_vm/src/firebase/firestore/remote/remote_store.dart';
import 'package:cloud_firestore_vm/src/firebase/firestore/util/async_task.dart';
import 'package:cloud_firestore_vm/src/firebase/firestore/util/database.dart';
import 'package:rxdart/rxdart.dart';

// ignore_for_file: close_sinks

/// Initializes and wires up all core components for Firestore.
///
/// Implementations provide custom components by overriding the `createX()` methods.
abstract class ComponentProvider {
Persistence _persistence;
LocalStore _localStore;
SyncEngine _syncEngine;
RemoteStore _remoteStore;
EventManager _eventManager;
BehaviorSubject<bool> _onNetworkConnected;
GarbageCollectionScheduler _gargabeCollectionScheduler;

Persistence get persistence => _persistence;

LocalStore get localStore => _localStore;

SyncEngine get syncEngine => _syncEngine;

RemoteStore get remoteStore => _remoteStore;

EventManager get eventManager => _eventManager;

BehaviorSubject<bool> get onNetworkConnected => _onNetworkConnected;

GarbageCollectionScheduler get gargabeCollectionScheduler => _gargabeCollectionScheduler;

Future<void> initialize(ComponentProviderConfiguration configuration) async {
_persistence = await createPersistence(configuration);
await persistence.start();
_localStore = createLocalStore(configuration);
_onNetworkConnected = configuration.onNetworkConnected;
_remoteStore = createRemoteStore(configuration);
_syncEngine = createSyncEngine(configuration);
_eventManager = createEventManager(configuration);
await localStore.start();
await remoteStore.start();
_gargabeCollectionScheduler = createGarbageCollectionScheduler(configuration);
}

GarbageCollectionScheduler createGarbageCollectionScheduler(ComponentProviderConfiguration configuration);

EventManager createEventManager(ComponentProviderConfiguration configuration);

LocalStore createLocalStore(ComponentProviderConfiguration configuration);

Future<Persistence> createPersistence(ComponentProviderConfiguration configuration);

RemoteStore createRemoteStore(ComponentProviderConfiguration configuration);

SyncEngine createSyncEngine(ComponentProviderConfiguration configuration);
}

class ComponentProviderConfiguration {
ComponentProviderConfiguration({
this.asyncQueue,
this.databaseInfo,
this.datastore,
this.initialUser,
this.maxConcurrentLimboResolutions,
this.settings,
this.onNetworkConnected,
this.openDatabase,
});

final AsyncQueue asyncQueue;
final DatabaseInfo databaseInfo;
final Datastore datastore;
final User initialUser;
final int maxConcurrentLimboResolutions;
final FirestoreSettings settings;
final BehaviorSubject<bool> onNetworkConnected;
final OpenDatabase openDatabase;
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,32 @@ import 'package:cloud_firestore_vm/src/firebase/firestore/core/query.dart';
import 'package:cloud_firestore_vm/src/firebase/firestore/core/query_stream.dart';
import 'package:cloud_firestore_vm/src/firebase/firestore/core/sync_engine.dart';
import 'package:cloud_firestore_vm/src/firebase/firestore/core/view_snapshot.dart';
import 'package:cloud_firestore_vm/src/firebase/firestore/util/assert.dart';
import 'package:cloud_firestore_vm/src/firebase/firestore/util/util.dart';
import 'package:grpc/grpc.dart';
import 'package:rxdart/rxdart.dart';

/// EventManager is responsible for mapping queries to query event listeners.
/// It handles 'fan-out.' (Identical queries will re-use the same watch on the
/// backend.)
class EventManager implements SyncEngineCallback {
EventManager(this._syncEngine) : _queries = <Query, _QueryListenersInfo>{} {
EventManager(this._syncEngine)
: _queries = <Query, _QueryListenersInfo>{},
_controller = BehaviorSubject<void>.seeded(null) {
_syncEngine.syncEngineListener = this;
}

final SyncEngine _syncEngine;
final Map<Query, _QueryListenersInfo> _queries;

// We user BehaviorSubject because it emits the last value received
final BehaviorSubject<void> _controller;

OnlineState _onlineState = OnlineState.unknown;

/// Global snapshots stream
Stream<void> get snapshotsInSync => _controller;

/// Adds a query listener that will be called with new snapshots for the
/// query. The [EventManager] is responsible for multiplexing many listeners
/// to a single listen in the [SyncEngine] and will perform a listen if it's
Expand All @@ -44,10 +54,15 @@ class EventManager implements SyncEngineCallback {

queryInfo.listeners.add(queryListener);

queryListener.onOnlineStateChanged(_onlineState);
// Run global snapshot listeners if a consistent snapshot has been emitted.
bool raisedEvent = queryListener.onOnlineStateChanged(_onlineState);
hardAssert(!raisedEvent, "onOnlineStateChanged() shouldn't raise an event for brand-new listeners.");

if (queryInfo.viewSnapshot != null) {
await queryListener.onViewSnapshot(queryInfo.viewSnapshot);
raisedEvent = queryListener.onViewSnapshot(queryInfo.viewSnapshot);
if (raisedEvent) {
_controller.add(null);
}
}

if (firstListen) {
Expand All @@ -56,38 +71,40 @@ class EventManager implements SyncEngineCallback {
return queryInfo.targetId;
}

/// Removes a previously added listener and returns true if the listener was
/// found.
Future<bool> removeQueryListener(QueryStream listener) async {
/// It's a no-op if the listener is not found.
Future<void> removeQueryListener(QueryStream listener) async {
final Query query = listener.query;
final _QueryListenersInfo queryInfo = _queries[query];
bool lastListen = false;
bool found = false;
if (queryInfo != null) {
found = queryInfo.listeners.remove(listener);
queryInfo.listeners.remove(listener);
lastListen = queryInfo.listeners.isEmpty;
}

if (lastListen) {
_queries.remove(query);
await _syncEngine.stopListening(query);
}

return found;
}

@override
Future<void> onViewSnapshots(List<ViewSnapshot> snapshotList) async {
bool raisedEvent = false;
for (ViewSnapshot viewSnapshot in snapshotList) {
final Query query = viewSnapshot.query;
final _QueryListenersInfo info = _queries[query];
if (info != null) {
for (QueryStream listener in info.listeners.toList()) {
await listener.onViewSnapshot(viewSnapshot);
if (listener.onViewSnapshot(viewSnapshot)) {
raisedEvent = true;
}
}
info.viewSnapshot = viewSnapshot;
}
}
if (raisedEvent) {
_controller.add(null);
}
}

@override
Expand All @@ -103,12 +120,18 @@ class EventManager implements SyncEngineCallback {

@override
void handleOnlineStateChange(OnlineState onlineState) {
bool raisedEvent = false;
_onlineState = onlineState;
for (_QueryListenersInfo info in _queries.values) {
for (QueryStream listener in info.listeners.toList()) {
listener.onOnlineStateChanged(onlineState);
if (listener.onOnlineStateChanged(onlineState)) {
raisedEvent = true;
}
}
}
if (raisedEvent) {
_controller.add(null);
}
}
}

Expand All @@ -126,7 +149,8 @@ class ListenOptions {
this.includeDocumentMetadataChanges = false,
this.includeQueryMetadataChanges = false,
this.waitForSyncWhenOnline = false,
}) : assert(includeDocumentMetadataChanges != null),
})
: assert(includeDocumentMetadataChanges != null),
assert(includeQueryMetadataChanges != null),
assert(waitForSyncWhenOnline != null);

Expand All @@ -151,12 +175,9 @@ class ListenOptions {
bool waitForSyncWhenOnline,
}) {
return ListenOptions(
includeDocumentMetadataChanges:
includeDocumentMetadataChanges ?? this.includeDocumentMetadataChanges,
includeQueryMetadataChanges:
includeQueryMetadataChanges ?? this.includeQueryMetadataChanges,
waitForSyncWhenOnline:
waitForSyncWhenOnline ?? this.waitForSyncWhenOnline,
includeDocumentMetadataChanges: includeDocumentMetadataChanges ?? this.includeDocumentMetadataChanges,
includeQueryMetadataChanges: includeQueryMetadataChanges ?? this.includeQueryMetadataChanges,
waitForSyncWhenOnline: waitForSyncWhenOnline ?? this.waitForSyncWhenOnline,
);
}
}
Loading

0 comments on commit ed5b632

Please sign in to comment.