From add9333d731313741919b005d91477e28a46496e Mon Sep 17 00:00:00 2001 From: VolodiaHunkalo Date: Mon, 23 Sep 2024 17:36:39 +0300 Subject: [PATCH] v0.1.27: changed upload interface ; added upload queue ; increased upload speed on slow Internet --- .idea/libraries/Dart_Packages.xml | 16 + .../builder/request_dialog_message.dart | 92 +- lib/src/backbone/cert_helper.dart | 8 + lib/src/backbone/constants.dart | 73 ++ lib/src/backbone/helper/message.dart | 2 +- lib/src/data/dialog_impl.dart | 79 +- lib/src/data/download_impl.dart | 2 +- lib/src/data/grpc/grpc_channel.dart | 2 +- .../data/service_impl/auth_service_impl.dart | 7 +- .../data/service_impl/chat_service_impl.dart | 811 +++++++++++++++--- lib/src/data/upload_impl.dart | 74 ++ lib/src/domain/entities/button.dart | 26 +- lib/src/domain/entities/dialog.dart | 33 +- .../entities/dialog_message_request.dart | 6 +- .../entities/dialog_message_response.dart | 2 +- .../domain/entities/media_file_response.dart | 18 +- lib/src/domain/entities/progress.dart | 9 + lib/src/domain/entities/upload.dart | 19 + lib/src/domain/entities/upload_file.dart | 11 + lib/src/domain/entities/upload_response.dart | 69 ++ lib/src/domain/entities/upload_task.dart | 24 + lib/src/domain/services/chat_service.dart | 27 +- lib/webitel_portal_sdk.dart | 3 + 23 files changed, 1158 insertions(+), 255 deletions(-) create mode 100644 lib/src/backbone/cert_helper.dart create mode 100644 lib/src/data/upload_impl.dart create mode 100644 lib/src/domain/entities/progress.dart create mode 100644 lib/src/domain/entities/upload.dart create mode 100644 lib/src/domain/entities/upload_file.dart create mode 100644 lib/src/domain/entities/upload_response.dart create mode 100644 lib/src/domain/entities/upload_task.dart diff --git a/.idea/libraries/Dart_Packages.xml b/.idea/libraries/Dart_Packages.xml index 5ff8c3c..ffe261e 100644 --- a/.idea/libraries/Dart_Packages.xml +++ b/.idea/libraries/Dart_Packages.xml @@ -296,6 +296,13 @@ + + + + + + @@ -464,6 +471,13 @@ + + + + + + @@ -744,6 +758,7 @@ + @@ -768,6 +783,7 @@ + diff --git a/lib/src/backbone/builder/request_dialog_message.dart b/lib/src/backbone/builder/request_dialog_message.dart index faaf205..4239f02 100644 --- a/lib/src/backbone/builder/request_dialog_message.dart +++ b/lib/src/backbone/builder/request_dialog_message.dart @@ -1,46 +1,46 @@ -import 'package:webitel_portal_sdk/src/domain/entities/dialog_message_request.dart'; -import 'package:webitel_portal_sdk/src/domain/entities/media_file_request.dart'; - -final class RequestDialogMessageBuilder { - late String _content; - late String _requestId; - late String _chatId; - late String _messageId; - - late MediaFileRequest _file; - - RequestDialogMessageBuilder setContent(String content) { - _content = content; - return this; - } - - RequestDialogMessageBuilder setRequestId(String requestId) { - _requestId = requestId; - return this; - } - - RequestDialogMessageBuilder setChatId(String chatId) { - _chatId = chatId; - return this; - } - - RequestDialogMessageBuilder setMessageId(String messageId) { - _messageId = messageId; - return this; - } - - RequestDialogMessageBuilder setFile(MediaFileRequest file) { - _file = file; - return this; - } - - DialogMessageRequest build() { - return DialogMessageRequest( - chatId: _chatId, - content: _content, - requestId: _requestId, - messageId: _messageId, - file: _file, - ); - } -} +// import 'package:webitel_portal_sdk/src/domain/entities/dialog_message_request.dart'; +// import 'package:webitel_portal_sdk/src/domain/entities/media_file_request.dart'; +// +// final class RequestDialogMessageBuilder { +// late String _content; +// late String _requestId; +// late String _chatId; +// late String _messageId; +// +// late MediaFileRequest _file; +// +// RequestDialogMessageBuilder setContent(String content) { +// _content = content; +// return this; +// } +// +// RequestDialogMessageBuilder setRequestId(String requestId) { +// _requestId = requestId; +// return this; +// } +// +// RequestDialogMessageBuilder setChatId(String chatId) { +// _chatId = chatId; +// return this; +// } +// +// RequestDialogMessageBuilder setMessageId(String messageId) { +// _messageId = messageId; +// return this; +// } +// +// RequestDialogMessageBuilder setFile(MediaFileRequest file) { +// _file = file; +// return this; +// } +// +// DialogMessageRequest build() { +// return DialogMessageRequest( +// chatId: _chatId, +// content: _content, +// requestId: _requestId, +// messageId: _messageId, +// file: _file, +// ); +// } +// } diff --git a/lib/src/backbone/cert_helper.dart b/lib/src/backbone/cert_helper.dart new file mode 100644 index 0000000..3ababee --- /dev/null +++ b/lib/src/backbone/cert_helper.dart @@ -0,0 +1,8 @@ +import 'dart:convert'; + +// Function to convert PEM string to List +List pemToBytes(String pem) { + final lines = pem.split('\n'); + final base64 = lines.where((line) => !line.startsWith('-----')).join(); + return base64Decode(base64); +} diff --git a/lib/src/backbone/constants.dart b/lib/src/backbone/constants.dart index f076f76..76c0e69 100644 --- a/lib/src/backbone/constants.dart +++ b/lib/src/backbone/constants.dart @@ -36,3 +36,76 @@ final class Constants { /// The default timeout duration for requests. static const Duration requestTimeout = Duration(seconds: 5); } + +//Solva cert +const String pemCertificate = ''' +-----BEGIN CERTIFICATE----- +MIIGEzCCA/ugAwIBAgIQfVtRJrR2uhHbdBYLvFMNpzANBgkqhkiG9w0BAQwFADCB +iDELMAkGA1UEBhMCVVMxEzARBgNVBAgTCk5ldyBKZXJzZXkxFDASBgNVBAcTC0pl +cnNleSBDaXR5MR4wHAYDVQQKExVUaGUgVVNFUlRSVVNUIE5ldHdvcmsxLjAsBgNV +BAMTJVVTRVJUcnVzdCBSU0EgQ2VydGlmaWNhdGlvbiBBdXRob3JpdHkwHhcNMTgx +MTAyMDAwMDAwWhcNMzAxMjMxMjM1OTU5WjCBjzELMAkGA1UEBhMCR0IxGzAZBgNV +BAgTEkdyZWF0ZXIgTWFuY2hlc3RlcjEQMA4GA1UEBxMHU2FsZm9yZDEYMBYGA1UE +ChMPU2VjdGlnbyBMaW1pdGVkMTcwNQYDVQQDEy5TZWN0aWdvIFJTQSBEb21haW4g +VmFsaWRhdGlvbiBTZWN1cmUgU2VydmVyIENBMIIBIjANBgkqhkiG9w0BAQEFAAOC +AQ8AMIIBCgKCAQEA1nMz1tc8INAA0hdFuNY+B6I/x0HuMjDJsGz99J/LEpgPLT+N +TQEMgg8Xf2Iu6bhIefsWg06t1zIlk7cHv7lQP6lMw0Aq6Tn/2YHKHxYyQdqAJrkj +eocgHuP/IJo8lURvh3UGkEC0MpMWCRAIIz7S3YcPb11RFGoKacVPAXJpz9OTTG0E +oKMbgn6xmrntxZ7FN3ifmgg0+1YuWMQJDgZkW7w33PGfKGioVrCSo1yfu4iYCBsk +Haswha6vsC6eep3BwEIc4gLw6uBK0u+QDrTBQBbwb4VCSmT3pDCg/r8uoydajotY +uK3DGReEY+1vVv2Dy2A0xHS+5p3b4eTlygxfFQIDAQABo4IBbjCCAWowHwYDVR0j +BBgwFoAUU3m/WqorSs9UgOHYm8Cd8rIDZsswHQYDVR0OBBYEFI2MXsRUrYrhd+mb ++ZsF4bgBjWHhMA4GA1UdDwEB/wQEAwIBhjASBgNVHRMBAf8ECDAGAQH/AgEAMB0G +A1UdJQQWMBQGCCsGAQUFBwMBBggrBgEFBQcDAjAbBgNVHSAEFDASMAYGBFUdIAAw +CAYGZ4EMAQIBMFAGA1UdHwRJMEcwRaBDoEGGP2h0dHA6Ly9jcmwudXNlcnRydXN0 +LmNvbS9VU0VSVHJ1c3RSU0FDZXJ0aWZpY2F0aW9uQXV0aG9yaXR5LmNybDB2Bggr +BgEFBQcBAQRqMGgwPwYIKwYBBQUHMAKGM2h0dHA6Ly9jcnQudXNlcnRydXN0LmNv +bS9VU0VSVHJ1c3RSU0FBZGRUcnVzdENBLmNydDAlBggrBgEFBQcwAYYZaHR0cDov +L29jc3AudXNlcnRydXN0LmNvbTANBgkqhkiG9w0BAQwFAAOCAgEAMr9hvQ5Iw0/H +ukdN+Jx4GQHcEx2Ab/zDcLRSmjEzmldS+zGea6TvVKqJjUAXaPgREHzSyrHxVYbH +7rM2kYb2OVG/Rr8PoLq0935JxCo2F57kaDl6r5ROVm+yezu/Coa9zcV3HAO4OLGi +H19+24rcRki2aArPsrW04jTkZ6k4Zgle0rj8nSg6F0AnwnJOKf0hPHzPE/uWLMUx +RP0T7dWbqWlod3zu4f+k+TY4CFM5ooQ0nBnzvg6s1SQ36yOoeNDT5++SR2RiOSLv +xvcRviKFxmZEJCaOEDKNyJOuB56DPi/Z+fVGjmO+wea03KbNIaiGCpXZLoUmGv38 +sbZXQm2V0TP2ORQGgkE49Y9Y3IBbpNV9lXj9p5v//cWoaasm56ekBYdbqbe4oyAL +l6lFhd2zi+WJN44pDfwGF/Y4QA5C5BIG+3vzxhFoYt/jmPQT2BVPi7Fp2RBgvGQq +6jG35LWjOhSbJuMLe/0CjraZwTiXWTb2qHSihrZe68Zk6s+go/lunrotEbaGmAhY +LcmsJWTyXnW0OMGuf1pGg+pRyrbxmRE1a6Vqe8YAsOf4vmSyrcjC8azjUeqkk+B5 +yOGBQMkKW+ESPMFgKuOXwIlCypTPRpgSabuY0MLTDXJLR27lk8QyKGOHQ+SwMj4K +00u/I5sUKUErmgQfky3xxzlIPK1aEn8= +-----END CERTIFICATE----- +-----BEGIN CERTIFICATE----- +MIIF3jCCA8agAwIBAgIQAf1tMPyjylGoG7xkDjUDLTANBgkqhkiG9w0BAQwFADCB +iDELMAkGA1UEBhMCVVMxEzARBgNVBAgTCk5ldyBKZXJzZXkxFDASBgNVBAcTC0pl +cnNleSBDaXR5MR4wHAYDVQQKExVUaGUgVVNFUlRSVVNUIE5ldHdvcmsxLjAsBgNV +BAMTJVVTRVJUcnVzdCBSU0EgQ2VydGlmaWNhdGlvbiBBdXRob3JpdHkwHhcNMTAw +MjAxMDAwMDAwWhcNMzgwMTE4MjM1OTU5WjCBiDELMAkGA1UEBhMCVVMxEzARBgNV +BAgTCk5ldyBKZXJzZXkxFDASBgNVBAcTC0plcnNleSBDaXR5MR4wHAYDVQQKExVU +aGUgVVNFUlRSVVNUIE5ldHdvcmsxLjAsBgNVBAMTJVVTRVJUcnVzdCBSU0EgQ2Vy +dGlmaWNhdGlvbiBBdXRob3JpdHkwggIiMA0GCSqGSIb3DQEBAQUAA4ICDwAwggIK +AoICAQCAEmUXNg7D2wiz0KxXDXbtzSfTTK1Qg2HiqiBNCS1kCdzOiZ/MPans9s/B +3PHTsdZ7NygRK0faOca8Ohm0X6a9fZ2jY0K2dvKpOyuR+OJv0OwWIJAJPuLodMkY +tJHUYmTbf6MG8YgYapAiPLz+E/CHFHv25B+O1ORRxhFnRghRy4YUVD+8M/5+bJz/ +Fp0YvVGONaanZshyZ9shZrHUm3gDwFA66Mzw3LyeTP6vBZY1H1dat//O+T23LLb2 +VN3I5xI6Ta5MirdcmrS3ID3KfyI0rn47aGYBROcBTkZTmzNg95S+UzeQc0PzMsNT +79uq/nROacdrjGCT3sTHDN/hMq7MkztReJVni+49Vv4M0GkPGw/zJSZrM233bkf6 +c0Plfg6lZrEpfDKEY1WJxA3Bk1QwGROs0303p+tdOmw1XNtB1xLaqUkL39iAigmT +Yo61Zs8liM2EuLE/pDkP2QKe6xJMlXzzawWpXhaDzLhn4ugTncxbgtNMs+1b/97l +c6wjOy0AvzVVdAlJ2ElYGn+SNuZRkg7zJn0cTRe8yexDJtC/QV9AqURE9JnnV4ee +UB9XVKg+/XRjL7FQZQnmWEIuQxpMtPAlR1n6BB6T1CZGSlCBst6+eLf8ZxXhyVeE +Hg9j1uliutZfVS7qXMYoCAQlObgOK6nyTJccBz8NUvXt7y+CDwIDAQABo0IwQDAd +BgNVHQ4EFgQUU3m/WqorSs9UgOHYm8Cd8rIDZsswDgYDVR0PAQH/BAQDAgEGMA8G +A1UdEwEB/wQFMAMBAf8wDQYJKoZIhvcNAQEMBQADggIBAFzUfA3P9wF9QZllDHPF +Up/L+M+ZBn8b2kMVn54CVVeWFPFSPCeHlCjtHzoBN6J2/FNQwISbxmtOuowhT6KO +VWKR82kV2LyI48SqC/3vqOlLVSoGIG1VeCkZ7l8wXEskEVX/JJpuXior7gtNn3/3 +ATiUFJVDBwn7YKnuHKsSjKCaXqeYalltiz8I+8jRRa8YFWSQEg9zKC7F4iRO/Fjs +8PRF/iKz6y+O0tlFYQXBl2+odnKPi4w2r78NBc5xjeambx9spnFixdjQg3IM8WcR +iQycE0xyNN+81XHfqnHd4blsjDwSXWXavVcStkNr/+XeTWYRUc+ZruwXtuhxkYze +Sf7dNXGiFSeUHM9h4ya7b6NnJSFd5t0dCy5oGzuCr+yDZ4XUmFF0sbmZgIn/f3gZ +XHlKYC6SQK5MNyosycdiyA5d9zZbyuAlJQG03RoHnHcAP9Dc1ew91Pq7P8yF1m9/ +qS3fuQL39ZeatTXaw2ewh0qpKJ4jjv9cJ2vhsE/zB+4ALtRZh8tSQZXq9EfX7mRB +VXyNWQKV3WKdwrnuWih0hKWbt5DHDAff9Yk2dDLWKMGwsAvgnEzDHNb842m1R0aB +L6KCq9NjRHDEjf8tM7qtj3u1cIiuPhnPQCjY/MiQu12ZIvVS5ljFH4gxQ+6IHdfG +jjxDah2nGN59PRbxYvnKkKj9 +-----END CERTIFICATE----- +'''; diff --git a/lib/src/backbone/helper/message.dart b/lib/src/backbone/helper/message.dart index f757050..8bad70c 100644 --- a/lib/src/backbone/helper/message.dart +++ b/lib/src/backbone/helper/message.dart @@ -40,7 +40,7 @@ final class MessageHelper { /// [dialogMessageRequestEntity] The dialog message request to be checked. static MessageType determineMessageTypeRequest( DialogMessageRequest dialogMessageRequestEntity) { - if (dialogMessageRequestEntity.file.name.isNotEmpty) { + if (dialogMessageRequestEntity.uploadFile.id != '') { return MessageType.media; } else { return MessageType.text; diff --git a/lib/src/data/dialog_impl.dart b/lib/src/data/dialog_impl.dart index 62bc1f6..c868b28 100644 --- a/lib/src/data/dialog_impl.dart +++ b/lib/src/data/dialog_impl.dart @@ -1,4 +1,5 @@ import 'dart:async'; +import 'dart:io'; import 'package:injectable/injectable.dart'; import 'package:webitel_portal_sdk/src/domain/entities/call_error.dart'; @@ -6,10 +7,10 @@ import 'package:webitel_portal_sdk/src/domain/entities/dialog.dart'; import 'package:webitel_portal_sdk/src/domain/entities/dialog_message_request.dart'; import 'package:webitel_portal_sdk/src/domain/entities/dialog_message_response.dart'; import 'package:webitel_portal_sdk/src/domain/entities/download.dart'; -import 'package:webitel_portal_sdk/src/domain/entities/media_file_request.dart'; -import 'package:webitel_portal_sdk/src/domain/entities/media_file_response.dart'; import 'package:webitel_portal_sdk/src/domain/entities/portal_chat_member.dart'; import 'package:webitel_portal_sdk/src/domain/entities/postback.dart'; +import 'package:webitel_portal_sdk/src/domain/entities/upload.dart'; +import 'package:webitel_portal_sdk/src/domain/entities/upload_file.dart'; import 'package:webitel_portal_sdk/src/domain/services/chat_service.dart'; import 'package:webitel_portal_sdk/src/injection/injection.dart'; @@ -20,13 +21,13 @@ final class DialogImpl implements Dialog { @override final String id; - /// The top message in the dialog. + /// The current status of the dialog whether it's closed. @override - final String topMessage; + final bool isClosed; - /// Flag indicating whether the chat is closed or not. + /// The top message in the dialog. @override - final bool isClosed; + final String topMessage; /// The error associated with the dialog, if any. @override @@ -36,11 +37,11 @@ final class DialogImpl implements Dialog { @override final Stream onNewMessage; - /// Stream [PortalChatMember] for new member added to the chat.. + /// Stream [PortalChatMember] for new member added to the chat. @override final Stream onMemberAdded; - /// Stream [PortalChatMember] for member left the chat.. + /// Stream [PortalChatMember] for member left the chat. @override final Stream onMemberLeft; @@ -48,11 +49,6 @@ final class DialogImpl implements Dialog { late final ChatService _chatService; /// Constructor for initializing a [DialogImpl] instance. - /// - /// [id] The unique identifier for the dialog. - /// [topMessage] The top message in the dialog. - /// [onNewMessage] The stream for new messages in the dialog. - /// [error] The error associated with the dialog, if any. DialogImpl({ required this.id, required this.topMessage, @@ -76,19 +72,9 @@ final class DialogImpl implements Dialog { onMemberAdded = Stream.empty(); /// Sends a message in the dialog. - /// - /// [mediaType] The type of the media (optional). - /// [mediaName] The name of the media (optional). - /// [mediaData] The stream of media data (optional). - /// [content] The content of the message. - /// [requestId] The unique identifier for the request. - /// - /// Returns a [Future] that completes with a [DialogMessageResponse]. @override Future sendMessage({ - String? mediaType, - String? mediaName, - Stream>? mediaData, + UploadFile? uploadFile, required String content, required String requestId, }) async { @@ -97,22 +83,16 @@ final class DialogImpl implements Dialog { message: DialogMessageRequest( content: content, requestId: requestId, - file: MediaFileRequest( - data: mediaData ?? Stream>.empty(), - name: mediaName ?? '', - type: mediaType ?? '', - requestId: requestId, + uploadFile: UploadFile( + name: uploadFile?.name ?? '', + type: uploadFile?.type ?? '', + id: uploadFile?.id ?? '', ), ), ); } /// Sends a postback in the dialog. - /// - /// [postback] The postback object, - /// [requestId] The unique identifier for the request. - /// - /// Returns a [Future] that completes with a [DialogMessageResponse]. @override Future sendPostback({ required Postback postback, @@ -126,11 +106,6 @@ final class DialogImpl implements Dialog { } /// Fetches messages in the dialog. - /// - /// [limit] The maximum number of messages to fetch (optional). - /// [offset] The offset from which to start fetching messages (optional). - /// - /// Returns a [Future] that completes with a list of [DialogMessageResponse]. @override Future> fetchMessages({ int? limit, @@ -144,11 +119,6 @@ final class DialogImpl implements Dialog { } /// Fetches updates in the dialog. - /// - /// [limit] The maximum number of updates to fetch (optional). - /// [offset] The offset from which to start fetching updates (optional). - /// - /// Returns a [Future] that completes with a list of [DialogMessageResponse]. @override Future> fetchUpdates({ int? limit, @@ -162,14 +132,27 @@ final class DialogImpl implements Dialog { } /// Downloads a media file associated with a dialog. - /// - /// [fileId] The ID of the file to be downloaded. - /// - /// Returns a stream of [MediaFileResponse] representing the downloaded file. @override Download downloadFile({required String fileId, int? offset}) => _chatService.downloadFile( fileId: fileId, offset: offset, ); + + /// Uploads a media file associated with a dialog. + @override + Upload uploadFile({ + required String mediaType, + required String mediaName, + required File file, + String? pid, + int? offset, + }) => + _chatService.uploadFile( + mediaType: mediaType, + mediaName: mediaName, + file: file, + pid: pid, + offset: offset, + ); } diff --git a/lib/src/data/download_impl.dart b/lib/src/data/download_impl.dart index 7ad2ad4..24ad5ba 100644 --- a/lib/src/data/download_impl.dart +++ b/lib/src/data/download_impl.dart @@ -1 +1 @@ -import 'dart:async'; import 'package:injectable/injectable.dart'; import 'package:webitel_portal_sdk/src/domain/entities/download.dart'; import 'package:webitel_portal_sdk/src/domain/entities/media_file_response.dart'; import 'package:webitel_portal_sdk/src/domain/services/chat_service.dart'; import 'package:webitel_portal_sdk/src/generated/portal/media.pbgrpc.dart'; import 'package:webitel_portal_sdk/src/injection/injection.dart'; /// Implementation of the [Download] interface, representing a file download operation. @LazySingleton(as: Download) final class DownloadImpl implements Download { /// The ID of the file being downloaded. final String fileId; /// The current offset in bytes of the download. int offset; /// The subscription for the download stream. StreamSubscription? subscription; @override final StreamController onData; late final ChatService _chatService; /// Constructs a [DownloadImpl] with the given file ID, offset, and data stream. /// /// [fileId] is the ID of the file being downloaded. /// [offset] is the current offset in bytes of the download. /// [onData] is a stream of [MediaFileResponse] providing updates on the download progress. DownloadImpl({ this.subscription, required this.offset, required this.onData, required this.fileId, }) { _chatService = getIt.get(); } /// Pauses the download operation. /// /// Cancels the current subscription and delegates the pause operation to the [ChatService]. @override Future pause() async { await subscription?.cancel(); _chatService.pauseDownload(fileId: fileId, subscription: subscription!); } /// Resumes the download operation. /// /// Delegates the resume operation to the [ChatService]. @override Future resume() async { _chatService.resumeDownload( fileId: fileId, controller: onData, offset: offset, download: this, ); } /// Updates the offset of the download. /// /// [newOffset] is the new offset in bytes to update. void updateOffset(int newOffset) { offset = newOffset; } /// Sets the subscription for the download. /// /// [newSubscription] is the new [StreamSubscription] to set. void setSubscription(StreamSubscription newSubscription) { subscription = newSubscription; } } \ No newline at end of file +import 'dart:async'; import 'package:injectable/injectable.dart'; import 'package:webitel_portal_sdk/src/domain/entities/download.dart'; import 'package:webitel_portal_sdk/src/domain/entities/media_file_response.dart'; import 'package:webitel_portal_sdk/src/domain/services/chat_service.dart'; import 'package:webitel_portal_sdk/src/generated/portal/media.pbgrpc.dart'; import 'package:webitel_portal_sdk/src/injection/injection.dart'; /// Implementation of the [Download] interface, representing a file download operation. @LazySingleton(as: Download) final class DownloadImpl implements Download { /// The ID of the file being downloaded. final String fileId; /// The current offset in bytes of the download. int offset; /// The subscription for the download stream. StreamSubscription? subscription; @override final StreamController onData; late final ChatService _chatService; /// Constructs a [DownloadImpl] with the given file ID, offset, and data stream. /// /// [fileId] is the ID of the file being downloaded. /// [offset] is the current offset in bytes of the download. /// [onData] is a stream of [MediaFileResponse] providing updates on the download progress. DownloadImpl({ this.subscription, required this.offset, required this.onData, required this.fileId, }) { _chatService = getIt.get(); } /// Pauses the download operation. /// /// Cancels the current subscription and delegates the pause operation to the [ChatService]. @override Future pause() async { await subscription?.cancel(); } /// Resumes the download operation. /// /// Delegates the resume operation to the [ChatService]. @override Future resume() async { _chatService.resumeDownload( fileId: fileId, controller: onData, offset: offset, download: this, ); } /// Updates the offset of the download. /// /// [newOffset] is the new offset in bytes to update. void updateOffset(int newOffset) { offset = newOffset; } /// Sets the subscription for the download. /// /// [newSubscription] is the new [StreamSubscription] to set. void setSubscription(StreamSubscription newSubscription) { subscription = newSubscription; } } \ No newline at end of file diff --git a/lib/src/data/grpc/grpc_channel.dart b/lib/src/data/grpc/grpc_channel.dart index 3f71181..6657672 100644 --- a/lib/src/data/grpc/grpc_channel.dart +++ b/lib/src/data/grpc/grpc_channel.dart @@ -195,7 +195,7 @@ class GrpcChannel { Future _reconnect() async { try { await retry( - () async { + () async { await _createChannel( url: _url, port: _port, diff --git a/lib/src/data/service_impl/auth_service_impl.dart b/lib/src/data/service_impl/auth_service_impl.dart index f1b5dab..e60949b 100644 --- a/lib/src/data/service_impl/auth_service_impl.dart +++ b/lib/src/data/service_impl/auth_service_impl.dart @@ -383,14 +383,11 @@ class AuthServiceImpl implements AuthService { try { // Send the logout request to the server. - final logoutRes = await _grpcChannel.customerStub.logout(LogoutRequest()); + await _grpcChannel.customerStub.logout(LogoutRequest()); log.info('User logged out successfully.'); - return res.PortalResponse( - status: PortalResponseStatus.success, - message: logoutRes.cause.message, - ); + return res.PortalResponse(status: PortalResponseStatus.success); } catch (err) { log.severe( 'Failed to logout the current user. Error: ${err.toString()}', err); diff --git a/lib/src/data/service_impl/chat_service_impl.dart b/lib/src/data/service_impl/chat_service_impl.dart index 0458a02..90cbf20 100644 --- a/lib/src/data/service_impl/chat_service_impl.dart +++ b/lib/src/data/service_impl/chat_service_impl.dart @@ -1,4 +1,6 @@ import 'dart:async'; +import 'dart:collection'; +import 'dart:io'; import 'package:fixnum/fixnum.dart' as fixnum; import 'package:grpc/grpc.dart'; @@ -19,12 +21,18 @@ import 'package:webitel_portal_sdk/src/data/dialog_impl.dart'; import 'package:webitel_portal_sdk/src/data/download_impl.dart'; import 'package:webitel_portal_sdk/src/data/grpc/grpc_channel.dart'; import 'package:webitel_portal_sdk/src/data/grpc/grpc_connect.dart'; +import 'package:webitel_portal_sdk/src/data/upload_impl.dart'; import 'package:webitel_portal_sdk/src/domain/entities/button.dart'; import 'package:webitel_portal_sdk/src/domain/entities/channel.dart'; import 'package:webitel_portal_sdk/src/domain/entities/connect.dart'; +import 'package:webitel_portal_sdk/src/domain/entities/dialog_message_request.dart'; +import 'package:webitel_portal_sdk/src/domain/entities/dialog_message_response.dart'; import 'package:webitel_portal_sdk/src/domain/entities/download.dart'; import 'package:webitel_portal_sdk/src/domain/entities/keyboard.dart'; -import 'package:webitel_portal_sdk/src/domain/entities/media_file_response.dart'; +import 'package:webitel_portal_sdk/src/domain/entities/message_type.dart'; +import 'package:webitel_portal_sdk/src/domain/entities/progress.dart'; +import 'package:webitel_portal_sdk/src/domain/entities/upload.dart'; +import 'package:webitel_portal_sdk/src/domain/entities/upload_task.dart'; import 'package:webitel_portal_sdk/src/domain/services/chat_service.dart'; import 'package:webitel_portal_sdk/src/generated/chat/messages/dialog.pb.dart' as dialog; @@ -101,6 +109,10 @@ final class ChatServiceImpl implements ChatService { final Map> _onMemberLeftControllers = {}; + // Queue and state flag for uploads + final Queue _uploadQueue = Queue(); + bool _isUploading = false; + // Constructor for initializing the chat service with the required dependencies. ChatServiceImpl( this._grpcChannel, @@ -256,12 +268,12 @@ final class ChatServiceImpl implements ChatService { final onNewMemberAddedController = await getControllerForMemberAdded(dialog.id); - final onMemberLeftController = - await getControllerForMemberLeft(dialog.id); - final bool isClosed = (dialog.left != 0 || (dialog.left == 0 && dialog.join == 0)); + final onMemberLeftController = + await getControllerForMemberLeft(dialog.id); + return DialogImpl( isClosed: isClosed, topMessage: dialog.message.text, @@ -404,8 +416,8 @@ final class ChatServiceImpl implements ChatService { statusCode: statusCode, errorMessage: response.err.message, ), - isClosed: true, topMessage: 'ERROR', + isClosed: true, id: response.id, onNewMessage: Stream.empty(), onMemberAdded: Stream.empty(), @@ -422,6 +434,250 @@ final class ChatServiceImpl implements ChatService { return DialogImpl.initial(); } + // Add a task to the queue and start processing if not already uploading + void _addToUploadQueue({ + required String mediaType, + required String mediaName, + required File file, + String? pid, + int? offset, + required StreamController controller, + }) { + final completer = Completer(); + _uploadQueue.add( + UploadTask( + mediaType: mediaType, + mediaName: mediaName, + file: file, + pid: pid, + offset: offset, + controller: controller, + completer: completer, + ), + ); + + // Start processing the queue if not already uploading + if (!_isUploading) { + _processQueue(); + } + } + + // Process the upload queue sequentially + void _processQueue() { + if (_uploadQueue.isEmpty) { + _isUploading = false; + return; + } + + _isUploading = true; + final task = _uploadQueue.removeFirst(); + + final subscription = _grpcChannel.mediaStorageStub + .upload( + _uploadMediaStream( + pid: task.pid, + offset: task.offset, + file: task.file, + mediaName: task.mediaName, + mediaType: task.mediaType, + completer: task.completer, + ), + ) + .listen( + (progress) { + if (progress.hasPart()) { + log.info('Upload progress: ${progress.part.size} bytes uploaded.'); + task.controller.add( + UploadResponse( + progress: Progress( + progressSize: progress.part.size.toInt(), + progressId: progress.part.pid, + ), + ), + ); + if (!task.completer.isCompleted) { + task.completer.complete(); + } + task.completer = + Completer(); // Create a new completer for the next batch + } else if (progress.hasStat()) { + log.info('Upload completed for file: ${progress.stat.file.name}'); + task.controller.add( + UploadResponse( + id: progress.stat.file.id, + type: progress.stat.file.type, + name: progress.stat.file.name, + size: progress.stat.file.size.toInt(), + ), + ); + } + }, + onError: (error) { + log.severe('Upload error: $error'); + final callError = CallError( + statusCode: 'UPLOAD_ERROR', + errorMessage: error.toString(), + ); + task.controller.addError(callError); + task.controller.close(); + _processQueue(); // Continue to the next task in the queue + }, + onDone: () { + log.info('Upload stream closed.'); + task.controller.close(); + _processQueue(); // Continue to the next task in the queue + }, + cancelOnError: true, + ); + + // Set the subscription in the upload object to manage cancellation + final upload = UploadImpl( + onProgress: task.controller, + offset: task.offset ?? 0, + ); + upload.setSubscription(subscription); + } + + @override + Upload uploadFile({ + required String mediaType, + required String mediaName, + required File file, + String? pid, + int? offset, + }) { + // Create a StreamController for managing upload progress + final StreamController controller = + StreamController(); + + _addToUploadQueue( + mediaType: mediaType, + mediaName: mediaName, + file: file, + pid: pid, + offset: offset, + controller: controller, + ); + + return UploadImpl( + onProgress: controller, + offset: 0, + ); + } + + Stream _uploadMediaStream({ + required File file, + required String mediaName, + required String mediaType, + required Completer completer, + String? pid, + int? offset, + }) async* { + log.info('Starting upload stream for file: $mediaName of type: $mediaType'); + + // Existing code logic for managing chunk size and upload... + int chunkSize = 512; + final int minChunkSize = 100; + final int maxChunkSize = 131072; + final int maxDelayThreshold = 5000; + int currentOffset = offset ?? 0; + int chunkCount = 0; + final int fileLength = await file.length(); + int fastUploadCount = 0; + DateTime lastDecreaseTime = DateTime.now(); + final Duration decreaseCooldown = Duration(seconds: 3); + + List uploadSpeeds = []; + int adjustmentsCount = 0; + final int adjustmentInterval = 5; + + if (pid == null) { + yield UploadRequest( + new_2: UploadRequest_Start( + file: InputFile( + name: mediaName, + type: mediaType, + ), + progress: true, + ), + ); + } else { + log.info('Resuming upload with PID: $pid from offset: $currentOffset'); + } + + while (currentOffset < fileLength) { + int end = (currentOffset + chunkSize > fileLength) + ? fileLength + : currentOffset + chunkSize; + Stream> fileStream = file.openRead(currentOffset, end); + + await for (var bytes in fileStream) { + yield UploadRequest( + part: bytes, + pid: pid ?? '', + ); + } + + currentOffset += chunkSize; + chunkCount++; + + if (chunkCount == 3) { + final startTime = DateTime.now(); + await completer.future; + + final endTime = DateTime.now(); + int elapsedTimeMs = endTime.difference(startTime).inMilliseconds; + if (elapsedTimeMs == 0) { + elapsedTimeMs = 1; + } + + final currentSpeed = (chunkSize / elapsedTimeMs) * 1000; + uploadSpeeds.add(currentSpeed); + if (uploadSpeeds.length > 5) { + uploadSpeeds.removeAt(0); + } + + final averageSpeed = + uploadSpeeds.reduce((a, b) => a + b) / uploadSpeeds.length; + adjustmentsCount++; + if (adjustmentsCount >= adjustmentInterval) { + final currentTime = DateTime.now(); + + if (elapsedTimeMs > maxDelayThreshold && + currentTime.difference(lastDecreaseTime) > decreaseCooldown) { + chunkSize = + (chunkSize * 0.9).clamp(minChunkSize, maxChunkSize).toInt(); + log.warning( + 'Significant delay detected. Gradually reducing chunk size to: $chunkSize bytes.'); + lastDecreaseTime = currentTime; + } else if (averageSpeed > 50000) { + fastUploadCount++; + if (fastUploadCount >= 3) { + chunkSize = + (chunkSize * 1.15).clamp(minChunkSize, maxChunkSize).toInt(); + fastUploadCount = 0; + } + } else if (averageSpeed < 20000 && + currentTime.difference(lastDecreaseTime) > decreaseCooldown) { + chunkSize = + (chunkSize * 0.99).clamp(minChunkSize, maxChunkSize).toInt(); + log.warning( + 'Consistently slow speed. Gradually reducing chunk size to: $chunkSize bytes.'); + lastDecreaseTime = currentTime; + } + + log.info( + 'Elapsed time: $elapsedTimeMs ms, Average upload speed: ${averageSpeed.toStringAsFixed(2)} B/s, Adjusted chunk size: $chunkSize bytes'); + adjustmentsCount = 0; + } + + chunkCount = 0; + } + } + + log.info('Completed streaming upload requests for file: $mediaName'); + } + /// Downloads a media file associated with a dialog. /// /// [fileId] The ID of the file to be downloaded. @@ -527,8 +783,8 @@ final class ChatServiceImpl implements ChatService { } // Add the received data to the file - file!.bytes.clear(); - file!.bytes.addAll(mediaFile.data); + file!.bytes?.clear(); + file!.bytes?.addAll(mediaFile.data); offset += mediaFile.data.length; // Update the download offset @@ -545,7 +801,7 @@ final class ChatServiceImpl implements ChatService { "GrpcError encountered while downloading file '${file?.name ?? "unknown"}': ${err.message}"); // Attempt to resume the download if it was interrupted - if (file != null && offset < fixnum.Int64(file!.size)) { + if (file != null && offset < fixnum.Int64(file!.size ?? 0)) { log.info( "Attempting to resume file download for '${file!.name}' from offset $offset."); @@ -608,8 +864,8 @@ final class ChatServiceImpl implements ChatService { // Listen to the resumed media stream await for (MediaFile mediaFile in resumedMedia) { - file.bytes.clear(); - file.bytes.addAll(mediaFile.data); + file.bytes?.clear(); + file.bytes?.addAll(mediaFile.data); offset += mediaFile.data.length; log.info( @@ -632,20 +888,20 @@ final class ChatServiceImpl implements ChatService { ); } - /// Pauses the download of a media file. - /// - /// [fileId] The ID of the file to be paused. - /// [subscription] The StreamSubscription managing the download stream. - @override - Future pauseDownload({ - required String fileId, - required StreamSubscription subscription, - }) async { - // Cancel the subscription to pause the download - await subscription.cancel(); - - log.info("Paused download for file ID: $fileId"); - } + // /// Pauses the download of a media file. + // /// + // /// [fileId] The ID of the file to be paused. + // /// [subscription] The StreamSubscription managing the download stream. + // @override + // Future pauseDownload({ + // required String fileId, + // required StreamSubscription subscription, + // }) async { + // // Cancel the subscription to pause the download + // await subscription.cancel(); + // + // log.info("Paused download for file ID: $fileId"); + // } /// Resumes the download of a media file. /// @@ -776,6 +1032,7 @@ final class ChatServiceImpl implements ChatService { return buttonRow.row .map((button) { log.info('Processing button: $button'); + // Check if the code or url property is not empty if (button.code.isNotEmpty || button.url.isNotEmpty) { return Button( @@ -785,8 +1042,7 @@ final class ChatServiceImpl implements ChatService { ); } else { log.warning( - 'Skipping button with empty code or url - text:' - ' ${button.share.name}'); + 'Skipping button with empty code - ${button.text}'); return null; } }) @@ -871,10 +1127,7 @@ final class ChatServiceImpl implements ChatService { log.info("Sending message of type $messageType for user $userId"); final request = await _buildSendMessageRequest( - message, - userId ?? '', - messageType, - ); + message, userId ?? '', messageType, message.uploadFile); _grpcConnect.sendRequest(request); @@ -883,7 +1136,7 @@ final class ChatServiceImpl implements ChatService { userId ?? '', _handleSendMessageResponse, _handleSendMessageError, - ).timeout(const Duration(seconds: 5)); + ).timeout(const Duration(seconds: 30)); } on GrpcError catch (err) { log.severe("GRPC Error on sendMessage: ${err.message}"); @@ -1042,51 +1295,73 @@ final class ChatServiceImpl implements ChatService { /// [pid] The process ID for resuming incomplete uploads, if any. /// /// Yields a stream of [UploadRequest] messages. - Stream _uploadMediaStream({ - required Stream> data, - required String name, - required String type, - String? pid, - int? offset, - }) async* { - log.info( - 'Starting to stream UploadRequest with file name: $name and type: $type. Process ID: $pid, Offset: $offset'); - - if (pid != null) { - // Resume incomplete upload - log.info('Resuming incomplete upload with Process ID: $pid'); - - yield UploadRequest(pid: pid); - } else { - // Start new upload - log.info('Starting new upload for file: $name of type: $type'); - - yield UploadRequest( - new_2: UploadRequest_Start( - file: InputFile( - name: name, - type: type, - ), - )); - } - - int currentOffset = offset ?? 0; - - await for (var bytes in data) { - if (currentOffset > 0) { - log.info( - 'Resuming from offset: $currentOffset, skipping already uploaded bytes'); - - bytes = bytes.sublist(currentOffset); - currentOffset = 0; // Reset offset after using it once - } - log.info('Streaming data chunk of size: ${bytes.length} bytes.'); - - yield UploadRequest(part: bytes); - } - - log.info('Completed streaming UploadRequest messages for file: $name'); - } + // Stream _uploadMediaStream({ + // required Stream> data, + // required String name, + // required String type, + // String? pid, + // int? offset, + // required Function setProgressCompleter, + // }) async* { + // log.info( + // 'Starting to stream UploadRequest with file name: $name and type: $type. Process ID: $pid, Offset: $offset'); + // + // if (pid != null) { + // // Resume incomplete upload + // log.info('Resuming incomplete upload with Process ID: $pid'); + // yield UploadRequest(pid: pid); + // } else { + // // Start new upload + // log.info('Starting new upload for file: $name of type: $type'); + // yield UploadRequest( + // new_2: UploadRequest_Start( + // file: InputFile( + // name: name, + // type: type, + // ), + // ), + // ); + // } + // + // int currentOffset = offset ?? 0; + // List> chunkBuffer = []; + // + // await for (var bytes in data) { + // if (currentOffset > 0) { + // log.info( + // 'Resuming from offset: $currentOffset, skipping already uploaded bytes'); + // bytes = bytes.sublist(currentOffset); + // currentOffset = 0; // Reset offset after using it once + // } + // + // chunkBuffer.add(bytes); + // log.info('Buffered data chunk of size: ${bytes.length} bytes.'); + // + // // Buffer 10 chunks before yielding them + // if (chunkBuffer.length >= 10) { + // for (var chunk in chunkBuffer) { + // log.info('Streaming buffered chunk of size: ${chunk.length} bytes.'); + // yield UploadRequest(part: chunk); + // } + // + // // Clear buffer after sending + // chunkBuffer.clear(); + // + // // Set a new completer to wait for progress response + // final completer = Completer(); + // setProgressCompleter(completer); + // // await completer.future; // Wait for progress.hasPart + // } + // } + // + // // Stream remaining chunks in the buffer + // for (var chunk in chunkBuffer) { + // log.info('Streaming remaining chunk of size: ${chunk.length} bytes.'); + // yield UploadRequest(part: chunk); + // } + // + // log.info('Completed streaming UploadRequest messages for file: $name'); + // } /// Builds the request for sending a message. /// @@ -1099,6 +1374,7 @@ final class ChatServiceImpl implements ChatService { DialogMessageRequest message, String userId, MessageType messageType, + UploadFile? uploadFile, ) async { log.info( "Building request for sending a message. User ID: $userId, Message Type: $messageType, Message Content: ${message.content}"); @@ -1108,52 +1384,11 @@ final class ChatServiceImpl implements ChatService { ); if (messageType == MessageType.media) { - log.info( - "Detected outgoing media message. Preparing to upload media file: ${message.file.name} of type: ${message.file.type}"); - - String? pid; - int offset = 0; - - final r = RetryOptions(maxAttempts: 3); - - try { - await r.retry( - () async { - await for (var progress in _grpcChannel.mediaStorageStub.upload( - _uploadMediaStream( - data: message.file.data, - name: message.file.name, - type: message.file.type, - pid: pid, - offset: offset, - ), - )) { - if (progress.hasPart()) { - pid = progress.part.pid; - offset = progress.part.size.toInt(); - - log.info( - 'Upload progress update: Process ID=$pid, Uploaded Size=$offset bytes'); - } else if (progress.hasStat()) { - log.info( - 'Upload complete. File details - Name: ${progress.stat.file.name}, ID: ${progress.stat.file.id}, Type: ${progress.stat.file.type}'); - - baseRequest.file = file.File( - id: progress.stat.file.id, - name: progress.stat.file.name, - type: progress.stat.file.type, - ); - } - } - }, - retryIf: (err) => err is GrpcError, - onRetry: (err) => log.warning( - 'Encountered error during upload: $err. Retrying upload...'), - ); - } catch (err) { - log.severe( - 'Failed to upload media file: ${message.file.name} after 3 retry attempts. Error: $err'); - } + baseRequest.file = file.File( + id: uploadFile?.id.toString(), + name: uploadFile?.name, + type: uploadFile?.type, + ); } log.info( @@ -1518,3 +1753,335 @@ final class ChatServiceImpl implements ChatService { ); } } + +/// Uploads a media file to be sent in the dialog. +/// +/// [mediaType] The type of the media to be uploaded. +/// [mediaName] The name of the media to be uploaded. +/// [mediaData] The data stream of the media to be uploaded. +/// +/// Returns an [Upload] object representing the upload operation. +// @override +// Upload uploadFile({ +// required String mediaType, +// required String mediaName, +// required File file, +// String? pid, +// int? offset, +// }) { +// // Create a StreamController for managing upload progress +// final StreamController controller = +// StreamController(); +// +// Completer completer = Completer(); +// +// // Initialize the UploadImpl to manage the upload process +// final UploadImpl upload = UploadImpl( +// onProgress: controller, +// offset: 0, +// ); +// +// // Start the upload process +// final subscription = _grpcChannel.mediaStorageStub +// .upload( +// _uploadMediaStream( +// pid: pid, +// offset: offset, +// file: file, +// mediaName: mediaName, +// mediaType: mediaType, +// completer: completer, +// ), +// ) +// .listen( +// (progress) { +// if (progress.hasPart()) { +// log.info('Upload progress: ${progress.part.size} bytes uploaded.'); +// upload.updateOffset(progress.part.size.toInt()); +// controller.add( +// UploadResponse( +// progress: Progress( +// progressSize: progress.part.size.toInt(), +// progressId: progress.part.pid, +// ), +// ), +// ); +// // Complete the current completer and create a new one for the next batch +// if (!completer.isCompleted) { +// completer.complete(); +// } +// completer = +// Completer(); // Create a new completer for the next batch +// } else if (progress.hasStat()) { +// log.info('Upload completed for file: ${progress.stat.file.name}'); +// //16337697 +// //16337697 +// //16337697 +// //16337697 +//16337697 +// controller.add( +// UploadResponse( +// id: progress.stat.file.id, +// type: progress.stat.file.type, +// name: progress.stat.file.name, +// size: progress.stat.file.size.toInt(), +// ), +// ); +// } +// }, +// onError: (error) { +// log.severe('Upload error: $error'); +// final callError = CallError( +// statusCode: 'UPLOAD_ERROR', +// errorMessage: error.toString(), +// ); +// upload.setError(callError); // Set the error in the upload +// controller.addError(callError); +// controller.close(); +// }, +// onDone: () { +// log.info('Upload stream closed.'); +// controller.close(); +// }, +// cancelOnError: true, +// ); +// +// // Set the subscription in the upload object to manage cancellation +// upload.setSubscription(subscription); +// +// return upload; +// } +// +// Stream _uploadMediaStream({ +// required File file, +// required String mediaName, +// required String mediaType, +// required Completer completer, +// String? pid, +// int? offset, +// }) async* { +// log.info('Starting upload stream for file: $mediaName of type: $mediaType'); +// +// int chunkSize = 512; // Start with 0.5 KB chunk size +// final int minChunkSize = 100; // 0.1 KB +// final int maxChunkSize = 131072; // 128 KB +// final int maxDelayThreshold = +// 5000; // 5 seconds delay threshold for upload adjustment +// int currentOffset = offset ?? 0; // Use the provided offset if it's not null +// int chunkCount = 0; +// final int fileLength = await file.length(); // Get the total file size +// int fastUploadCount = 0; // To count consecutive fast uploads +// DateTime lastDecreaseTime = +// DateTime.now(); // To track the last time chunk size was decreased +// final Duration decreaseCooldown = +// Duration(seconds: 3); // 3-second cooldown for decreases +// +// // List to store recent upload speeds for averaging +// List uploadSpeeds = []; +// int adjustmentsCount = +// 0; // Track number of adjustments to implement a grace period +// final int adjustmentInterval = +// 5; // Number of chunks before adjusting the size again +// +// // Only send the initial upload request if `pid` is null +// if (pid == null) { +// yield UploadRequest( +// new_2: UploadRequest_Start( +// file: InputFile( +// name: mediaName, +// type: mediaType, +// ), +// progress: true, +// ), +// ); +// } else { +// // If `pid` is not null, log that we are resuming the upload +// log.info('Resuming upload with PID: $pid from offset: $currentOffset'); +// } +// +// while (currentOffset < fileLength) { +// // Calculate the end position for the current chunk +// int end = (currentOffset + chunkSize > fileLength) +// ? fileLength +// : currentOffset + chunkSize; +// +// // Read the file chunk from 'currentOffset' to 'end' +// Stream> fileStream = file.openRead(currentOffset, end); +// +// await for (var bytes in fileStream) { +// // Include `pid` in the upload request if it is not null +// yield UploadRequest( +// part: bytes, +// pid: pid ?? '', +// ); +// } +// +// // Update the offset for the next chunk +// currentOffset += chunkSize; +// chunkCount++; +// +// // Wait for the completer after every 3 chunks +// if (chunkCount == 3) { +// final startTime = DateTime.now(); +// await completer.future; +// +// final endTime = DateTime.now(); +// int elapsedTimeMs = endTime.difference(startTime).inMilliseconds; +// +// // Prevent zero division +// if (elapsedTimeMs == 0) { +// elapsedTimeMs = 1; // Set a minimum value to avoid division by zero +// } +// +// // Calculate upload speed (bytes per second) for the current chunk +// final currentSpeed = (chunkSize / elapsedTimeMs) * 1000; +// +// // Add the current speed to the list and limit its size to 5 for averaging +// uploadSpeeds.add(currentSpeed); +// if (uploadSpeeds.length > 5) { +// uploadSpeeds.removeAt(0); +// } +// +// // Calculate the average speed over the stored speeds +// final averageSpeed = +// uploadSpeeds.reduce((a, b) => a + b) / uploadSpeeds.length; +// +// // Adjust chunk size based on average speed only after every `adjustmentInterval` chunks +// adjustmentsCount++; +// if (adjustmentsCount >= adjustmentInterval) { +// final currentTime = DateTime.now(); +// +// if (elapsedTimeMs > maxDelayThreshold && +// currentTime.difference(lastDecreaseTime) > decreaseCooldown) { +// // If there's a substantial delay and cooldown period has passed, reduce chunk size smoothly +// chunkSize = (chunkSize * 0.9) +// .clamp(minChunkSize, maxChunkSize) +// .toInt(); // Decrease by 10% +// log.warning( +// 'Significant delay detected. Gradually reducing chunk size to: $chunkSize bytes.'); +// lastDecreaseTime = currentTime; // Update last decrease time +// } else if (averageSpeed > 50000) { +// fastUploadCount++; +// if (fastUploadCount >= 3) { +// // Increase chunk size only after 3 consecutive fast uploads +// chunkSize = (chunkSize * 1.15) +// .clamp(minChunkSize, maxChunkSize) +// .toInt(); // Gradual increase by 15% +// fastUploadCount = 0; // Reset fast upload count after increasing +// } +// } else if (averageSpeed < 20000 && +// currentTime.difference(lastDecreaseTime) > decreaseCooldown) { +// // If speed is consistently slow and cooldown period has passed, decrease the chunk size smoothly +// chunkSize = (chunkSize * 0.99) +// .clamp(minChunkSize, maxChunkSize) +// .toInt(); // Decrease by 10% +// log.warning( +// 'Consistently slow speed. Gradually reducing chunk size to: $chunkSize bytes.'); +// lastDecreaseTime = currentTime; // Update last decrease time +// } +// +// log.info( +// 'Elapsed time: $elapsedTimeMs ms, Average upload speed: ${averageSpeed.toStringAsFixed(2)} B/s, Adjusted chunk size: $chunkSize bytes'); +// +// adjustmentsCount = +// 0; // Reset the adjustment count to implement the grace period +// } +// +// chunkCount = 0; // Reset the chunk count +// } +// } +// +// log.info('Completed streaming upload requests for file: $mediaName'); +// } + +// Stream _uploadMediaStream({ +// required File file, +// required String mediaName, +// required String mediaType, +// required Completer completer, +// String? pid, +// int? offset, +// }) async* { +// log.info('Starting upload stream for file: $mediaName of type: $mediaType'); +// +// int chunkSize = 512; // Start with 0.5 KB chunk size +// final int minChunkSize = 100; // 0.1 KB +// final int maxChunkSize = 40960; // 40 KB +// int currentOffset = +// offset ?? 0; // To keep track of the current file read position +// int chunkCount = 0; +// final int fileLength = await file.length(); // Get the total file size +// +// // Only send the initial upload request if `pid` is null +// if (pid == null) { +// yield UploadRequest( +// new_2: UploadRequest_Start( +// file: InputFile( +// name: mediaName, +// type: mediaType, +// ), +// progress: true, +// ), +// ); +// } else { +// // If `pid` is not null, log that we are resuming the upload +// log.info('Resuming upload with PID: $pid from offset: $currentOffset'); +// } +// +// while (currentOffset < fileLength) { +// // Calculate the end position for the current chunk +// int end = (currentOffset + chunkSize > fileLength) +// ? fileLength +// : currentOffset + chunkSize; +// +// // Read the file chunk from 'offset' to 'end' +// Stream> fileStream = file.openRead(currentOffset, end); +// +// await for (var bytes in fileStream) { +// // Include `pid` in the upload request if it is not null +// yield UploadRequest( +// part: bytes, +// pid: pid ?? '', +// ); +// } +// +// // Update the offset for the next chunk +// currentOffset += chunkSize; +// chunkCount++; +// +// // Wait for the completer after every 3 chunks +// if (chunkCount == 3) { +// final startTime = DateTime.now(); +// await completer.future; +// +// final endTime = DateTime.now(); +// int elapsedTimeMs = endTime.difference(startTime).inMilliseconds; +// +// // Prevent zero division +// if (elapsedTimeMs == 0) { +// elapsedTimeMs = 1; // Set a minimum value to avoid division by zero +// } +// +// // Calculate upload speed (bytes per second) +// final uploadSpeed = (chunkSize / elapsedTimeMs) * 1000; +// +// // Adjust chunk size based on upload speed +// if (uploadSpeed > 50000) { +// // If speed is greater than 50KB/s, increase the chunk size +// chunkSize = +// (chunkSize * 1.25).clamp(minChunkSize, maxChunkSize).toInt(); +// } else if (uploadSpeed < 20000) { +// // If speed is less than 20KB/s, decrease the chunk size +// chunkSize = +// (chunkSize / 1.25).clamp(minChunkSize, maxChunkSize).toInt(); +// } +// +// log.info( +// 'Elapsed time: $elapsedTimeMs ms, Upload speed: ${uploadSpeed.toStringAsFixed(2)} B/s, Adjusted chunk size: $chunkSize bytes'); +// chunkCount = 0; // Reset the chunk count +// } +// } +// +// log.info('Completed streaming upload requests for file: $mediaName'); +// } +// } diff --git a/lib/src/data/upload_impl.dart b/lib/src/data/upload_impl.dart new file mode 100644 index 0000000..3ff37a0 --- /dev/null +++ b/lib/src/data/upload_impl.dart @@ -0,0 +1,74 @@ +import 'dart:async'; + +import 'package:injectable/injectable.dart'; +import 'package:webitel_portal_sdk/src/domain/entities/call_error.dart'; +import 'package:webitel_portal_sdk/src/domain/entities/media_file_response.dart'; +import 'package:webitel_portal_sdk/src/domain/entities/upload.dart'; +import 'package:webitel_portal_sdk/src/domain/entities/upload_response.dart'; +import 'package:webitel_portal_sdk/src/domain/services/chat_service.dart'; +import 'package:webitel_portal_sdk/src/generated/portal/media.pbgrpc.dart'; +import 'package:webitel_portal_sdk/src/injection/injection.dart'; + +/// Implementation of the [Upload] interface, representing a file upload operation. +@LazySingleton(as: Upload) +final class UploadImpl implements Upload { + /// The current offset in bytes of the upload. + int offset; + + /// The subscription for the upload stream. + StreamSubscription? subscription; + + @override + final StreamController onProgress; + + /// An optional error that might have occurred during the upload. + CallError? _error; + + late final ChatService _chatService; + + /// Constructs an [UploadImpl] with the given media type, name, data stream, and data stream controller. + /// + /// [mediaType] is the type of the media being uploaded. + /// [mediaName] is the name of the media being uploaded. + /// [mediaData] is the data stream of the media being uploaded. + /// [onData] is a stream of [MediaFileResponse] providing updates on the upload progress. + UploadImpl({ + required this.onProgress, + this.subscription, + this.offset = 0, + }) { + _chatService = getIt.get(); + } + + /// Cancels the upload operation. + /// + /// Cancels the current subscription to stop the upload and closes the [onData] stream controller. + @override + Future cancel() async { + if (subscription != null) { + await subscription!.cancel(); + subscription = null; // Clear the subscription to indicate cancellation + onProgress + .close(); // Close the stream controller to signal the end of the upload + } + } + + /// Sets an error if an upload error occurs. + void setError(CallError error) { + _error = error; + } + + /// Updates the offset of the upload. + /// + /// [newOffset] is the new offset in bytes to update. + void updateOffset(int newOffset) { + offset = newOffset; + } + + /// Sets the subscription for the upload. + /// + /// [newSubscription] is the new [StreamSubscription] to set. + void setSubscription(StreamSubscription newSubscription) { + subscription = newSubscription; + } +} diff --git a/lib/src/domain/entities/button.dart b/lib/src/domain/entities/button.dart index 6ad9dab..bee747b 100644 --- a/lib/src/domain/entities/button.dart +++ b/lib/src/domain/entities/button.dart @@ -1 +1,25 @@ -/// Represents a button in the keyboard. class Button { /// The text displayed on the button. final String text; /// The callback code executed when the button is pressed (optional). /// This corresponds to postback/callback data. final String? code; /// The URL to navigate to when the button is pressed (optional). final String? url; /// Constructs a [Button] instance with the given details. /// /// [text] The text displayed on the button. This is a required parameter. /// [code] The callback code executed when the button is pressed (optional). /// [url] The URL to navigate to when the button is pressed (optional). Button({ required this.text, this.code, this.url, }); } \ No newline at end of file +/// Represents a button in the keyboard. +library; + +class Button { + /// The text displayed on the button. + final String text; + + /// The callback code executed when the button is pressed (optional). + /// This corresponds to postback/callback data. + final String? code; + + /// The URL to navigate to when the button is pressed (optional). + final String? url; + + /// Constructs a [Button] instance with the given details. + /// + /// [text] The text displayed on the button. This is a required parameter. + /// [code] The callback code executed when the button is pressed (optional). + /// [url] The URL to navigate to when the button is pressed (optional). + Button({ + required this.text, + this.code, + this.url, + }); +} diff --git a/lib/src/domain/entities/dialog.dart b/lib/src/domain/entities/dialog.dart index 0a8974a..5538909 100644 --- a/lib/src/domain/entities/dialog.dart +++ b/lib/src/domain/entities/dialog.dart @@ -1,10 +1,13 @@ import 'dart:async'; +import 'dart:io'; import 'package:webitel_portal_sdk/src/domain/entities/call_error.dart'; import 'package:webitel_portal_sdk/src/domain/entities/dialog_message_response.dart'; import 'package:webitel_portal_sdk/src/domain/entities/download.dart'; import 'package:webitel_portal_sdk/src/domain/entities/portal_chat_member.dart'; import 'package:webitel_portal_sdk/src/domain/entities/postback.dart'; +import 'package:webitel_portal_sdk/src/domain/entities/upload.dart'; +import 'package:webitel_portal_sdk/src/domain/entities/upload_file.dart'; /// Interface for dialog operations, providing methods to manage messages, /// download files, and handle dialog events. @@ -24,7 +27,8 @@ abstract interface class Dialog { /// Returns the top message as a string. String get topMessage; - /// Retrieves flag whether chat is closed or not. + /// Retrieves the current status of the dialog whether it's closed. + /// bool get isClosed; /// Stream of new messages in the dialog. @@ -34,21 +38,36 @@ abstract interface class Dialog { /// Stream of chat member added. /// - /// Returns a stream of [ChatMember] for new member added.. + /// Returns a stream of [ChatMember] for new member added. Stream get onMemberAdded; /// Stream of chat member left. /// - /// Returns a stream of [PortalChatMember] for member left. + /// Returns a stream of [ChatMember] for member left. Stream get onMemberLeft; /// Downloads a media file associated with the dialog. /// /// [fileId] The unique identifier of the file to be downloaded. /// - /// Returns a stream of [Download] object. + /// Returns a [Download] object representing the download operation. Download downloadFile({required String fileId, int? offset}); + /// Uploads a media file to be sent in the dialog. + /// + /// [mediaType] The type of the media to be uploaded. + /// [mediaName] The name of the media to be uploaded. + /// [mediaData] The data stream of the media to be uploaded. + /// + /// Returns an [Upload] object representing the upload operation. + Upload uploadFile({ + required String mediaType, + required String mediaName, + required File file, + String? pid, + int? offset, + }); + /// Sends a message in the dialog. /// /// [mediaType] The type of the media to be sent (optional). @@ -59,14 +78,12 @@ abstract interface class Dialog { /// /// Returns a [DialogMessageResponse] indicating the result of the send operation. Future sendMessage({ - String? mediaType, - String? mediaName, - Stream>? mediaData, + UploadFile? uploadFile, required String content, required String requestId, }); - /// Sends a postback + /// Sends a postback. /// /// [postback] The postback object. /// [requestId] The unique identifier for the message request. diff --git a/lib/src/domain/entities/dialog_message_request.dart b/lib/src/domain/entities/dialog_message_request.dart index 258c735..f9cfb72 100644 --- a/lib/src/domain/entities/dialog_message_request.dart +++ b/lib/src/domain/entities/dialog_message_request.dart @@ -1,9 +1,9 @@ -import 'package:webitel_portal_sdk/src/domain/entities/media_file_request.dart'; +import 'package:webitel_portal_sdk/src/domain/entities/upload_file.dart'; /// Represents a request to send a dialog message, which may include a media file. class DialogMessageRequest { /// The media file associated with the message. - final MediaFileRequest file; + final UploadFile uploadFile; /// The content of the message. final String content; @@ -27,7 +27,7 @@ class DialogMessageRequest { DialogMessageRequest({ required this.requestId, required this.content, - required this.file, + required this.uploadFile, this.chatId, this.messageId, }); diff --git a/lib/src/domain/entities/dialog_message_response.dart b/lib/src/domain/entities/dialog_message_response.dart index 6b0df59..5ff016d 100644 --- a/lib/src/domain/entities/dialog_message_response.dart +++ b/lib/src/domain/entities/dialog_message_response.dart @@ -61,8 +61,8 @@ class DialogMessageResponse { this.type, this.sender, this.chatId, - this.keyboard, required this.messageId, + this.keyboard, required this.timestamp, required this.file, required this.requestId, diff --git a/lib/src/domain/entities/media_file_response.dart b/lib/src/domain/entities/media_file_response.dart index 2759493..a67fe77 100644 --- a/lib/src/domain/entities/media_file_response.dart +++ b/lib/src/domain/entities/media_file_response.dart @@ -1,19 +1,19 @@ /// Represents a response containing information about a media file. class MediaFileResponse { /// The name of the media file. - final String name; + final String? name; /// The type of the media file. - final String type; + final String? type; /// The unique identifier of the media file. - final String id; + final String? id; /// The size of the media file in bytes. - final int size; + final int? size; /// The byte data of the media file. - final List bytes; + final List? bytes; /// Constructs a [MediaFileResponse] instance with the given file details. /// @@ -24,10 +24,10 @@ class MediaFileResponse { /// [bytes] The byte data of the media file (optional). MediaFileResponse({ List? bytes, - required this.size, - required this.name, - required this.type, - required this.id, + this.size, + this.name, + this.type, + this.id, }) : bytes = bytes ?? []; /// Named constructor for creating an initial/default instance of [MediaFileResponse]. diff --git a/lib/src/domain/entities/progress.dart b/lib/src/domain/entities/progress.dart new file mode 100644 index 0000000..503f08d --- /dev/null +++ b/lib/src/domain/entities/progress.dart @@ -0,0 +1,9 @@ +class Progress { + final int progressSize; + final String progressId; + + Progress({ + required this.progressSize, + required this.progressId, + }); +} diff --git a/lib/src/domain/entities/upload.dart b/lib/src/domain/entities/upload.dart new file mode 100644 index 0000000..e0fc933 --- /dev/null +++ b/lib/src/domain/entities/upload.dart @@ -0,0 +1,19 @@ +import 'dart:async'; + +import 'package:webitel_portal_sdk/src/domain/entities/upload_response.dart'; + +import 'media_file_response.dart'; + +/// An abstract interface class representing a file upload operation. +/// This interface provides methods to upload media files and handle the upload progress. +/// +abstract interface class Upload { + /// A stream of [MediaFileResponse] providing updates on the upload progress. + /// This stream should emit events to notify the caller of the upload progress. + /// The [MediaFileResponse] object should contain information about the upload status, + /// such as the current progress, the total size of the file, and any errors that may occur. + StreamController get onProgress; + + /// Cancels the upload operation. + Future cancel(); +} diff --git a/lib/src/domain/entities/upload_file.dart b/lib/src/domain/entities/upload_file.dart new file mode 100644 index 0000000..c5c5680 --- /dev/null +++ b/lib/src/domain/entities/upload_file.dart @@ -0,0 +1,11 @@ +class UploadFile { + final String id; + final String name; + final String type; + + UploadFile({ + required this.id, + required this.name, + required this.type, + }); +} diff --git a/lib/src/domain/entities/upload_response.dart b/lib/src/domain/entities/upload_response.dart new file mode 100644 index 0000000..77e7e07 --- /dev/null +++ b/lib/src/domain/entities/upload_response.dart @@ -0,0 +1,69 @@ +import 'package:webitel_portal_sdk/src/domain/entities/progress.dart'; + +/// Represents a response containing information about an uploaded response. +class UploadResponse { + /// The name of the media file. + final String? name; + + /// The type of the media file. + final String? type; + + /// The unique identifier of the media file. + final String? id; + + /// The size of the media file in bytes. + final int? size; + + /// The byte data of the media file. + final Progress? progress; + + /// Constructs a [MediaFileResponse] instance with the given file details. + /// + /// [name] The name of the media file. + /// [type] The type of the media file. + /// [id] The unique identifier of the media file. + /// [size] The size of the media file in bytes. + /// [bytes] The byte data of the media file (optional). + UploadResponse({ + Progress? progress, + this.size, + this.name, + this.type, + this.id, + }) : progress = progress ?? Progress(progressSize: 0, progressId: ''); + + /// Named constructor for creating an initial/default instance of [MediaFileResponse]. + /// + /// The initial instance has default values for all fields. + UploadResponse.initial() + : name = '', + type = '', + id = '', + size = 0, + progress = Progress(progressSize: 0, progressId: ''); + + /// Creates a copy of this [MediaFileResponse] with the given fields replaced with new values. + /// + /// [name] The new name of the media file (optional). + /// [type] The new type of the media file (optional). + /// [id] The new unique identifier of the media file (optional). + /// [size] The new size of the media file in bytes (optional). + /// [bytes] The new byte data of the media file (optional). + /// + /// Returns a new [MediaFileResponse] instance with the updated values. + UploadResponse copyWith({ + String? name, + String? type, + String? id, + int? size, + Progress? progress, + }) { + return UploadResponse( + name: name ?? this.name, + type: type ?? this.type, + id: id ?? this.id, + size: size ?? this.size, + progress: progress ?? this.progress, + ); + } +} diff --git a/lib/src/domain/entities/upload_task.dart b/lib/src/domain/entities/upload_task.dart new file mode 100644 index 0000000..62c7aba --- /dev/null +++ b/lib/src/domain/entities/upload_task.dart @@ -0,0 +1,24 @@ +import 'dart:async'; +import 'dart:io'; + +import 'package:webitel_portal_sdk/src/domain/entities/upload_response.dart'; + +class UploadTask { + final String mediaType; + final String mediaName; + final File file; + final String? pid; + final int? offset; + final StreamController controller; + Completer completer; + + UploadTask({ + required this.mediaType, + required this.mediaName, + required this.file, + this.pid, + this.offset, + required this.controller, + required this.completer, + }); +} diff --git a/lib/src/domain/services/chat_service.dart b/lib/src/domain/services/chat_service.dart index c715bbc..cce9595 100644 --- a/lib/src/domain/services/chat_service.dart +++ b/lib/src/domain/services/chat_service.dart @@ -1,4 +1,5 @@ import 'dart:async'; +import 'dart:io'; import 'package:webitel_portal_sdk/src/data/download_impl.dart'; import 'package:webitel_portal_sdk/src/domain/entities/call_error.dart'; @@ -11,7 +12,8 @@ import 'package:webitel_portal_sdk/src/domain/entities/dialog_message_response.d import 'package:webitel_portal_sdk/src/domain/entities/download.dart'; import 'package:webitel_portal_sdk/src/domain/entities/media_file_response.dart'; import 'package:webitel_portal_sdk/src/domain/entities/postback.dart'; -import 'package:webitel_portal_sdk/src/generated/portal/media.pbgrpc.dart'; + +import '../entities/upload.dart'; /// Interface for the chat service, providing methods for fetching messages, /// sending messages, handling dialogs, and managing streams and errors. @@ -93,14 +95,6 @@ abstract interface class ChatService { /// Returns a stream of [MediaFileResponse] for the downloaded file. Download downloadFile({required String fileId, int? offset}); - /// Pauses the download of a media file. - /// - /// [fileId] The ID of the file to be paused. - Future pauseDownload({ - required String fileId, - required StreamSubscription subscription, - }); - /// Resumes the download of a media file. /// /// [fileId] The ID of the file to be resumed. @@ -111,6 +105,21 @@ abstract interface class ChatService { required DownloadImpl download, }); + /// Uploads a media file to be sent in a dialog. + /// + /// [mediaType] The type of the media to be uploaded. + /// [mediaName] The name of the media to be uploaded. + /// [mediaData] The data stream of the media to be uploaded. + /// + /// Returns an [Upload] object representing the upload operation. + Upload uploadFile({ + required String mediaType, + required String mediaName, + required File file, + String? pid, + int? offset, + }); + /// Provides a stream controller for channel status changes. /// /// Returns a [StreamController] for [ChannelStatus] changes. diff --git a/lib/webitel_portal_sdk.dart b/lib/webitel_portal_sdk.dart index 5c49357..04eca04 100644 --- a/lib/webitel_portal_sdk.dart +++ b/lib/webitel_portal_sdk.dart @@ -7,6 +7,7 @@ export 'src/domain/entities/dialog.dart'; export 'src/domain/entities/dialog_message_request.dart'; export 'src/domain/entities/dialog_message_response.dart'; export 'src/domain/entities/logger_level.dart'; +export 'src/domain/entities/media_file_response.dart'; export 'src/domain/entities/message_type.dart'; export 'src/domain/entities/portal_chat_member.dart'; export 'src/domain/entities/portal_client.dart'; @@ -14,4 +15,6 @@ export 'src/domain/entities/portal_response.dart'; export 'src/domain/entities/portal_response_status.dart'; export 'src/domain/entities/portal_user.dart'; export 'src/domain/entities/postback.dart'; +export 'src/domain/entities/upload_file.dart'; +export 'src/domain/entities/upload_response.dart'; export 'src/webitel_portal_sdk.dart';