Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PDO protocol (peer data operation): Get more history sync + better message retry mechanism #919

Merged
merged 15 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 71 additions & 8 deletions Example/example.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,26 @@
import { Boom } from '@hapi/boom'
import NodeCache from 'node-cache'
import readline from 'readline'
import makeWASocket, { AnyMessageContent, BinaryInfo, delay, DisconnectReason, encodeWAM, fetchLatestBaileysVersion, getAggregateVotesInPollMessage, makeCacheableSignalKeyStore, makeInMemoryStore, PHONENUMBER_MCC, proto, useMultiFileAuthState, WAMessageContent, WAMessageKey } from '../src'
import MAIN_LOGGER from '../src/Utils/logger'
import makeWASocket, { AnyMessageContent, BinaryInfo, delay, DisconnectReason, downloadAndProcessHistorySyncNotification, encodeWAM, fetchLatestBaileysVersion, getAggregateVotesInPollMessage, getHistoryMsg, isJidNewsletter, makeCacheableSignalKeyStore, makeInMemoryStore, PHONENUMBER_MCC, proto, useMultiFileAuthState, WAMessageContent, WAMessageKey } from '../src'
//import MAIN_LOGGER from '../src/Utils/logger'
import open from 'open'
import fs from 'fs'
import P from 'pino'

const logger = MAIN_LOGGER.child({})
const logger = P({ timestamp: () => `,"time":"${new Date().toJSON()}"` }, P.destination('./wa-logs.txt'))
logger.level = 'trace'

const useStore = !process.argv.includes('--no-store')
const doReplies = !process.argv.includes('--no-reply')
const doReplies = process.argv.includes('--do-reply')
const usePairingCode = process.argv.includes('--use-pairing-code')
const useMobile = process.argv.includes('--mobile')

// external map to store retry counts of messages when decryption/encryption fails
// keep this out of the socket itself, so as to prevent a message decryption/encryption loop across socket restarts
const msgRetryCounterCache = new NodeCache()

const onDemandMap = new Map<string, string>()

