Skip to content

Commit

Permalink
work on #387 improve memory usage during compact by writing by 5MB ch…
Browse files Browse the repository at this point in the history
…unks
  • Loading branch information
alextekartik committed Sep 5, 2024
1 parent 0d66b83 commit 64015df
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 47 deletions.
98 changes: 60 additions & 38 deletions sembast/lib/src/database_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -324,57 +324,79 @@ class SembastDatabase extends Object
await tmpStorage.delete();
await tmpStorage.findOrCreate();

final lines = <String>[];

Future addStringLine(String line) async {
await cooperate();
exportStat.lineCount++;
if (_debugStorage) {
// ignore: avoid_print
print('tmp: $line');
}
lines.add(line);
}

var hasAsyncCodec = _contentCodec.isAsyncCodec;
Future addLine(Map map) async {
String encoded;
try {
if (hasAsyncCodec) {
encoded = await encodeRecordMapAsync(map);
} else {
encoded = encodeRecordMapSync(map);
/// Write by 5MB chunks
const maxWriteSize = 5000000;
var currentWriteSize = 0;
var sink = await tmpStorage.openAppend();
try {
final lines = <String>[];
Future<void> writeCurrent() async {
if (currentWriteSize > 0) {
var linesCopy = List.of(lines);
lines.clear();
await sink.appendLines(linesCopy);
currentWriteSize = 0;
}
}

await addStringLine(encoded);
} catch (e, st) {
Future addStringLine(String line) async {
await cooperate();
exportStat.lineCount++;
if (_debugStorage) {
// useful for debugging...
// ignore: avoid_print
print(map);
// ignore: avoid_print
print(e);
// ignore: avoid_print
print(st);
print('tmp: $line');
}
lines.add(line);
currentWriteSize += line.length;
if (currentWriteSize > maxWriteSize) {
await writeCurrent();
}
}

var hasAsyncCodec = _contentCodec.isAsyncCodec;
Future addLine(Map map) async {
String encoded;
try {
if (hasAsyncCodec) {
encoded = await encodeRecordMapAsync(map);
} else {
encoded = encodeRecordMapSync(map);
}

await addStringLine(encoded);
} catch (e, st) {
if (_debugStorage) {
// useful for debugging...
// ignore: avoid_print
print(map);
// ignore: avoid_print
print(e);
// ignore: avoid_print
print(st);
}
rethrow;
}
rethrow;
}
}

// meta is always json
await addStringLine(json.encode(_meta!.toMap()));
// meta is always json
await addStringLine(json.encode(_meta!.toMap()));

var stores = getCurrentStores();
for (var store in stores) {
final records = getCurrentRecords(store);
for (var record in records) {
await addLine(record.toDatabaseRowMap());
var stores = getCurrentStores();
for (var store in stores) {
final records = getCurrentRecords(store);
for (var record in records) {
await addLine(record.toDatabaseRowMap());
}
}
await writeCurrent();
//await tmpStorage.appendLines(lines);
} finally {
await sink.close();
}
await tmpStorage.appendLines(lines);
await _storageFs!.tmpRecover();

_exportStat = exportStat;

// devPrint('compacted: $_exportStat');
} else if (_storageJdb?.supported ?? false) {
await _storageJdb!.compact();
Expand Down
2 changes: 1 addition & 1 deletion sembast/lib/src/file_system.dart
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ abstract class IOSink {
void writeln([Object obj = '']);

/// Close the target consumer.
Future close();
Future<void> close();
}

/// The type of an entity on the file system, such as a file, directory, or link.
Expand Down
14 changes: 10 additions & 4 deletions sembast/lib/src/memory/database_factory_memory.dart
Original file line number Diff line number Diff line change
Expand Up @@ -104,20 +104,26 @@ class DatabaseStorageMemory extends DatabaseStorage {
}

@override
Stream<String> readLines() => throw UnimplementedError();
Stream<String> readLines() => throw UnimplementedError('readLines');

@override
Future<void> appendLines(List<String> lines) => throw UnimplementedError();
Future<void> appendLines(List<String> lines) =>
throw UnimplementedError('appendLines');

@override
DatabaseStorage? get tmpStorage => null;

@override
Future<void> tmpRecover() => throw UnimplementedError();
Future<void> tmpRecover() => throw UnimplementedError('tmpRecover');

@override
Stream<String> readSafeLines() {
throw UnimplementedError();
throw UnimplementedError('readSafeLines');
}

@override
Future<DatabaseStorageSink> openAppend() {
throw UnimplementedError('openAppend');
}
}

Expand Down
28 changes: 27 additions & 1 deletion sembast/lib/src/sembast_fs.dart
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,26 @@ import 'package:sembast/src/storage.dart';
import 'common_import.dart';
import 'file_system.dart';

class _FsDatabaseStorageSink
with DatabaseStorageSinkMixin
implements DatabaseStorageSink {
final IOSink sink;

_FsDatabaseStorageSink(this.sink);

@override
Future<void> appendLines(List<String> lines) async {
for (var line in lines) {
sink.writeln(line);
}
}

@override
Future<void> close() {
return sink.close();
}
}

/// File system storage.
class FsDatabaseStorage extends DatabaseStorage {
/// File system
Expand Down Expand Up @@ -178,7 +198,7 @@ class FsDatabaseStorage extends DatabaseStorage {
}

@override
Future appendLines(List<String> lines) {
Future<void> appendLines(List<String> lines) {
// devPrint('${file.path} lines $lines');
final sink = file.openWrite(mode: FileMode.append);

Expand All @@ -189,6 +209,12 @@ class FsDatabaseStorage extends DatabaseStorage {
return sink.close();
}

@override
Future<DatabaseStorageSink> openAppend() async {
final sink = file.openWrite(mode: FileMode.append);
return _FsDatabaseStorageSink(sink);
}

@override
String toString() {
final map = <String, Object?>{'file': file.toString(), 'fs': fs.toString()};
Expand Down
27 changes: 24 additions & 3 deletions sembast/lib/src/storage.dart
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,25 @@ abstract class StorageBase {
Future<bool> find();

/// Create the storage if needed
Future findOrCreate();
Future<void> findOrCreate();
}

/// Storage sink
abstract class DatabaseStorageSink {
/// Append multiple lines.
Future<void> appendLines(List<String> lines);

/// Append one line
Future<void> appendLine(String line);

/// Close the sink
Future<void> close();
}

/// Storage sink mixin
mixin DatabaseStorageSinkMixin implements DatabaseStorageSink {
@override
Future<void> appendLine(String line) => appendLines([line]);
}

///
Expand All @@ -40,10 +58,13 @@ abstract class DatabaseStorage extends StorageBase {
Stream<String> readSafeLines();

/// Append multiple lines.
Future appendLines(List<String> lines);
Future<void> appendLines(List<String> lines);

/// Append one line
Future appendLine(String line) => appendLines([line]);
Future<void> appendLine(String line) => appendLines([line]);

/// Open the storage
Future<DatabaseStorageSink> openAppend();
}

/// State update
Expand Down

0 comments on commit 64015df

Please sign in to comment.