Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support compression on encoded content types #115

Merged
merged 1 commit into from
Feb 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 42 additions & 2 deletions lib/src/content/codec_registry.dart
Original file line number Diff line number Diff line change
@@ -1,15 +1,26 @@
import 'dart:convert' as convert;
import 'dart:io' as io;

import 'package:xmtp_proto/xmtp_proto.dart' as xmtp;

import 'codec.dart';
import 'decoded.dart';
import 'text_codec.dart';

typedef Compressor = convert.Codec<List<int>, List<int>>;

/// This is a registry of codecs for particular types.
///
/// It knows how to apply the codecs to [decode] or [encode]
/// [xmtp.EncodedContent] to [DecodedContent]..
class CodecRegistry implements Codec<DecodedContent> {
final Map<String, Codec> _codecs = {};
static final Map<xmtp.Compression, Compressor> _compressors = {
xmtp.Compression.COMPRESSION_GZIP: io.gzip,
xmtp.Compression.COMPRESSION_DEFLATE: io.zlib,
}; // TODO: consider supporting custom compressors
static final Set<xmtp.Compression> supportedCompressions =
_compressors.keys.toSet();

void registerCodec(Codec codec) => _codecs[_key(codec.contentType)] = codec;

Expand All @@ -20,6 +31,18 @@ class CodecRegistry implements Codec<DecodedContent> {
/// Use the registered codecs to decode the [encoded] content.
@override
Future<DecodedContent> decode(xmtp.EncodedContent encoded) async {
if (encoded.hasCompression()) {
var compressor = _compressors[encoded.compression];
if (compressor == null) {
throw StateError(
"unable to decode unsupported compression ${encoded.compression}");
}
var decompressed = compressor.decode(encoded.content);
encoded = xmtp.EncodedContent()
..mergeFromMessage(encoded)
..clearCompression()
..content = decompressed;
}
var codec = _codecFor(encoded.type);
if (codec == null) {
if (encoded.hasFallback()) {
Expand All @@ -34,13 +57,30 @@ class CodecRegistry implements Codec<DecodedContent> {

/// Use the registered codecs to encode the [content].
@override
Future<xmtp.EncodedContent> encode(DecodedContent decoded) async {
Future<xmtp.EncodedContent> encode(
DecodedContent decoded, {
xmtp.Compression? compression,
}) async {
var type = decoded.contentType;
var codec = _codecFor(type);
if (codec == null) {
throw StateError("unable to encode unsupported type ${_key(type)}");
}
return codec.encode(decoded.content);
var encoded = await codec.encode(decoded.content);
// TODO: consider warning if it isn't compressed but should be
if (compression != null) {
var compressor = _compressors[compression];
if (compressor == null) {
throw StateError(
"unable to encode unsupported compression $compression");
}
var compressed = compressor.encode(encoded.content);
encoded = xmtp.EncodedContent()
..mergeFromMessage(encoded)
..compression = compression
..content = compressed;
}
return encoded;
}

@override
Expand Down
28 changes: 28 additions & 0 deletions test/content/codec_registry_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,34 @@ void main() {
expect(decoded.content, "foo bar");
});

test('compression should work during encoding and decoding', () async {
var registry = CodecRegistry();
registry.registerCodec(TextCodec());
var someText = "blah blah blah" * 100;
for (var compression in CodecRegistry.supportedCompressions) {
var encodedSmall = await registry.encode(
DecodedContent(contentTypeText, someText),
compression: compression,
);
var encodedLarge = await registry.encode(
DecodedContent(contentTypeText, someText),
);
expect(encodedSmall.type, contentTypeText);
expect(encodedLarge.type, contentTypeText);
expect(encodedSmall.hasCompression(), true);
expect(encodedLarge.hasCompression(), false);
expect(encodedSmall.content.isNotEmpty, true);
expect(encodedLarge.content.isNotEmpty, true);
expect(encodedSmall.content.length < encodedLarge.content.length, true);
var decodedSmall = await registry.decode(encodedSmall);
var decodedLarge = await registry.decode(encodedLarge);
expect(decodedSmall.contentType, contentTypeText);
expect(decodedLarge.contentType, contentTypeText);
expect(decodedSmall.content, someText);
expect(decodedLarge.content, someText);
}
});

test('unknown types should throw', () async {
var registry = CodecRegistry();
registry.registerCodec(TextCodec());
Expand Down
Loading