// Read line interface
const rl = readline.createInterface({ input: process.stdin, output: process.stdout })
const question = (text: string) => new Promise<string>((resolve) => rl.question(text, resolve))
Expand Down Expand Up @@ -231,8 +234,11 @@ const startSock = async() => {

// history received
if(events['messaging-history.set']) {
const { chats, contacts, messages, isLatest } = events['messaging-history.set']
console.log(`recv ${chats.length} chats, ${contacts.length} contacts, ${messages.length} msgs (is latest: ${isLatest})`)
const { chats, contacts, messages, isLatest, progress, syncType } = events['messaging-history.set']
if (syncType === proto.HistorySync.HistorySyncType.ON_DEMAND) {
console.log('received on-demand history sync, messages=', messages)
}
console.log(`recv ${chats.length} chats, ${contacts.length} contacts, ${messages.length} msgs (is latest: ${isLatest}, progress: ${progress}%), type: ${syncType}`)
}

// received a new message
Expand All @@ -241,8 +247,65 @@ const startSock = async() => {
console.log('recv messages ', JSON.stringify(upsert, undefined, 2))

if(upsert.type === 'notify') {
for(const msg of upsert.messages) {
if(!msg.key.fromMe && doReplies) {
for (const msg of upsert.messages) {
//TODO: More built-in implementation of this
/* if (
msg.message?.protocolMessage?.type ===
proto.Message.ProtocolMessage.Type.HISTORY_SYNC_NOTIFICATION
) {
const historySyncNotification = getHistoryMsg(msg.message)
if (
historySyncNotification?.syncType ==
proto.HistorySync.HistorySyncType.ON_DEMAND
) {
const { messages } =
await downloadAndProcessHistorySyncNotification(
historySyncNotification,
{}
)


const chatId = onDemandMap.get(
historySyncNotification!.peerDataRequestSessionId!
)

console.log(messages)

onDemandMap.delete(
historySyncNotification!.peerDataRequestSessionId!
)

/*
// 50 messages is the limit imposed by whatsapp
//TODO: Add ratelimit of 7200 seconds
//TODO: Max retries 10
const messageId = await sock.fetchMessageHistory(
50,
oldestMessageKey,
oldestMessageTimestamp
)
onDemandMap.set(messageId, chatId)
}
} */

if (msg.message?.conversation || msg.message?.extendedTextMessage?.text) {
const text = msg.message?.conversation || msg.message?.extendedTextMessage?.text
if (text == "requestPlaceholder" && !upsert.requestId) {
const messageId = await sock.requestPlaceholderResend(msg.key)
console.log('requested placeholder resync, id=', messageId)
} else if (upsert.requestId) {
console.log('Message received from phone, id=', upsert.requestId, msg)
}

// go to an old chat and send this
if (text == "onDemandHistSync") {
const messageId = await sock.fetchMessageHistory(50, msg.key, msg.messageTimestamp!)
console.log('requested on-demand sync, id=', messageId)
}
}

if(!msg.key.fromMe && doReplies && !isJidNewsletter(msg.key?.remoteJid!)) {

console.log('replying to', msg.key.remoteJid)
await sock!.readMessages([msg.key])
await sendMessageWTyping({ text: 'Hello there!' }, msg.key.remoteJid!)
Expand Down
3 changes: 2 additions & 1 deletion src/Defaults/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ export const PROCESSABLE_HISTORY_TYPES = [
proto.Message.HistorySyncNotification.HistorySyncType.INITIAL_BOOTSTRAP,
proto.Message.HistorySyncNotification.HistorySyncType.PUSH_NAME,
proto.Message.HistorySyncNotification.HistorySyncType.RECENT,
proto.Message.HistorySyncNotification.HistorySyncType.FULL
proto.Message.HistorySyncNotification.HistorySyncType.FULL,
proto.Message.HistorySyncNotification.HistorySyncType.ON_DEMAND,
]

export const DEFAULT_CONNECTION_CONFIG: SocketConfig = {
Expand Down
13 changes: 12 additions & 1 deletion src/Socket/chats.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Boom } from '@hapi/boom'
import NodeCache from 'node-cache'
import { proto } from '../../WAProto'
import { PROCESSABLE_HISTORY_TYPES } from '../Defaults'
import { DEFAULT_CACHE_TTLS, PROCESSABLE_HISTORY_TYPES } from '../Defaults'
import { ALL_WA_PATCH_NAMES, ChatModification, ChatMutation, LTHashState, MessageUpsertType, PresenceData, SocketConfig, WABusinessHoursConfig, WABusinessProfile, WAMediaUpload, WAMessage, WAPatchCreate, WAPatchName, WAPresence, WAPrivacyCallValue, WAPrivacyOnlineValue, WAPrivacyValue, WAReadReceiptsValue } from '../Types'
import { chatModificationToAppPatch, ChatMutationMap, decodePatches, decodeSyncdSnapshot, encodeSyncdPatch, extractSyncdPatches, generateProfilePicture, getHistoryMsg, newLTHashState, processSyncAction } from '../Utils'
import { makeMutex } from '../Utils/make-mutex'
Expand Down Expand Up @@ -36,6 +37,15 @@ export const makeChatsSocket = (config: SocketConfig) => {
/** this mutex ensures that the notifications (receipts, messages etc.) are processed in order */
const processingMutex = makeMutex()

const placeholderResendCache = config.placeholderResendCache || new NodeCache({
stdTTL: DEFAULT_CACHE_TTLS.MSG_RETRY, // 1 hour
useClones: false
})

if(!config.placeholderResendCache) {
config.placeholderResendCache = placeholderResendCache
}

/** helper function to fetch the given app state sync key */
const getAppStateSyncKey = async(keyId: string) => {
const { [keyId]: key } = await authState.keys.get('app-state-sync-key', [keyId])
Expand Down Expand Up @@ -876,6 +886,7 @@ export const makeChatsSocket = (config: SocketConfig) => {
msg,
{
shouldProcessHistoryMsg,
placeholderResendCache,
ev,
creds: authState.creds,
keyStore: authState.keys,
Expand Down
124 changes: 113 additions & 11 deletions src/Socket/messages-recv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
aesEncryptGCM,
Curve,
decodeMediaRetryNode,
decodeMessageNode,
decryptMessageNode,
delay,
derivePairingCodeKey,
Expand All @@ -19,6 +20,7 @@
getHistoryMsg,
getNextPreKeys,
getStatusFromReceiptType, hkdf,
NO_MESSAGE_FOUND_ERROR_TEXT,
unixTimestampSeconds,
xmppPreKey,
xmppSignedPreKey
Expand Down Expand Up @@ -65,6 +67,7 @@
relayMessage,
sendReceipt,
uploadPreKeys,
sendPeerDataOperationMessage,
} = sock

/** this mutex ensures that each retryRequest will wait for the previous one to finish */
Expand All @@ -79,6 +82,11 @@
useClones: false
})

const placeholderResendCache = config.placeholderResendCache || new NodeCache({
stdTTL: DEFAULT_CACHE_TTLS.MSG_RETRY, // 1 hour
useClones: false
})

let sendActiveReceipts = false

const sendMessageAck = async({ tag, attrs, content }: BinaryNode) => {
Expand All @@ -99,14 +107,13 @@
stanza.attrs.recipient = attrs.recipient
}

if(!!attrs.type && (tag !== 'message' || getBinaryNodeChild({ tag, attrs, content }, 'unavailable'))) {
stanza.attrs.type = attrs.type
}

if(!!attrs.type && (tag !== 'message' || getBinaryNodeChild({ tag, attrs, content }, 'unavailable'))) {
stanza.attrs.type = attrs.type
}

if(tag === 'message' && getBinaryNodeChild({ tag, attrs, content }, 'unavailable')) {
stanza.attrs.from = authState.creds.me!.id
}
if(tag === 'message' && getBinaryNodeChild({ tag, attrs, content }, 'unavailable')) {
stanza.attrs.from = authState.creds.me!.id
}

logger.debug({ recv: { tag, attrs }, sent: stanza.attrs }, 'sent ack')
await sendNode(stanza)
Expand All @@ -133,9 +140,11 @@
}

const sendRetryRequest = async(node: BinaryNode, forceIncludeKeys = false) => {
const { id: msgId, participant } = node.attrs
const { fullMessage } = decodeMessageNode(node, authState.creds.me!.id, authState.creds.me!.lid || '')
const { key: msgKey } = fullMessage
const msgId = msgKey.id!

const key = `${msgId}:${participant}`
const key = `${msgId}:${msgKey?.participant}`
let retryCount = msgRetryCache.get<number>(key) || 0
if(retryCount >= maxMsgRetryCount) {
logger.debug({ retryCount, msgId }, 'reached retry limit, clearing')
Expand All @@ -148,6 +157,12 @@

const { account, signedPreKey, signedIdentityKey: identityKey } = authState.creds

if(retryCount === 1) {
//request a resend via phone
const msgId = await requestPlaceholderResend(msgKey)
logger.debug(`sendRetryRequest: requested placeholder resend for message ${msgId}`)
}

const deviceIdentity = encodeSignedDeviceIdentity(account!, true)
await authState.keys.transaction(
async() => {
Expand Down Expand Up @@ -322,7 +337,7 @@

break
case 'membership_approval_mode':
const approvalMode: any = getBinaryNodeChild(child, 'group_join')

Check warning on line 340 in src/Socket/messages-recv.ts

View workflow job for this annotation

GitHub Actions / check-lint

Unexpected any. Specify a different type
if(approvalMode) {
msg.messageStubType = WAMessageStubType.GROUP_MEMBERSHIP_JOIN_APPROVAL_MODE
msg.messageStubParameters = [ approvalMode.attrs.state ]
Expand Down Expand Up @@ -699,12 +714,30 @@
}

const handleMessage = async(node: BinaryNode) => {
if(shouldIgnoreJid(node.attrs.from!) && node.attrs.from! !== '@s.whatsapp.net') {
if(shouldIgnoreJid(node.attrs.from) && node.attrs.from !== '@s.whatsapp.net') {
logger.debug({ key: node.attrs.key }, 'ignored message')
await sendMessageAck(node)
return
}

let response: string | undefined

if(getBinaryNodeChild(node, 'unavailable') && !getBinaryNodeChild(node, 'enc')) {
await sendMessageAck(node)
const { key } = decodeMessageNode(node, authState.creds.me!.id, authState.creds.me!.lid || '').fullMessage
response = await requestPlaceholderResend(key)
if(response === 'RESOLVED') {
return
}

logger.debug('received unavailable message, acked and requested resend from phone')
} else {
if(placeholderResendCache.get(node.attrs.id)) {
placeholderResendCache.del(node.attrs.id)
}
}


const { fullMessage: msg, category, author, decrypt } = decryptMessageNode(
node,
authState.creds.me!.id,
Expand All @@ -713,6 +746,10 @@
logger,
)

if(response && msg?.messageStubParameters?.[0] === NO_MESSAGE_FOUND_ERROR_TEXT) {
msg.messageStubParameters = [NO_MESSAGE_FOUND_ERROR_TEXT, response]
}

if(msg.message?.protocolMessage?.type === proto.Message.ProtocolMessage.Type.SHARE_PHONE_NUMBER) {
if(node.attrs.sender_pn) {
ev.emit('chats.phoneNumberShare', { lid: node.attrs.from, jid: node.attrs.sender_pn })
Expand All @@ -728,6 +765,10 @@
retryMutex.mutex(
async() => {
if(ws.isOpen) {
if(getBinaryNodeChild(node, 'unavailable')) {
return
}

const encNode = getBinaryNodeChild(node, 'enc')
await sendRetryRequest(node, !encNode)
if(retryRequestDelayMs) {
Expand Down Expand Up @@ -773,6 +814,65 @@
])
}

const fetchMessageHistory = async(
count: number,
oldestMsgKey: WAMessageKey,
oldestMsgTimestamp: number | Long
): Promise<string> => {
if(!authState.creds.me?.id) {
throw new Boom('Not authenticated')
}

const pdoMessage = {
historySyncOnDemandRequest: {
chatJid: oldestMsgKey.remoteJid,
oldestMsgFromMe: oldestMsgKey.fromMe,
oldestMsgId: oldestMsgKey.id,
oldestMsgTimestampMs: oldestMsgTimestamp,
onDemandMsgCount: count
},
peerDataOperationRequestType: proto.Message.PeerDataOperationRequestType.HISTORY_SYNC_ON_DEMAND
}

return sendPeerDataOperationMessage(pdoMessage)
}

const requestPlaceholderResend = async(messageKey: WAMessageKey): Promise<'RESOLVED'| string | undefined> => {

Check warning on line 840 in src/Socket/messages-recv.ts

View workflow job for this annotation

GitHub Actions / check-lint

"RESOLVED" is overridden by string in this union type
if(!authState.creds.me?.id) {
throw new Boom('Not authenticated')
}

if(placeholderResendCache.get(messageKey?.id!)) {
logger.debug('already requested resend', { messageKey })
return
} else {
placeholderResendCache.set(messageKey?.id!, true)
}

await delay(5000)

if(!placeholderResendCache.get(messageKey?.id!)) {
logger.debug('message received while resend requested', { messageKey })
return 'RESOLVED'
}

const pdoMessage = {
placeholderMessageResendRequest: [{
messageKey
}],
peerDataOperationRequestType: proto.Message.PeerDataOperationRequestType.PLACEHOLDER_MESSAGE_RESEND
}

setTimeout(() => {
if(placeholderResendCache.get(messageKey?.id!)) {
logger.debug('PDO message without response after 15 seconds. Phone possibly offline', { messageKey })
placeholderResendCache.del(messageKey?.id!)
}
}, 15_000)

return sendPeerDataOperationMessage(pdoMessage)
}

const handleCall = async(node: BinaryNode) => {
const { attrs } = node
const [infoChild] = getAllBinaryNodeChildren(node)
Expand Down Expand Up @@ -925,6 +1025,8 @@
...sock,
sendMessageAck,
sendRetryRequest,
rejectCall
rejectCall,
fetchMessageHistory,
requestPlaceholderResend,
}
}
Loading
Loading