diff --git a/shinkai-app/src/api/index.ts b/shinkai-app/src/api/index.ts index e2394e829..7e8b5b429 100644 --- a/shinkai-app/src/api/index.ts +++ b/shinkai-app/src/api/index.ts @@ -11,17 +11,18 @@ import { receiveAllInboxesForProfile, receiveLoadMoreMessagesFromInbox, addAgents, + receiveUnreadMessagesFromInbox, } from "../store/actions"; import { AppThunk } from "../types"; import { ShinkaiMessageBuilderWrapper } from "../lib/wasm/ShinkaiMessageBuilderWrapper"; import { MergedSetupType } from "../pages/Connect"; import { ApiConfig } from "./api_config"; -import { SetupDetailsState } from "../store/reducers"; import { ShinkaiMessage } from "../models/ShinkaiMessage"; import { ShinkaiNameWrapper } from "../lib/wasm/ShinkaiNameWrapper"; import { InboxNameWrapper } from "../pkg/shinkai_message_wasm"; import { SerializedAgent } from "../models/SchemaTypes"; import { SerializedAgentWrapper } from "../lib/wasm/SerializedAgentWrapper"; +import { SetupDetailsState } from "../store/reducers/setupDetailsReducer"; // Helper function to handle HTTP errors export const handleHttpError = (response: any) => { @@ -234,6 +235,51 @@ export const getLastMessagesFromInbox = } }; +export const getLastUnreadMessagesFromInbox = + ( + inbox: string, + count: number, + fromKey: string | undefined, + setupDetailsState: SetupDetailsState + ) => + async (dispatch: AppDispatch) => { + try { + console.log("fromKey: ", fromKey); + let sender = + setupDetailsState.shinkai_identity + "/" + setupDetailsState.profile; + + const messageStr = + ShinkaiMessageBuilderWrapper.get_last_messages_from_inbox( + setupDetailsState.profile_encryption_sk, + setupDetailsState.profile_identity_sk, + setupDetailsState.node_encryption_pk, + inbox, + count, + fromKey, + sender, + "", + setupDetailsState.shinkai_identity + ); + + const message = JSON.parse(messageStr); + console.log("Message:", message); + + const apiEndpoint = ApiConfig.getInstance().getEndpoint(); + const response = await axios.post( + `${apiEndpoint}/v1/last_unread_messages_from_inbox`, + message + ); + + handleHttpError(response); + let results = response.data; + + console.log("getLastUnreadMessagesFromInbox Response:", results); + dispatch(receiveUnreadMessagesFromInbox(inbox, results)); + } catch (error) { + console.error("Error getting last messages from inbox:", error); + } + }; + export const submitRequestRegistrationCode = ( identity_permissions: string, @@ -409,7 +455,7 @@ export const sendMessageToJob = ); handleHttpError(response); - dispatch(response.data); + dispatch({ type: "SEND_MESSAGE_SUCCESS", payload: response.data }); } catch (error) { console.error("Error sending message to job:", error); } diff --git a/shinkai-app/src/components/ChatMessages.tsx b/shinkai-app/src/components/ChatMessages.tsx new file mode 100644 index 000000000..c159c5a90 --- /dev/null +++ b/shinkai-app/src/components/ChatMessages.tsx @@ -0,0 +1,173 @@ +import React, { useEffect, useState } from "react"; +import { useSelector, useDispatch } from "react-redux"; +import { getLastMessagesFromInbox, getLastUnreadMessagesFromInbox } from "../api/index"; +import { ShinkaiMessage } from "../models/ShinkaiMessage"; +import { IonList, IonItem, IonButton } from "@ionic/react"; +import Avatar from "../components/ui/Avatar"; +import { cn } from "../theme/lib/utils"; +import { IonContentCustom } from "./ui/Layout"; +import { calculateMessageHash } from "../utils/shinkai_message_handler"; +import { RootState } from "../store"; + +interface ChatMessagesProps { + deserializedId: string; +} + +const ChatMessages: React.FC = ({ deserializedId }) => { + console.log("Loading ChatMessages.tsx"); + const dispatch = useDispatch(); + const setupDetailsState = useSelector( + (state: RootState) => state.setupDetails + ); + const reduxMessages = useSelector( + (state: RootState) => state.messages.inboxes[deserializedId] + ); + + const [lastKey, setLastKey] = useState(undefined); + const [mostRecentKey, setMostRecentKey] = useState(undefined); + const [prevMessagesLength, setPrevMessagesLength] = useState(0); + const [hasMoreMessages, setHasMoreMessages] = useState(true); + const [messages, setMessages] = useState([]); + + useEffect(() => { + console.log("deserializedId:", deserializedId); + dispatch( + getLastMessagesFromInbox(deserializedId, 10, lastKey, setupDetailsState) + ); + }, [dispatch, setupDetailsState]); + + useEffect(() => { + const interval = setInterval(() => { + const lastMessage = reduxMessages[reduxMessages.length - 1]; + const hashKey = calculateMessageHash(lastMessage); + dispatch( + getLastUnreadMessagesFromInbox(deserializedId, 10, mostRecentKey, setupDetailsState) + ); + }, 5000); // 2000 milliseconds = 2 seconds + return () => clearInterval(interval); + }, [dispatch, deserializedId, mostRecentKey, setupDetailsState, reduxMessages]); + + useEffect(() => { + if (reduxMessages && reduxMessages.length > 0) { + // console.log("Redux Messages:", reduxMessages); + const lastMessage = reduxMessages[reduxMessages.length - 1]; + console.log("Last Message:", lastMessage); + const timeKey = lastMessage.external_metadata.scheduled_time; + const hashKey = calculateMessageHash(lastMessage); + const lastMessageKey = `${timeKey}:::${hashKey}`; + setLastKey(lastMessageKey); + + const mostRecentMessage = reduxMessages[0]; + const mostRecentTimeKey = mostRecentMessage.external_metadata.scheduled_time; + const mostRecentHashKey = calculateMessageHash(mostRecentMessage); + const mostRecentMessageKey = `${mostRecentTimeKey}:::${mostRecentHashKey}`; + setMostRecentKey(mostRecentMessageKey); + + setMessages(reduxMessages); + + if (reduxMessages.length - prevMessagesLength < 10) { + setHasMoreMessages(false); + } + setPrevMessagesLength(reduxMessages.length); + } + }, [reduxMessages]); + + const extractContent = (messageBody: any) => { + // TODO: extend it so it can be re-used by JobChat or normal Chat + if (messageBody && "unencrypted" in messageBody) { + if ("unencrypted" in messageBody.unencrypted.message_data) { + return JSON.parse( + messageBody.unencrypted.message_data.unencrypted.message_raw_content + ).content; + } else { + return JSON.parse( + messageBody.unencrypted.message_data.encrypted.content + ).content; + } + } else if (messageBody?.encrypted) { + return JSON.parse(messageBody.encrypted.content).content; + } + return ""; + }; + + return ( + +
+ {hasMoreMessages && ( + + dispatch( + getLastMessagesFromInbox( + deserializedId, + 10, + lastKey, + setupDetailsState, + true + ) + ) + } + > + Load More + + )} + + {messages && + messages + .slice() + .map((message, index) => { + const { shinkai_identity, profile, registration_name } = + setupDetailsState; + + const localIdentity = `${profile}/device/${registration_name}`; + // console.log("Message:", message); + let isLocalMessage = false; + if (message.body && "unencrypted" in message.body) { + isLocalMessage = + message.body.unencrypted.internal_metadata + .sender_subidentity === localIdentity; + } + + return ( + +
+ + +

{extractContent(message.body)}

+ {message?.external_metadata?.scheduled_time && ( + + {new Date( + message.external_metadata.scheduled_time + ).toLocaleString(undefined, { + year: "numeric", + month: "long", + day: "numeric", + hour: "2-digit", + minute: "2-digit", + })} + + )} +
+
+ ); + })} +
+
+
+ ); +}; + +export default ChatMessages; diff --git a/shinkai-app/src/hooks/usetSetup.ts b/shinkai-app/src/hooks/usetSetup.ts index 21afda9ce..fcd31e21e 100644 --- a/shinkai-app/src/hooks/usetSetup.ts +++ b/shinkai-app/src/hooks/usetSetup.ts @@ -1,14 +1,17 @@ // hooks/useSetup.ts import { useEffect } from "react"; -import { useSelector } from "react-redux"; +import { shallowEqual, useSelector } from "react-redux"; import { RootState } from "../store"; import { ApiConfig } from "../api/api_config"; export const useSetup = () => { - const { setupDetailsState } = useSelector((state: RootState) => state); + const setupDetails = useSelector( + (state: RootState) => state.setupDetails, + shallowEqual + ); useEffect(() => { - console.log("Redux State:", setupDetailsState); - ApiConfig.getInstance().setEndpoint(setupDetailsState.node_address); - }, [setupDetailsState]); + console.log("Redux State:", setupDetails); + ApiConfig.getInstance().setEndpoint(setupDetails.node_address); + }, [setupDetails]); }; \ No newline at end of file diff --git a/shinkai-app/src/lib/wasm/ShinkaiMessageBuilderWrapper.ts b/shinkai-app/src/lib/wasm/ShinkaiMessageBuilderWrapper.ts index ea616cbc3..96a95bc7d 100644 --- a/shinkai-app/src/lib/wasm/ShinkaiMessageBuilderWrapper.ts +++ b/shinkai-app/src/lib/wasm/ShinkaiMessageBuilderWrapper.ts @@ -265,7 +265,7 @@ export class ShinkaiMessageBuilderWrapper { receiver_public_key: string, inbox: string, count: number, - offset: string | undefined, + until_offset: string | undefined, sender: string, sender_profile_name: string, receiver: string @@ -276,7 +276,32 @@ export class ShinkaiMessageBuilderWrapper { receiver_public_key, inbox, count, - offset, + until_offset, + sender, + sender_profile_name, + receiver, + "" + ); + } + + static get_last_unread_messages_from_inbox( + my_subidentity_encryption_sk: string, + my_subidentity_signature_sk: string, + receiver_public_key: string, + inbox: string, + count: number, + from_offset: string | undefined, + sender: string, + sender_profile_name: string, + receiver: string + ): string { + return ShinkaiMessageBuilderWrapperWASM.get_last_unread_messages_from_inbox( + my_subidentity_encryption_sk, + my_subidentity_signature_sk, + receiver_public_key, + inbox, + count, + from_offset, sender, sender_profile_name, receiver, diff --git a/shinkai-app/src/pages/AddAgent.tsx b/shinkai-app/src/pages/AddAgent.tsx index 77bcd45ac..5ee71dc06 100644 --- a/shinkai-app/src/pages/AddAgent.tsx +++ b/shinkai-app/src/pages/AddAgent.tsx @@ -23,18 +23,19 @@ import { import { useEffect, useState } from "react"; import { IonContentCustom, IonHeaderCustom } from "../components/ui/Layout"; import Button from "../components/ui/Button"; -import Input from "../components/ui/Input"; import { useDispatch, useSelector } from "react-redux"; import { RootState } from "../store"; import { SerializedAgent, AgentAPIModel } from "../models/SchemaTypes"; import { addAgent } from "../api"; import { useSetup } from "../hooks/usetSetup"; +import { useHistory } from 'react-router-dom'; + const AddAgent: React.FC = () => { useSetup(); const dispatch = useDispatch(); const setupDetailsState = useSelector( - (state: RootState) => state.setupDetailsState + (state: RootState) => state.setupDetails ); const [agent, setAgent] = useState>({ perform_locally: false, @@ -70,7 +71,7 @@ const AddAgent: React.FC = () => { }); }; - const handleSubmit = () => { + const handleSubmit = async () => { const { shinkai_identity, profile } = setupDetailsState; let node_name = shinkai_identity; @@ -82,14 +83,11 @@ const AddAgent: React.FC = () => { } console.log("Submitting agent:", agent); - dispatch( - addAgent( - profile, - node_name, - agent as SerializedAgent, - setupDetailsState - ) - ); + const resp = await addAgent(profile, node_name, agent as SerializedAgent, setupDetailsState); + if (resp) { + // TODO: show a success toast + history.back(); + } }; return ( diff --git a/shinkai-app/src/pages/AdminCommands.tsx b/shinkai-app/src/pages/AdminCommands.tsx index d1d742897..a3d66293b 100644 --- a/shinkai-app/src/pages/AdminCommands.tsx +++ b/shinkai-app/src/pages/AdminCommands.tsx @@ -24,7 +24,7 @@ import { useSetup } from "../hooks/usetSetup"; const AdminCommands: React.FC = () => { useSetup(); const setupDetailsState = useSelector( - (state: RootState) => state.setupDetailsState + (state: RootState) => state.setupDetails ); const [showCodeRegistrationActionSheet, setShowCodeRegistrationActionSheet] = useState(false); @@ -36,7 +36,7 @@ const AdminCommands: React.FC = () => { const [profileName, setProfileName] = useState(""); const dispatch = useDispatch(); const registrationCode = useSelector( - (state: RootState) => state.registrationCode + (state: RootState) => state.other.registrationCode ); const commands = [ "Get Peers", diff --git a/shinkai-app/src/pages/Chat.tsx b/shinkai-app/src/pages/Chat.tsx index 75a54df86..6317af4cc 100644 --- a/shinkai-app/src/pages/Chat.tsx +++ b/shinkai-app/src/pages/Chat.tsx @@ -52,7 +52,7 @@ const Chat: React.FC = () => { const dispatch = useDispatch(); const setupDetailsState = useSelector( - (state: RootState) => state.setupDetailsState + (state: RootState) => state.setupDetails ); const { id } = useParams<{ id: string }>(); @@ -63,7 +63,7 @@ const Chat: React.FC = () => { const [prevMessagesLength, setPrevMessagesLength] = useState(0); const reduxMessages = useSelector( - (state: RootState) => state.inboxes[deserializedId] + (state: RootState) => state.messages.inboxes[deserializedId] ); const [messages, setMessages] = useState([]); diff --git a/shinkai-app/src/pages/Connect.tsx b/shinkai-app/src/pages/Connect.tsx index a2698e0e7..a49a93877 100644 --- a/shinkai-app/src/pages/Connect.tsx +++ b/shinkai-app/src/pages/Connect.tsx @@ -20,7 +20,7 @@ import { BrowserQRCodeReader } from "@zxing/browser"; import { Camera, CameraResultType, CameraSource } from "@capacitor/camera"; import { useHistory } from "react-router-dom"; import { toast } from "react-toastify"; -import type { AppDispatch, RootState } from "../store"; +import type { AppDispatch } from "../store"; import { QrScanner, QrScannerProps } from "@yudiel/react-qr-scanner"; import { BarcodeScanner } from "@capacitor-community/barcode-scanner"; import { isPlatform } from "@ionic/react"; @@ -29,13 +29,14 @@ import { generateSignatureKeys, } from "../utils/wasm_helpers"; import { QRSetupData } from "../models/QRSetupData"; -import { SetupDetailsState } from "../store/reducers"; import { InputCustomEvent } from "@ionic/core/dist/types/components/input/input-interface"; import { cn } from "../theme/lib/utils"; import Button from "../components/ui/Button"; import { IonHeaderCustom } from "../components/ui/Layout"; import Input from "../components/ui/Input"; import { scan, cloudUpload, checkmarkSharp } from "ionicons/icons"; +import { SetupDetailsState } from "../store/reducers/setupDetailsReducer"; +import { RootState } from "../store/main"; export type MergedSetupType = SetupDetailsState & QRSetupData; @@ -65,7 +66,7 @@ const Connect: React.FC = () => { const [error, setError] = useState(null); const dispatch = useDispatch(); const history = useHistory(); - const errorFromState = useSelector((state: RootState) => state.error); + const errorFromState = useSelector((state: RootState) => state.other.error); // Generate keys when the component mounts useEffect(() => { diff --git a/shinkai-app/src/pages/CreateChat.tsx b/shinkai-app/src/pages/CreateChat.tsx index a84666f3e..42c40a544 100644 --- a/shinkai-app/src/pages/CreateChat.tsx +++ b/shinkai-app/src/pages/CreateChat.tsx @@ -20,17 +20,17 @@ import { useState } from "react"; import { useDispatch, useSelector } from "react-redux"; import { createChatWithMessage } from "../api"; import { useSetup } from "../hooks/usetSetup"; -import { RootState } from "../store/reducers"; import { useHistory } from "react-router-dom"; import { History } from "history"; import { IonContentCustom, IonHeaderCustom } from "../components/ui/Layout"; import Input from "../components/ui/Input"; import Button from "../components/ui/Button"; +import { RootState } from "../store"; const CreateChat: React.FC = () => { useSetup(); const setupDetailsState = useSelector( - (state: RootState) => state.setupDetailsState, + (state: RootState) => state.setupDetails, ); const [shinkaiIdentity, setShinkaiIdentity] = useState(""); const [messageText, setMessageText] = useState(""); diff --git a/shinkai-app/src/pages/CreateJob.tsx b/shinkai-app/src/pages/CreateJob.tsx index a8c98f252..d33031fd1 100644 --- a/shinkai-app/src/pages/CreateJob.tsx +++ b/shinkai-app/src/pages/CreateJob.tsx @@ -39,13 +39,13 @@ const CreateJob: React.FC = () => { useSetup(); const dispatch = useDispatch(); const setupDetailsState = useSelector( - (state: RootState) => state.setupDetailsState + (state: RootState) => state.setupDetails ); const [jobContent, setJobContent] = useState(""); const [selectedAgent, setSelectedAgent] = useState( null ); - const agents = useSelector((state: RootState) => state.agents); + const agents = useSelector((state: RootState) => state.other.agents); const history: History = useHistory(); useEffect(() => { diff --git a/shinkai-app/src/pages/Home.tsx b/shinkai-app/src/pages/Home.tsx index fc936f118..51e98cec9 100644 --- a/shinkai-app/src/pages/Home.tsx +++ b/shinkai-app/src/pages/Home.tsx @@ -17,7 +17,6 @@ import { import { addOutline, arrowForward, cloudUpload } from "ionicons/icons"; import "./Home.css"; import { useHistory } from "react-router-dom"; -import { RootState } from "../store"; import { useDispatch, useSelector } from "react-redux"; import React, { useEffect, useState } from "react"; import { ApiConfig } from "../api/api_config"; @@ -25,14 +24,15 @@ import { clearStore } from "../store/actions"; import { getAllInboxesForProfile } from "../api"; import Avatar from "../components/ui/Avatar"; import { IonContentCustom, IonHeaderCustom } from "../components/ui/Layout"; +import { RootState } from "../store"; const Home: React.FC = () => { - const { setupDetailsState } = useSelector((state: RootState) => state); + const setupDetails = useSelector((state: RootState) => state.setupDetails); const history = useHistory(); const dispatch = useDispatch(); const { shinkai_identity, profile, registration_name, permission_type } = - setupDetailsState; + setupDetails; const displayString = ( <> {`${shinkai_identity}/${profile}/device/${registration_name}`}{" "} @@ -41,20 +41,20 @@ const Home: React.FC = () => { ); const [showActionSheet, setShowActionSheet] = useState(false); const [showLogoutAlert, setShowLogoutAlert] = useState(false); - const inboxes = useSelector((state: RootState) => state.inboxes); + const inboxes = useSelector((state: RootState) => state.other.just_inboxes); console.log("Inboxes:", inboxes); useEffect(() => { - console.log("Redux State:", setupDetailsState); - ApiConfig.getInstance().setEndpoint(setupDetailsState.node_address); + console.log("Redux State:", setupDetails); + ApiConfig.getInstance().setEndpoint(setupDetails.node_address); }, []); useEffect(() => { - console.log("Redux State:", setupDetailsState); - ApiConfig.getInstance().setEndpoint(setupDetailsState.node_address); + console.log("Redux State:", setupDetails); + ApiConfig.getInstance().setEndpoint(setupDetails.node_address); // Local Identity - const { shinkai_identity, profile, registration_name } = setupDetailsState; + const { shinkai_identity, profile, registration_name } = setupDetails; let sender = shinkai_identity; let sender_subidentity = `${profile}/device/${registration_name}`; @@ -68,7 +68,7 @@ const Home: React.FC = () => { sender_subidentity, receiver, target_shinkai_name_profile, - setupDetailsState, + setupDetails, ), ); }, []); @@ -98,13 +98,13 @@ const Home: React.FC = () => {
- {Object.entries(inboxes).map(([position, inboxId]) => ( + {inboxes && inboxes.map((inbox_name) => ( { - const encodedInboxId = position.toString().replace(/\./g, "~"); + const encodedInboxId = inbox_name.toString().replace(/\./g, "~"); if (encodedInboxId.startsWith("inbox")) { history.push(`/chat/${encodeURIComponent(encodedInboxId)}`); } else if (encodedInboxId.startsWith("job_inbox")) { @@ -113,11 +113,11 @@ const Home: React.FC = () => { }} > - {JSON.stringify(position)} + {JSON.stringify(inbox_name)} { - return new Date(dateString); -}; +import ChatMessages from "../components/ChatMessages"; +import { RootState } from "../store"; const JobChat: React.FC = () => { - console.log("Loading Chat.tsx"); + console.log("Loading JobChat.tsx"); useSetup(); const dispatch = useDispatch(); const setupDetailsState = useSelector( - (state: RootState) => state.setupDetailsState + (state: RootState) => state.setupDetails, + shallowEqual ); + const { shinkai_identity, profile } = setupDetailsState; const { id } = useParams<{ id: string }>(); - const bottomChatRef = useRef(null); const deserializedId = decodeURIComponent(id).replace(/~/g, "."); - const [lastKey, setLastKey] = useState(undefined); - const [hasMoreMessages, setHasMoreMessages] = useState(true); - const [prevMessagesLength, setPrevMessagesLength] = useState(0); - - const reduxMessages = useSelector( - (state: RootState) => state.inboxes[deserializedId] - ); - - const [messages, setMessages] = useState([]); const [inputMessage, setInputMessage] = useState(""); const otherPersonIdentity = getOtherPersonIdentity( deserializedId, setupDetailsState.shinkai_identity ); - const extractContent = (messageBody: any) => { - if (messageBody && "unencrypted" in messageBody) { - if ("unencrypted" in messageBody.unencrypted.message_data) { - return JSON.parse( - messageBody.unencrypted.message_data.unencrypted.message_raw_content - ).content; - } else { - return JSON.parse( - messageBody.unencrypted.message_data.encrypted.content - ).content; - } - } else if (messageBody?.encrypted) { - return JSON.parse(messageBody.encrypted.content).content; - } - return ""; - }; - - useEffect(() => { - console.log("deserializedId:", deserializedId); - dispatch( - getLastMessagesFromInbox(deserializedId, 10, lastKey, setupDetailsState) - ); - }, [id, dispatch, setupDetailsState]); - - useEffect(() => { - if (reduxMessages && reduxMessages.length > 0) { - console.log("Redux Messages:", reduxMessages); - const lastMessage = reduxMessages[reduxMessages.length - 1]; - console.log("Last Message:", lastMessage); - const timeKey = lastMessage.external_metadata.scheduled_time; - const hashKey = calculateMessageHash(lastMessage); - const lastMessageKey = `${timeKey}:${hashKey}`; - setLastKey(lastMessageKey); - setMessages(reduxMessages); - - if (reduxMessages.length - prevMessagesLength < 10) { - setHasMoreMessages(false); - } - setPrevMessagesLength(reduxMessages.length); - } - }, [reduxMessages]); - - useEffect(() => { - // Check if the user is at the bottom of the chat - const isUserAtBottom = - bottomChatRef.current && - bottomChatRef.current.getBoundingClientRect().bottom <= - window.innerHeight; - - // If the user is at the bottom, scroll to the bottom - if (isUserAtBottom) { - bottomChatRef.current?.scrollIntoView({ behavior: "smooth" }); - } - }, [messages]); - - const sendMessage = () => { - console.log("Sending message: ", inputMessage); - - // Local Identity - const { shinkai_identity, profile, registration_name } = setupDetailsState; - // let sender = shinkai_identity; + const sendMessage = useCallback(async () => { let sender = `${shinkai_identity}/${profile}`; - + console.log("Sending message: ", inputMessage); console.log("Sender:", sender); - dispatch( + let message_to_send = inputMessage; + setInputMessage(""); + const result = await dispatch( sendMessageToJob( extractJobIdFromInbox(deserializedId.toString()), - inputMessage, + message_to_send, sender, shinkai_identity, "", setupDetailsState ) ); - setInputMessage(""); - }; + + }, [inputMessage, dispatch, setupDetailsState, shinkai_identity, deserializedId]); return ( @@ -166,79 +79,9 @@ const JobChat: React.FC = () => { {/**/}
- - -
- {hasMoreMessages && ( - - dispatch( - getLastMessagesFromInbox( - deserializedId, - 10, - lastKey, - setupDetailsState, - true - ) - ) - } - > - Load More - - )} - - {messages && - messages - .slice() - .reverse() - .map((message, index) => { - const { shinkai_identity, profile, registration_name } = - setupDetailsState; - - const localIdentity = `${profile}/device/${registration_name}`; - // console.log("Message:", message); - let isLocalMessage = false; - if (message.body && "unencrypted" in message.body) { - isLocalMessage = - message.body.unencrypted.internal_metadata - .sender_subidentity === localIdentity; - } - - return ( - -
- - -

{extractContent(message.body)}

- {message?.external_metadata?.scheduled_time && ( - - {parseDate( - message.external_metadata.scheduled_time - ).toLocaleTimeString()} - - )} -
-
- ); - })} -
-
-
- +
{ fill="outline" className="m-0 w-full bg-transparent p-0 pl-2 pr-12 md:pl-0" value={inputMessage} - onIonChange={(e) => setInputMessage(e.detail.value!)} + onIonChange={(e) => { + const newMessage = e.detail.value!; + setInputMessage(newMessage); + }} placeholder="Type a message" > diff --git a/shinkai-app/src/store/actions.ts b/shinkai-app/src/store/actions.ts index 9f5a49754..04cf1af11 100644 --- a/shinkai-app/src/store/actions.ts +++ b/shinkai-app/src/store/actions.ts @@ -1,5 +1,5 @@ import { SerializedAgent } from "../models/SchemaTypes"; -import { SetupDetailsState } from "./reducers"; +import { SetupDetailsState } from "./reducers/setupDetailsReducer"; import { GET_PUBLIC_KEY, USE_REGISTRATION_CODE, @@ -15,6 +15,7 @@ import { GET_AVAILABLE_AGENTS, CLEAR_MESSAGES, ADD_AGENTS, + RECEIVE_UNREAD_MESSAGES_FROM_INBOX, } from "./types"; export const getPublicKey = (publicKey: string) => ({ @@ -40,6 +41,14 @@ export const receiveLastMessagesFromInbox = ( payload: { inboxId, messages }, }); +export const receiveUnreadMessagesFromInbox = ( + inboxId: string, + messages: any[] +) => ({ + type: RECEIVE_UNREAD_MESSAGES_FROM_INBOX, + payload: { inboxId, messages }, +}); + export const receiveLoadMoreMessagesFromInbox = ( inboxId: string, messages: any[] diff --git a/shinkai-app/src/store/index.ts b/shinkai-app/src/store/index.ts index 5859f738b..ea0afbf22 100644 --- a/shinkai-app/src/store/index.ts +++ b/shinkai-app/src/store/index.ts @@ -1,12 +1,11 @@ -import { createStore, applyMiddleware, Store } from 'redux'; +import { createStore, applyMiddleware, Store, compose } from 'redux'; import thunk, { ThunkAction } from 'redux-thunk'; import storage from 'redux-persist/lib/storage'; import { persistStore, persistReducer } from 'redux-persist'; -import rootReducer, { RootState as RootStateFromReducer } from './reducers'; +import rootReducer from './reducers'; import { Action } from 'redux'; -export type RootState = RootStateFromReducer; - +export type RootState = ReturnType; export type AppDispatch = Store['dispatch']; export type AppThunk = ThunkAction< @@ -19,10 +18,17 @@ export type AppThunk = ThunkAction< const persistConfig = { key: 'root', storage, - whitelist: ['registrationStatus', 'setupDetailsState'] + whitelist: ['other', 'setupDetails'], + debug: true, }; const persistedReducer = persistReducer(persistConfig, rootReducer); -export const store = createStore(persistedReducer, applyMiddleware(thunk)); +// Use Redux DevTools extension if it's installed in the user's browser +const composeEnhancers = (typeof window !== 'undefined' && window.__REDUX_DEVTOOLS_EXTENSION_COMPOSE__) || compose; + +export const store = createStore( + persistedReducer, + composeEnhancers(applyMiddleware(thunk)) +); export const persistor = persistStore(store); diff --git a/shinkai-app/src/store/main.ts b/shinkai-app/src/store/main.ts deleted file mode 100644 index 1bb10ff48..000000000 --- a/shinkai-app/src/store/main.ts +++ /dev/null @@ -1,12 +0,0 @@ -// src/store/main.ts -import { configureStore } from '@reduxjs/toolkit'; -import chatReducer from '../features/chat/chatSlice'; - -export const store = configureStore({ - reducer: { - chat: chatReducer, - }, -}); - -export type RootState = ReturnType; -export type AppDispatch = typeof store.dispatch; diff --git a/shinkai-app/src/store/reducers.ts b/shinkai-app/src/store/reducers.ts index c20358dcc..140eb9327 100644 --- a/shinkai-app/src/store/reducers.ts +++ b/shinkai-app/src/store/reducers.ts @@ -1,229 +1,12 @@ -import { Base58String } from "../models/QRSetupData"; -import { SerializedAgent } from "../models/SchemaTypes"; -import { ShinkaiMessage } from "../models/ShinkaiMessage"; -import { calculateMessageHash } from "../utils/shinkai_message_handler"; -import { - Action, - GET_PUBLIC_KEY, - USE_REGISTRATION_CODE, - PING_ALL, - REGISTRATION_ERROR, - CREATE_REGISTRATION_CODE, - CLEAR_REGISTRATION_CODE, - RECEIVE_LAST_MESSAGES_FROM_INBOX, - CLEAR_STORE, - ADD_MESSAGE_TO_INBOX, - RECEIVE_ALL_INBOXES_FOR_PROFILE, - RECEIVE_LOAD_MORE_MESSAGES_FROM_INBOX, - ADD_AGENTS, -} from "./types"; - -export type SetupDetailsState = { - profile: string; - permission_type: string; - registration_name: string; - node_address: string; - shinkai_identity: string; - node_encryption_pk: Base58String; - node_signature_pk: Base58String; - profile_encryption_sk: Base58String; - profile_encryption_pk: Base58String; - profile_identity_sk: Base58String; - profile_identity_pk: Base58String; - my_device_encryption_sk: Base58String; - my_device_encryption_pk: Base58String; - my_device_identity_sk: Base58String; - my_device_identity_pk: Base58String; -}; - -const setupInitialState: SetupDetailsState = { - profile: "", - permission_type: "", - registration_name: "", - node_address: "", - shinkai_identity: "", - node_encryption_pk: "", - node_signature_pk: "", - profile_encryption_sk: "", - profile_encryption_pk: "", - profile_identity_sk: "", - profile_identity_pk: "", - my_device_encryption_sk: "", - my_device_encryption_pk: "", - my_device_identity_sk: "", - my_device_identity_pk: "", -}; - -export interface RootState { - registrationCode: string; - publicKey: string; - registrationStatus: boolean; - pingResult: string; - setupDetailsState: SetupDetailsState; - error: string | null; - inboxes: { - [inboxId: string]: any[]; - }; - messageHashes: { - [inboxId: string]: Set; - }; - agents: { - [agentId: string]: SerializedAgent; - }; -} - -const initialState: RootState = { - publicKey: "", - registrationStatus: false, - pingResult: "", - setupDetailsState: setupInitialState, - registrationCode: "", - error: null, - inboxes: {}, - messageHashes: {}, - agents: {}, -}; - -const rootReducer = (state = initialState, action: Action): RootState => { - switch (action.type) { - case GET_PUBLIC_KEY: - return { ...state, publicKey: action.payload }; - case USE_REGISTRATION_CODE: - return { - ...state, - registrationStatus: true, - setupDetailsState: action.payload, - }; - case RECEIVE_LOAD_MORE_MESSAGES_FROM_INBOX: { - const { inboxId, messages } = action.payload; - const currentMessages = state.inboxes[inboxId] || []; - const currentMessageHashes = state.messageHashes[inboxId] || new Set(); - - const uniqueNewMessages = messages.filter((msg: ShinkaiMessage) => { - const hash = calculateMessageHash(msg); - if (currentMessageHashes.has(hash)) { - return false; - } else { - currentMessageHashes.add(hash); - return true; - } - }); - - return { - ...state, - inboxes: { - ...state.inboxes, - [inboxId]: [...currentMessages, ...uniqueNewMessages], - }, - messageHashes: { - ...state.messageHashes, - [inboxId]: currentMessageHashes, - }, - }; - } - case RECEIVE_LAST_MESSAGES_FROM_INBOX: { - const { inboxId, messages } = action.payload; - const currentMessages = state.inboxes[inboxId] || []; - const currentMessageHashes = state.messageHashes[inboxId] || new Set(); - - const uniqueNewMessages = messages.filter((msg: ShinkaiMessage) => { - const hash = calculateMessageHash(msg); - if (currentMessageHashes.has(hash)) { - return false; - } else { - currentMessageHashes.add(hash); - return true; - } - }); - - return { - ...state, - inboxes: { - ...state.inboxes, - [inboxId]: [...currentMessages, ...uniqueNewMessages], - }, - messageHashes: { - ...state.messageHashes, - [inboxId]: currentMessageHashes, - }, - }; - } - case ADD_MESSAGE_TO_INBOX: { - const { inboxId, message } = action.payload; - const currentMessages = state.inboxes[inboxId] || []; - const currentMessageHashes = state.messageHashes[inboxId] || new Set(); - - const hash = calculateMessageHash(message); - if (currentMessageHashes.has(hash)) { - // If the message is a duplicate, don't add it - return state; - } else { - // If the message is unique, add it to the inbox and the hash to the set - currentMessageHashes.add(hash); - return { - ...state, - inboxes: { - ...state.inboxes, - [inboxId]: [message, ...currentMessages], - }, - messageHashes: { - ...state.messageHashes, - [inboxId]: currentMessageHashes, - }, - }; - } - } - case RECEIVE_ALL_INBOXES_FOR_PROFILE: { - const newInboxes = action.payload; - if (typeof newInboxes !== "object") { - console.error( - "Invalid payload for RECEIVE_ALL_INBOXES_FOR_PROFILE: ", - newInboxes - ); - return state; - } - return { - ...state, - inboxes: { - ...state.inboxes, - ...Object.keys(newInboxes).reduce( - (result: { [key: string]: any[] }, key) => { - if (!state.inboxes[key]) { - console.log("value for key: ", newInboxes[key]); - result[newInboxes[key]] = []; - } - return result; - }, - {} - ), - }, - }; - } - case ADD_AGENTS: { - const newAgents = action.payload; - const updatedAgents = { ...state.agents }; - newAgents.forEach((agent: SerializedAgent) => { - updatedAgents[agent.id] = agent; - }); - return { - ...state, - agents: updatedAgents, - }; - } - case CREATE_REGISTRATION_CODE: - return { ...state, registrationCode: action.payload }; - case REGISTRATION_ERROR: - return { ...state, error: action.payload }; - case CLEAR_REGISTRATION_CODE: - return { ...state, registrationCode: "" }; - case PING_ALL: - return { ...state, pingResult: action.payload }; - case CLEAR_STORE: - state = initialState; - return state; - default: - return state; - } -}; +import { combineReducers } from "redux"; +import { setupDetailsReducer } from "./reducers/setupDetailsReducer"; +import { messagesReducer } from "./reducers/messagesReducer"; +import otherReducer from "./reducers/otherReducer"; + +const rootReducer = combineReducers({ + setupDetails: setupDetailsReducer, + messages: messagesReducer, + other: otherReducer, +}); export default rootReducer; diff --git a/shinkai-app/src/store/reducers/messagesReducer.ts b/shinkai-app/src/store/reducers/messagesReducer.ts new file mode 100644 index 000000000..c34cc9df6 --- /dev/null +++ b/shinkai-app/src/store/reducers/messagesReducer.ts @@ -0,0 +1,218 @@ +import { ShinkaiMessage } from "../../models/ShinkaiMessage"; +import { calculateMessageHash } from "../../utils/shinkai_message_handler"; +import { + ADD_MESSAGE_TO_INBOX, + Action, + RECEIVE_ALL_INBOXES_FOR_PROFILE, + RECEIVE_LAST_MESSAGES_FROM_INBOX, + RECEIVE_LOAD_MORE_MESSAGES_FROM_INBOX, + RECEIVE_UNREAD_MESSAGES_FROM_INBOX, +} from "../types"; + +export interface MessagesState { + inboxes: { + [inboxId: string]: any[]; + }; + messageHashes: { + [inboxId: string]: { [hash: string]: boolean }; + }; +} + +const messagesState: MessagesState = { + inboxes: {}, + messageHashes: {}, +}; + +interface InboxMessagesAction { + type: + | typeof RECEIVE_LOAD_MORE_MESSAGES_FROM_INBOX + | typeof RECEIVE_LAST_MESSAGES_FROM_INBOX + | typeof RECEIVE_UNREAD_MESSAGES_FROM_INBOX; + payload?: { + inboxId: string; + messages: ShinkaiMessage[]; + }; +} + +interface AddMessageAction { + type: typeof ADD_MESSAGE_TO_INBOX; + payload?: { + inboxId: string; + message: ShinkaiMessage; + }; +} + +interface ReceiveAllInboxesAction { + type: typeof RECEIVE_ALL_INBOXES_FOR_PROFILE; + payload?: string[]; +} + +type MessagesAction = + | InboxMessagesAction + | AddMessageAction + | ReceiveAllInboxesAction; + +export const messagesReducer = ( + state = messagesState, + action: MessagesAction +): MessagesState => { + switch (action.type) { + case RECEIVE_UNREAD_MESSAGES_FROM_INBOX: { + if (!action.payload) { + return state; + } + const { inboxId, messages } = action.payload; + const currentMessages = state.inboxes[inboxId] || []; + const currentMessageHashes = state.messageHashes[inboxId] || {}; + + const uniqueNewMessages = messages.filter((msg: ShinkaiMessage) => { + const hash = calculateMessageHash(msg); + if (currentMessageHashes[hash]) { + return false; + } else { + currentMessageHashes[hash] = true; + return true; + } + }); + + return { + ...state, + inboxes: { + ...state.inboxes, + [inboxId]: [...currentMessages, ...uniqueNewMessages], + }, + messageHashes: { + ...state.messageHashes, + [inboxId]: currentMessageHashes, + }, + }; + } + case RECEIVE_LOAD_MORE_MESSAGES_FROM_INBOX: { + if (!action.payload) { + return state; + } + const { inboxId, messages } = action.payload; + const currentMessages = state.inboxes[inboxId] || []; + const currentMessageHashes = state.messageHashes[inboxId] || {}; + + const uniqueNewMessages = messages.filter((msg: ShinkaiMessage) => { + const hash = calculateMessageHash(msg); + if (currentMessageHashes[hash]) { + return false; + } else { + currentMessageHashes[hash] = true; + return true; + } + }); + + return { + ...state, + inboxes: { + ...state.inboxes, + [inboxId]: [...currentMessages, ...uniqueNewMessages], + }, + messageHashes: { + ...state.messageHashes, + [inboxId]: currentMessageHashes, + }, + }; + } + case RECEIVE_LAST_MESSAGES_FROM_INBOX: { + if (!action.payload) { + return state; + } + const { inboxId, messages } = action.payload; + const currentMessages = state.inboxes[inboxId] || []; + const currentMessageHashes = state.messageHashes[inboxId] || {}; + + console.log("RECEIVE_LAST_MESSAGES_FROM_INBOX> currentMessageHashes: ", currentMessageHashes); + console.log("RECEIVE_LAST_MESSAGES_FROM_INBOX> new messages: ", messages); + const uniqueNewMessages = messages.filter((msg: ShinkaiMessage) => { + const hash = calculateMessageHash(msg); + if (currentMessageHashes[hash]) { + return false; + } else { + currentMessageHashes[hash] = true; + return true; + } + }); + + return { + ...state, + inboxes: { + ...state.inboxes, + [inboxId]: [...currentMessages, ...uniqueNewMessages], + }, + messageHashes: { + ...state.messageHashes, + [inboxId]: currentMessageHashes, + }, + }; + } + case ADD_MESSAGE_TO_INBOX: { + console.log("ADD_MESSAGE_TO_INBOX"); + console.log("action.payload: ", action.payload); + if (!action.payload) { + return state; + } + const { inboxId, message } = action.payload; + const currentMessages = state.inboxes[inboxId] || []; + const currentMessageHashes = state.messageHashes[inboxId] || new Set(); + + const hash = calculateMessageHash(message); + if (currentMessageHashes[hash]) { + // If the message is a duplicate, don't add it + return state; + } else { + // If the message is unique, add it to the inbox and the hash to the set + currentMessageHashes[hash] = true; + return { + ...state, + inboxes: { + ...state.inboxes, + [inboxId]: [message, ...currentMessages], + }, + messageHashes: { + ...state.messageHashes, + [inboxId]: currentMessageHashes, + }, + }; + } + } + case RECEIVE_ALL_INBOXES_FOR_PROFILE: { + if (!action.payload) { + return state; + } + const newInboxes: { [key: string]: any } = action.payload; + if (typeof newInboxes !== "object") { + console.error( + "Invalid payload for RECEIVE_ALL_INBOXES_FOR_PROFILE: ", + newInboxes + ); + return state; + } + return { + ...state, + inboxes: { + ...state.inboxes, + ...Object.keys(newInboxes).reduce( + (result: { [key: string]: any[] }, key) => { + // Only initialize the inbox if it doesn't already exist in the state + if (!state.inboxes[key]) { + console.log("value for key: ", newInboxes[key]); + result[newInboxes[key]] = []; + } else { + // If the inbox already exists, keep the current messages + result[newInboxes[key]] = state.inboxes[key]; + } + return result; + }, + {} + ), + }, + }; + } + default: + return state; + } +}; diff --git a/shinkai-app/src/store/reducers/otherReducer.ts b/shinkai-app/src/store/reducers/otherReducer.ts new file mode 100644 index 000000000..300703f15 --- /dev/null +++ b/shinkai-app/src/store/reducers/otherReducer.ts @@ -0,0 +1,87 @@ +import { SerializedAgent } from "../../models/SchemaTypes"; +import { + ADD_AGENTS, + Action, + CLEAR_REGISTRATION_CODE, + CLEAR_STORE, + CREATE_REGISTRATION_CODE, + GET_PUBLIC_KEY, + PING_ALL, + RECEIVE_ALL_INBOXES_FOR_PROFILE, + REGISTRATION_ERROR, + USE_REGISTRATION_CODE, +} from "../types"; + +export interface OtherState { + registrationCode: string; + publicKey: string; + registrationStatus: boolean; + pingResult: string; + error: string | null; + agents: { + [agentId: string]: SerializedAgent; + }; + just_inboxes: string[]; +} + +const initialState: OtherState = { + publicKey: "", + registrationStatus: false, + pingResult: "", + registrationCode: "", + error: null, + agents: {}, + just_inboxes: [], +}; + +const otherReducer = (state = initialState, action: Action): OtherState => { + switch (action.type) { + case USE_REGISTRATION_CODE: + return { + ...state, + registrationStatus: true, + }; + case RECEIVE_ALL_INBOXES_FOR_PROFILE: { + const newInboxes = action.payload; + if (!Array.isArray(newInboxes)) { + console.error( + "Invalid payload for RECEIVE_ALL_INBOXES_FOR_PROFILE: ", + newInboxes + ); + return state; + } + return { + ...state, + just_inboxes: newInboxes, + }; + } + case GET_PUBLIC_KEY: + return { ...state, publicKey: action.payload }; + case ADD_AGENTS: { + const newAgents = action.payload; + const updatedAgents = { ...state.agents }; + newAgents.forEach((agent: SerializedAgent) => { + updatedAgents[agent.id] = agent; + }); + return { + ...state, + agents: updatedAgents, + }; + } + case CREATE_REGISTRATION_CODE: + return { ...state, registrationCode: action.payload }; + case REGISTRATION_ERROR: + return { ...state, error: action.payload }; + case CLEAR_REGISTRATION_CODE: + return { ...state, registrationCode: "" }; + case PING_ALL: + return { ...state, pingResult: action.payload }; + case CLEAR_STORE: + state = initialState; + return state; + default: + return state; + } +}; + +export default otherReducer; diff --git a/shinkai-app/src/store/reducers/setupDetailsReducer.ts b/shinkai-app/src/store/reducers/setupDetailsReducer.ts new file mode 100644 index 000000000..cad581300 --- /dev/null +++ b/shinkai-app/src/store/reducers/setupDetailsReducer.ts @@ -0,0 +1,57 @@ +import { USE_REGISTRATION_CODE } from "../types"; +import { Base58String } from "../../models/QRSetupData"; + +export type SetupDetailsState = { + profile: string; + permission_type: string; + registration_name: string; + node_address: string; + shinkai_identity: string; + node_encryption_pk: Base58String; + node_signature_pk: Base58String; + profile_encryption_sk: Base58String; + profile_encryption_pk: Base58String; + profile_identity_sk: Base58String; + profile_identity_pk: Base58String; + my_device_encryption_sk: Base58String; + my_device_encryption_pk: Base58String; + my_device_identity_sk: Base58String; + my_device_identity_pk: Base58String; +}; + +const setupInitialState: SetupDetailsState = { + profile: "", + permission_type: "", + registration_name: "", + node_address: "", + shinkai_identity: "", + node_encryption_pk: "", + node_signature_pk: "", + profile_encryption_sk: "", + profile_encryption_pk: "", + profile_identity_sk: "", + profile_identity_pk: "", + my_device_encryption_sk: "", + my_device_encryption_pk: "", + my_device_identity_sk: "", + my_device_identity_pk: "", +}; + +interface SetupDetailsAction { + type: typeof USE_REGISTRATION_CODE; + payload?: SetupDetailsState; +} + +export const setupDetailsReducer = ( + state = setupInitialState, + action: SetupDetailsAction +): SetupDetailsState => { + switch (action.type) { + case USE_REGISTRATION_CODE: + const newState = action.payload ? action.payload : state; + console.log("New state: ", newState); + return newState; + default: + return state; + } +}; diff --git a/shinkai-app/src/store/types.ts b/shinkai-app/src/store/types.ts index ffea88d27..44c1dc788 100644 --- a/shinkai-app/src/store/types.ts +++ b/shinkai-app/src/store/types.ts @@ -5,6 +5,7 @@ export const REGISTRATION_ERROR = 'REGISTRATION_ERROR'; export const PING_ALL = 'PING_ALL'; export const CLEAR_REGISTRATION_CODE = 'CLEAR_REGISTRATION_CODE'; export const RECEIVE_LAST_MESSAGES_FROM_INBOX = "RECEIVE_LAST_MESSAGES_FROM_INBOX"; +export const RECEIVE_UNREAD_MESSAGES_FROM_INBOX = "RECEIVE_UNREAD_MESSAGES_FROM_INBOX"; export const RECEIVE_LOAD_MORE_MESSAGES_FROM_INBOX = "RECEIVE_LOAD_MORE_MESSAGES_FROM_INBOX"; export const CLEAR_STORE = 'CLEAR_STORE'; export const ADD_MESSAGE_TO_INBOX = 'ADD_MESSAGE_TO_INBOX'; diff --git a/shinkai-app/src/types.d.ts b/shinkai-app/src/types.d.ts index e8e550d94..6ffbaa30c 100644 --- a/shinkai-app/src/types.d.ts +++ b/shinkai-app/src/types.d.ts @@ -5,3 +5,8 @@ export type AppThunk = ThunkAction< Action >; +declare global { + interface Window { + __REDUX_DEVTOOLS_EXTENSION_COMPOSE__?: typeof compose; + } +} diff --git a/shinkai-message-wasm/src/shinkai_utils/shinkai_message_builder.rs b/shinkai-message-wasm/src/shinkai_utils/shinkai_message_builder.rs index a471327c8..a4ebbebb8 100644 --- a/shinkai-message-wasm/src/shinkai_utils/shinkai_message_builder.rs +++ b/shinkai-message-wasm/src/shinkai_utils/shinkai_message_builder.rs @@ -26,7 +26,7 @@ use crate::{ }; use super::{ - encryption::{clone_static_secret_key, encryption_secret_key_to_string}, + encryption::{clone_static_secret_key, encryption_secret_key_to_string, unsafe_deterministic_encryption_keypair}, signatures::{clone_signature_secret_key, signature_secret_key_to_string}, }; @@ -468,6 +468,38 @@ impl ShinkaiMessageBuilder { .build() } + pub fn job_message_from_agent( + job_id: String, + content: String, + my_signature_secret_key: SignatureStaticKey, + node_sender: ProfileName, + node_receiver: ProfileName, + ) -> Result { + let job_id_clone = job_id.clone(); + let job_message = JobMessage { job_id, content }; + let body = serde_json::to_string(&job_message).map_err(|_| "Failed to serialize job message to JSON")?; + + let inbox = InboxName::get_job_inbox_name_from_params(job_id_clone) + .map_err(|_| "Failed to get job inbox name")? + .to_string(); + + // Use for placeholder. These messages *are not* encrypted so it's not required + let (placeholder_encryption_sk, placeholder_encryption_pk) = unsafe_deterministic_encryption_keypair(0); + + ShinkaiMessageBuilder::new(placeholder_encryption_sk, my_signature_secret_key, placeholder_encryption_pk) + .message_raw_content(body) + .internal_metadata_with_schema( + "".to_string(), + "".to_string(), + inbox, + MessageSchemaType::JobMessageSchema, + EncryptionMethod::None, + ) + .body_encryption(EncryptionMethod::None) + .external_metadata(node_receiver, node_sender) + .build() + } + pub fn terminate_message( my_encryption_secret_key: EncryptionStaticKey, my_signature_secret_key: SignatureStaticKey, @@ -597,7 +629,11 @@ impl ShinkaiMessageBuilder { sender: ProfileName, receiver: ProfileName, ) -> Result { - ShinkaiMessageBuilder::new(my_subidentity_encryption_sk, my_subidentity_signature_sk, receiver_public_key) + ShinkaiMessageBuilder::new( + my_subidentity_encryption_sk, + my_subidentity_signature_sk, + receiver_public_key, + ) .message_raw_content(full_profile) .internal_metadata_with_schema( sender_subidentity, diff --git a/src/db/db.rs b/src/db/db.rs index 669b99e63..67a41c677 100644 --- a/src/db/db.rs +++ b/src/db/db.rs @@ -29,6 +29,7 @@ pub enum Topic { VectorResources, Agents, Toolkits, + MessagesToRetry, } impl Topic { @@ -53,6 +54,7 @@ impl Topic { Self::VectorResources => "resources", Self::Agents => "agents", Self::Toolkits => "toolkits", + Self::MessagesToRetry => "mesages_to_retry" } } } diff --git a/src/db/db_inbox.rs b/src/db/db_inbox.rs index c0d50e061..663350525 100644 --- a/src/db/db_inbox.rs +++ b/src/db/db_inbox.rs @@ -5,9 +5,12 @@ use shinkai_message_wasm::{ shinkai_message::shinkai_message::ShinkaiMessage, }; -use crate::schemas::{ - identity::{IdentityType, StandardIdentity}, - inbox_permission::InboxPermission, +use crate::{ + schemas::{ + identity::{IdentityType, StandardIdentity}, + inbox_permission::InboxPermission, + }, + utils::logging_helpers::print_content_time_messages, }; use super::{db::Topic, db_errors::ShinkaiDBError, ShinkaiDB}; @@ -125,10 +128,10 @@ impl ShinkaiDB { &self, inbox_name: String, n: usize, - offset_key: Option, + until_offset_key: Option, ) -> Result, ShinkaiDBError> { println!("Getting last {} messages from inbox: {}", n, inbox_name); - println!("Offset key: {:?}", offset_key); + println!("Offset key: {:?}", until_offset_key); println!("n: {:?}", n); // Fetch the column family for the specified inbox @@ -146,8 +149,7 @@ impl ShinkaiDB { let messages_cf = self.db.cf_handle(Topic::AllMessages.as_str()).unwrap(); // Create an iterator for the specified inbox - // Create an iterator for the specified inbox - let mut iter = match &offset_key { + let mut iter = match &until_offset_key { Some(offset_key) => self.db.iterator_cf( inbox_cf, rocksdb::IteratorMode::From(offset_key.as_bytes(), rocksdb::Direction::Reverse), @@ -155,7 +157,7 @@ impl ShinkaiDB { None => self.db.iterator_cf(inbox_cf, rocksdb::IteratorMode::End), }; - let mut skip_first = offset_key.is_some(); + let mut skip_first = until_offset_key.is_some(); let mut messages = Vec::new(); for item in iter.take(n) { // Skip the first entry if an offset_key was provided @@ -163,13 +165,13 @@ impl ShinkaiDB { skip_first = false; continue; } - + // Handle the Result returned by the iterator match item { Ok((_, value)) => { // The value of the inbox CF is the key in the AllMessages CF let message_key = value.to_vec(); - + // Fetch the message from the AllMessages CF match self.db.get_cf(messages_cf, &message_key)? { Some(bytes) => { @@ -182,11 +184,12 @@ impl ShinkaiDB { Err(e) => return Err(e.into()), } } - eprintln!("Inbox {} Messages: {:?}", inbox_name, messages); + messages.reverse(); + print_content_time_messages(messages.clone()); Ok(messages) } - pub fn mark_as_read_up_to(&mut self, inbox_name: String, up_to_time: String) -> Result<(), ShinkaiDBError> { + pub fn mark_as_read_up_to(&mut self, inbox_name: String, up_to_offset: String) -> Result<(), ShinkaiDBError> { // Fetch the column family for the specified unread_list let cf_name_unread_list = format!("{}_unread_list", inbox_name); let unread_list_cf = match self.db.cf_handle(&cf_name_unread_list) { @@ -198,14 +201,11 @@ impl ShinkaiDB { ))) } }; - + // Create an iterator for the specified unread_list, starting from the beginning let iter = self.db.iterator_cf(unread_list_cf, rocksdb::IteratorMode::Start); - - // Convert up_to_time to &str - let up_to_time = up_to_time.as_str(); - - // Iterate through the unread_list and delete all messages up to the specified time + + // Iterate through the unread_list and delete all messages up to the specified offset for item in iter { // Handle the Result returned by the iterator match item { @@ -214,24 +214,19 @@ impl ShinkaiDB { Ok(s) => s, Err(_) => return Err(ShinkaiDBError::SomeError("UTF-8 conversion error".to_string())), }; - - // Split the key_str to separate timestamp and hash - let mut split_key = key_str.splitn(2, ':'); - - if let Some(timestamp_str) = split_key.next() { - if timestamp_str <= up_to_time { - // Delete the message from the unread_list - self.db.delete_cf(unread_list_cf, key)?; - } else { - // We've passed the up_to_time, so we can break the loop - break; - } + + if key_str <= up_to_offset { + // Delete the message from the unread_list + self.db.delete_cf(unread_list_cf, key)?; + } else { + // We've passed the up_to_offset, so we can break the loop + break; } } Err(e) => return Err(e.into()), } } - + Ok(()) } @@ -239,7 +234,7 @@ impl ShinkaiDB { &self, inbox_name: String, n: usize, - offset_key: Option, + from_offset_key: Option, ) -> Result, ShinkaiDBError> { // Fetch the column family for the specified unread_list let cf_name_unread_list = format!("{}_unread_list", inbox_name); @@ -252,38 +247,55 @@ impl ShinkaiDB { ))) } }; - + // Fetch the column family for all messages let messages_cf = self.db.cf_handle(Topic::AllMessages.as_str()).unwrap(); - + // Create an iterator for the specified unread_list - let mut iter = match &offset_key { - Some(offset_key) => self.db.iterator_cf( + let mut iter = match &from_offset_key { + Some(from_key) => self.db.iterator_cf( unread_list_cf, - rocksdb::IteratorMode::From(offset_key.as_bytes(), rocksdb::Direction::Reverse), + rocksdb::IteratorMode::From(from_key.as_bytes(), rocksdb::Direction::Forward), ), - None => self.db.iterator_cf(unread_list_cf, rocksdb::IteratorMode::End), + None => self.db.iterator_cf(unread_list_cf, rocksdb::IteratorMode::Start), }; - - let mut skip_first = offset_key.is_some(); + + let offset_hash = match &from_offset_key { + Some(offset_key) => { + let split: Vec<&str> = offset_key.split(":::").collect(); + if split.len() < 2 { + return Err(ShinkaiDBError::SomeError("Invalid offset key format".to_string())); + } + Some(split[1].to_string()) + } + None => None, + }; + let mut messages = Vec::new(); + let mut first_message = true; for item in iter.take(n) { - // Skip the first entry if an offset_key was provided - if skip_first { - skip_first = false; - continue; - } - // Handle the Result returned by the iterator match item { Ok((_, value)) => { // The value of the unread_list CF is the key in the AllMessages CF let message_key = value.to_vec(); - + // Fetch the message from the AllMessages CF match self.db.get_cf(messages_cf, &message_key)? { Some(bytes) => { let message = ShinkaiMessage::decode_message_result(bytes)?; + + // Check if the message hash matches the offset's + if first_message { + if let Some(offset_hash) = &offset_hash { + if message.calculate_message_hash() == *offset_hash { + first_message = false; + continue; + } + } + first_message = false; + } + messages.push(message); } None => return Err(ShinkaiDBError::MessageNotFound), @@ -292,7 +304,8 @@ impl ShinkaiDB { Err(e) => return Err(e.into()), } } - + + print_content_time_messages(messages.clone()); Ok(messages) } @@ -465,7 +478,10 @@ impl ShinkaiDB { } } - pub fn get_inboxes_for_profile(&self, profile_name_identity: StandardIdentity) -> Result, ShinkaiDBError> { + pub fn get_inboxes_for_profile( + &self, + profile_name_identity: StandardIdentity, + ) -> Result, ShinkaiDBError> { // Fetch the column family for the 'inbox' topic let cf_inbox = match self.db.cf_handle(Topic::Inbox.as_str()) { Some(cf) => cf, diff --git a/src/db/db_jobs.rs b/src/db/db_jobs.rs index 4b717ef2a..81749ac4b 100644 --- a/src/db/db_jobs.rs +++ b/src/db/db_jobs.rs @@ -50,6 +50,7 @@ impl ShinkaiDB { let cf_job_id_name = format!("jobtopic_{}", &job_id); let cf_conversation_inbox_name = format!("job_inbox::{}::false", &job_id); let cf_job_id_perms_name = format!("job_inbox::{}::false_perms", &job_id); + let cf_job_id_unread_list_name = format!("job_inbox::{}::false_unread_list", &job_id); // Check that the profile name exists in ProfilesIdentityKey, ProfilesEncryptionKey and ProfilesIdentityType if self.db.cf_handle(&cf_job_id_scope_name).is_some() @@ -57,6 +58,7 @@ impl ShinkaiDB { || self.db.cf_handle(&cf_job_id_name).is_some() || self.db.cf_handle(&cf_conversation_inbox_name).is_some() || self.db.cf_handle(&cf_job_id_perms_name).is_some() + || self.db.cf_handle(&cf_job_id_unread_list_name).is_some() { return Err(ShinkaiDBError::ProfileNameAlreadyExists); } @@ -70,6 +72,7 @@ impl ShinkaiDB { self.db.create_cf(&cf_job_id_step_history_name, &cf_opts)?; self.db.create_cf(&cf_conversation_inbox_name, &cf_opts)?; self.db.create_cf(&cf_job_id_perms_name, &cf_opts)?; + self.db.create_cf(&cf_job_id_unread_list_name, &cf_opts)?; // Start a write batch let mut batch = WriteBatch::default(); @@ -293,7 +296,7 @@ impl ShinkaiDB { let cf_handle = self .db .cf_handle(&cf_conversation_inbox_name) - .ok_or(ShinkaiDBError::ColumnFamilyNotFound(cf_conversation_inbox_name))?; + .ok_or(ShinkaiDBError::ColumnFamilyNotFound(cf_conversation_inbox_name.clone()))?; // Insert the message to AllMessages column family self.insert_message_to_all(message)?; @@ -310,9 +313,21 @@ impl ShinkaiDB { // Create the composite key by concatenating the time_key and the hash_key, with a separator let composite_key = format!("{}:::{}", time_key, hash_key); + // Start a write batch + let mut batch = WriteBatch::default(); + // Use the composite_key as the key and hash_key as the value in the inbox - self.db - .put_cf(cf_handle, composite_key.as_bytes(), hash_key.as_bytes())?; + batch.put_cf(cf_handle, composite_key.as_bytes(), hash_key.as_bytes()); + + // Add the message to the unread_list inbox + let cf_unread_list = self + .db + .cf_handle(&format!("{}_unread_list", cf_conversation_inbox_name)) + .expect("Failed to get cf handle for unread_list"); + batch.put_cf(cf_unread_list, composite_key.as_bytes(), hash_key.as_bytes()); + + // Write the batch + self.db.write(batch)?; Ok(()) } diff --git a/src/managers/agent.rs b/src/managers/agent.rs index 2e05a65c2..db59b2d56 100644 --- a/src/managers/agent.rs +++ b/src/managers/agent.rs @@ -1,7 +1,13 @@ use crate::managers::providers::Provider; use reqwest::Client; use serde::{Deserialize, Serialize}; -use shinkai_message_wasm::{shinkai_message::shinkai_message_schemas::{JobPreMessage, JobRecipient}, schemas::{shinkai_name::ShinkaiName, agents::serialized_agent::{AgentAPIModel, SerializedAgent}}}; +use shinkai_message_wasm::{ + schemas::{ + agents::serialized_agent::{AgentAPIModel, SerializedAgent}, + shinkai_name::ShinkaiName, + }, + shinkai_message::shinkai_message_schemas::{JobPreMessage, JobRecipient}, +}; use std::fmt; use std::sync::Arc; use tokio::sync::{mpsc, Mutex}; @@ -10,7 +16,7 @@ use tokio::sync::{mpsc, Mutex}; pub struct Agent { pub id: String, pub full_identity_name: ShinkaiName, - pub job_manager_sender: mpsc::Sender>, + pub job_manager_sender: mpsc::Sender<(Vec, String)>, pub agent_receiver: Arc>>, pub client: Client, pub perform_locally: bool, // flag to perform computation locally or not @@ -26,7 +32,7 @@ impl Agent { pub fn new( id: String, full_identity_name: ShinkaiName, - job_manager_sender: mpsc::Sender>, + job_manager_sender: mpsc::Sender<(Vec, String)>, perform_locally: bool, external_url: Option, api_key: Option, @@ -62,18 +68,30 @@ impl Agent { match &self.model { AgentAPIModel::OpenAI(openai) => { openai - .call_api(&self.client, self.external_url.as_ref(), self.api_key.as_ref(), content, context) + .call_api( + &self.client, + self.external_url.as_ref(), + self.api_key.as_ref(), + content, + context, + ) .await } AgentAPIModel::Sleep(sleep_api) => { sleep_api - .call_api(&self.client, self.external_url.as_ref(), self.api_key.as_ref(), content, context) + .call_api( + &self.client, + self.external_url.as_ref(), + self.api_key.as_ref(), + content, + context, + ) .await } } } - pub async fn process_locally(&self, content: String, context: Vec) { + pub async fn process_locally(&self, content: String, context: Vec, job_id: String) { // Here we run our GPU-intensive task on a separate thread let handle = tokio::task::spawn_blocking(move || { // perform GPU-intensive work @@ -88,24 +106,27 @@ impl Agent { match result { Ok(response) => { // create ShinkaiMessage based on result and send to AgentManager - let _ = self.job_manager_sender.send(response).await; + let _ = self.job_manager_sender.send((response, job_id)).await; } Err(e) => eprintln!("Error in processing message: {:?}", e), } } - pub async fn execute(&self, content: String, context: Vec) { + pub async fn execute(&self, content: String, context: Vec, job_id: String) { if self.perform_locally { // No need to spawn a new task here - self.process_locally(content.clone(), context.clone()).await; + self.process_locally(content.clone(), context.clone(), job_id).await; } else { // Call external API let response = self.call_external_api(&content.clone(), context.clone()).await; match response { Ok(message) => { // Send the message to AgentManager - println!("Sending message to AgentManager {:?} with context: {:?}", message, context); - match self.job_manager_sender.send(message).await { + println!( + "Sending message to AgentManager {:?} with context: {:?}", + message, context + ); + match self.job_manager_sender.send((message, job_id.clone())).await { Ok(_) => println!("Message sent successfully"), Err(e) => eprintln!("Error when sending message: {}", e), } @@ -113,11 +134,18 @@ impl Agent { Err(e) => eprintln!("Error when calling API: {}", e), } } + // TODO: For debugging + // // Check if the sender is still connected to the channel + // if self.job_manager_sender.is_closed() { + // eprintln!("Sender is closed"); + // } else { + // println!("Sender is still connected"); + // } } } impl Agent { - pub fn from_serialized_agent(serialized_agent: SerializedAgent, sender: mpsc::Sender>) -> Self { + pub fn from_serialized_agent(serialized_agent: SerializedAgent, sender: mpsc::Sender<(Vec, String)>) -> Self { Self::new( serialized_agent.id, serialized_agent.full_identity_name, @@ -169,7 +197,7 @@ impl From for AgentError { mod tests { use super::*; use mockito::Server; - use shinkai_message_wasm::schemas::agents::serialized_agent::{SleepAPI, OpenAI}; + use shinkai_message_wasm::schemas::agents::serialized_agent::{OpenAI, SleepAPI}; use tokio::sync::mpsc; #[tokio::test] @@ -191,7 +219,10 @@ mod tests { let context = vec![String::from("context1"), String::from("context2")]; assert_eq!(agent.id, "1"); - assert_eq!(agent.full_identity_name, ShinkaiName::new("@@alice.shinkai/profileName/agent/myChatGPTAgent".to_string()).unwrap()); + assert_eq!( + agent.full_identity_name, + ShinkaiName::new("@@alice.shinkai/profileName/agent/myChatGPTAgent".to_string()).unwrap() + ); assert_eq!(agent.perform_locally, false); assert_eq!(agent.external_url, Some("http://localhost:8000".to_string())); assert_eq!(agent.toolkit_permissions, vec!["tk1".to_string(), "tk2".to_string()]); @@ -205,7 +236,7 @@ mod tests { ); tokio::spawn(async move { - agent.execute("Test".to_string(), context).await; + agent.execute("Test".to_string(), context, "some_job_1".to_string()).await; }); let val = tokio::time::timeout(std::time::Duration::from_millis(600), rx.recv()).await; @@ -216,7 +247,7 @@ mod tests { }; match val { - Ok(Some(response)) => assert_eq!(response.first().unwrap(), &expected_resp), + Ok(Some(response)) => assert_eq!(response.0.first().unwrap(), &expected_resp), Ok(None) => panic!("Channel is empty"), Err(_) => panic!("Timeout exceeded"), } diff --git a/src/managers/identity_manager.rs b/src/managers/identity_manager.rs index 4a9176735..9287c792c 100644 --- a/src/managers/identity_manager.rs +++ b/src/managers/identity_manager.rs @@ -54,7 +54,7 @@ impl IdentityManager { { let db = db.lock().await; db.debug_print_all_keys_for_profiles_identity_key(); - println!("identities_manager identities: {:?}", identities); + // println!("identities_manager identities: {:?}", identities); } identities.extend(agents); @@ -205,7 +205,7 @@ impl IdentityManager { } pub fn get_all_subidentities(&self) -> Vec { - println!("identities_manager identities: {:?}", self.local_identities); + // println!("identities_manager identities: {:?}", self.local_identities); self.local_identities.clone() } @@ -215,7 +215,7 @@ impl IdentityManager { } pub fn find_by_identity_name(&self, full_profile_name: ShinkaiName) -> Option<&Identity> { - println!("identities_manager identities: {:?}", self.local_identities); + // println!("identities_manager identities: {:?}", self.local_identities); self.local_identities.iter().find(|identity| { match identity { Identity::Standard(identity) => identity.full_identity_name == full_profile_name, @@ -276,7 +276,7 @@ impl IdentityManager { original_message: &ShinkaiMessage, decrypted_message: &ShinkaiMessage, ) -> Result<(), NodeError> { - eprintln!("signature check > sender_subidentity: {:?}", sender_subidentity); + // eprintln!("signature check > sender_subidentity: {:?}", sender_subidentity); if sender_subidentity.is_none() { eprintln!( "signature check > Subidentity not found for profile name: {}", @@ -291,7 +291,7 @@ impl IdentityManager { } // If we reach this point, it means that subidentity exists, so it's safe to unwrap let subidentity = sender_subidentity.unwrap(); - eprintln!("signature check > subidentity: {:?}", subidentity); + // eprintln!("signature check > subidentity: {:?}", subidentity); // Validate that the message actually came from the subidentity let signature_public_key = match &subidentity { diff --git a/src/managers/job_manager.rs b/src/managers/job_manager.rs index 5b61171fe..e8e9578f3 100644 --- a/src/managers/job_manager.rs +++ b/src/managers/job_manager.rs @@ -1,12 +1,17 @@ use crate::db::{db_errors::ShinkaiDBError, ShinkaiDB}; use chrono::Utc; +use ed25519_dalek::{PublicKey as SignaturePublicKey, SecretKey as SignatureStaticKey}; use reqwest::Identity; use shinkai_message_wasm::{ - schemas::{inbox_name::InboxName, shinkai_name::{ShinkaiName, ShinkaiNameError}}, + schemas::{ + inbox_name::InboxName, + shinkai_name::{ShinkaiName, ShinkaiNameError}, + }, shinkai_message::{ shinkai_message::{MessageBody, MessageData, ShinkaiMessage}, - shinkai_message_schemas::{JobCreation, JobMessage, JobPreMessage, JobScope, MessageSchemaType}, + shinkai_message_schemas::{JobCreation, JobMessage, JobPreMessage, JobRecipient, JobScope, MessageSchemaType}, }, + shinkai_utils::{shinkai_message_builder::ShinkaiMessageBuilder, signatures::clone_signature_secret_key}, ShinkaiMessageWrapper, }; use std::result::Result::Ok; @@ -14,6 +19,7 @@ use std::{collections::HashMap, error::Error, sync::Arc}; use std::{fmt, thread}; use tokio::sync::{mpsc, Mutex}; use warp::path::full; +use x25519_dalek::{PublicKey as EncryptionPublicKey, StaticSecret as EncryptionStaticKey}; use super::{agent::Agent, IdentityManager}; @@ -73,19 +79,32 @@ impl JobLike for Job { } } +type JobId = String; + pub struct JobManager { pub agent_manager: Arc>, - pub job_manager_receiver: Option>>, + pub job_manager_receiver: Arc, JobId)>>>, + pub job_manager_sender: mpsc::Sender<(Vec, JobId)>, + pub identity_secret_key: SignatureStaticKey, + pub node_profile_name: ShinkaiName, } impl JobManager { - pub async fn new(db: Arc>, identity_manager: Arc>) -> Self { + pub async fn new( + db: Arc>, + identity_manager: Arc>, + identity_secret_key: SignatureStaticKey, + node_profile_name: ShinkaiName, + ) -> Self { let (job_manager_sender, job_manager_receiver) = tokio::sync::mpsc::channel(100); - let agent_manager = AgentManager::new(db, identity_manager, job_manager_sender).await; + let agent_manager = AgentManager::new(db, identity_manager, job_manager_sender.clone()).await; let mut job_manager = Self { agent_manager: Arc::new(Mutex::new(agent_manager)), - job_manager_receiver: Some(job_manager_receiver), + job_manager_receiver: Arc::new(Mutex::new(job_manager_receiver)), + job_manager_sender: job_manager_sender.clone(), + identity_secret_key, + node_profile_name, }; job_manager.process_received_messages().await; job_manager @@ -101,21 +120,33 @@ impl JobManager { } pub async fn process_received_messages(&mut self) { - if let Some(mut receiver) = self.job_manager_receiver.take() { - let agent_manager = Arc::clone(&self.agent_manager); - tokio::spawn(async move { - while let Some(messages) = receiver.recv().await { - println!("process_received_messages> messages: {:?}", messages); - for message in messages { - let mut agent_manager = agent_manager.lock().await; - println!("calling handle_pre_message_schema> message: {:?}", message); - if let Err(err) = agent_manager.handle_pre_message_schema(message).await { + let agent_manager = Arc::clone(&self.agent_manager); + let receiver = Arc::clone(&self.job_manager_receiver); + let node_profile_name_clone = self.node_profile_name.clone(); + let identity_secret_key_clone = clone_signature_secret_key(&self.identity_secret_key); + tokio::spawn(async move { + while let Some((messages, job_id)) = receiver.lock().await.recv().await { + for message in messages { + let mut agent_manager = agent_manager.lock().await; + + let shinkai_message_result = ShinkaiMessageBuilder::job_message_from_agent( + job_id.clone(), + message.clone().content, + clone_signature_secret_key(&identity_secret_key_clone), + node_profile_name_clone.clone().to_string(), + node_profile_name_clone.clone().to_string(), + ); + + if let Ok(shinkai_message) = shinkai_message_result { + if let Err(err) = agent_manager.handle_pre_message_schema(message, job_id.clone(), shinkai_message).await { eprintln!("Error while handling pre message schema: {:?}", err); } + } else if let Err(err) = shinkai_message_result { + eprintln!("Error while building ShinkaiMessage: {:?}", err); } } - }); - } + } + }); } pub async fn decision_phase(&self, job: &dyn JobLike) -> Result<(), Box> { @@ -134,7 +165,7 @@ pub struct AgentManager { jobs: Arc>>>, db: Arc>, identity_manager: Arc>, - job_manager_sender: mpsc::Sender>, + job_manager_sender: mpsc::Sender<(Vec, JobId)>, agents: Vec>>, } @@ -142,7 +173,7 @@ impl AgentManager { pub async fn new( db: Arc>, identity_manager: Arc>, - job_manager_sender: mpsc::Sender>, + job_manager_sender: mpsc::Sender<(Vec, JobId)>, ) -> Self { let jobs_map = Arc::new(Mutex::new(HashMap::new())); { @@ -168,10 +199,11 @@ impl AgentManager { let mut job_manager = Self { jobs: jobs_map, db, - job_manager_sender, + job_manager_sender: job_manager_sender.clone(), identity_manager, agents, }; + job_manager } @@ -237,7 +269,11 @@ impl AgentManager { } } - pub async fn handle_job_message_schema(&mut self, message: ShinkaiMessage, job_message: JobMessage) -> Result { + pub async fn handle_job_message_schema( + &mut self, + message: ShinkaiMessage, + job_message: JobMessage, + ) -> Result { if let Some(job) = self.jobs.lock().await.get(&job_message.job_id) { let job = job.clone(); let mut shinkai_db = self.db.lock().await; @@ -253,9 +289,15 @@ impl AgentManager { } } - pub async fn handle_pre_message_schema(&mut self, pre_message: JobPreMessage) -> Result { - // Placeholder logic + pub async fn handle_pre_message_schema( + &mut self, + pre_message: JobPreMessage, + job_id: String, + shinkai_message: ShinkaiMessage + ) -> Result { println!("handle_pre_message_schema> pre_message: {:?}", pre_message); + + self.db.lock().await.add_message_to_job_inbox(job_id.as_str(), &shinkai_message)?; Ok(String::new()) } @@ -267,7 +309,8 @@ impl AgentManager { let message_type = data.message_content_schema; match message_type { MessageSchemaType::JobCreationSchema => { - let agent_name = ShinkaiName::from_shinkai_message_using_recipient_subidentity(&message)?; + let agent_name = + ShinkaiName::from_shinkai_message_using_recipient_subidentity(&message)?; let agent_id = agent_name.get_agent_name().ok_or(JobManagerError::AgentNotFound)?; let job_creation: JobCreation = serde_json::from_str(&data.message_raw_content) .map_err(|_| JobManagerError::ContentParseFailed)?; @@ -281,7 +324,8 @@ impl AgentManager { MessageSchemaType::PreMessageSchema => { let pre_message: JobPreMessage = serde_json::from_str(&data.message_raw_content) .map_err(|_| JobManagerError::ContentParseFailed)?; - self.handle_pre_message_schema(pre_message).await + // TODO: we should be able to extract the job_id from the inbox + self.handle_pre_message_schema(pre_message, "".to_string(), message).await } _ => { // Handle Empty message type if needed, or return an error if it's not a valid job message @@ -304,6 +348,7 @@ impl AgentManager { let time_with_comment = format!("{}: {}", "Current datetime in RFC3339", Utc::now().to_rfc3339()); let full_job = { self.db.lock().await.get_job(job.job_id()).unwrap() }; + let job_id = job.job_id().to_string(); let mut context = full_job.step_history.clone(); context.push(time_with_comment); println!("decision_phase> context: {:?}", context); @@ -322,10 +367,14 @@ impl AgentManager { Some(agent) => { // Create a new async task where the agent's execute method will run // Note: agent execute run in a separate thread - let last_message = full_job.step_history.last().ok_or(JobManagerError::ContentParseFailed)?.clone(); + let last_message = full_job + .step_history + .last() + .ok_or(JobManagerError::ContentParseFailed)? + .clone(); tokio::spawn(async move { let mut agent = agent.lock().await; - agent.execute(last_message.to_string(), context).await; + agent.execute(last_message.to_string(), context, job_id).await; }) .await?; Ok(()) @@ -427,4 +476,4 @@ impl From for JobManagerError { fn from(err: ShinkaiNameError) -> JobManagerError { JobManagerError::ShinkaiNameError(err) } -} \ No newline at end of file +} diff --git a/src/network/node.rs b/src/network/node.rs index 5687605f3..f40e139b5 100644 --- a/src/network/node.rs +++ b/src/network/node.rs @@ -1,15 +1,15 @@ use async_channel::{Receiver, Sender}; use chashmap::CHashMap; use chrono::Utc; -use shinkai_message_wasm::schemas::agents::serialized_agent::SerializedAgent; -use shinkai_message_wasm::schemas::inbox_name::InboxNameError; -use shinkai_message_wasm::shinkai_message::shinkai_message_error::ShinkaiMessageError; use core::panic; use ed25519_dalek::{PublicKey as SignaturePublicKey, SecretKey as SignatureStaticKey}; use futures::{future::FutureExt, pin_mut, prelude::*, select}; use log::{debug, error, info, trace, warn}; +use shinkai_message_wasm::schemas::agents::serialized_agent::SerializedAgent; +use shinkai_message_wasm::schemas::inbox_name::InboxNameError; use shinkai_message_wasm::schemas::shinkai_name::ShinkaiName; use shinkai_message_wasm::shinkai_message::shinkai_message::ShinkaiMessage; +use shinkai_message_wasm::shinkai_message::shinkai_message_error::ShinkaiMessageError; use shinkai_message_wasm::shinkai_message::shinkai_message_schemas::{ IdentityPermissions, JobToolCall, RegistrationCodeType, }; @@ -24,8 +24,8 @@ use tokio::net::{TcpListener, TcpStream}; use tokio::sync::{mpsc, Mutex}; use x25519_dalek::{PublicKey as EncryptionPublicKey, StaticSecret as EncryptionStaticKey}; -use crate::db::ShinkaiDB; use crate::db::db_errors::ShinkaiDBError; +use crate::db::ShinkaiDB; use crate::managers::identity_manager::{self}; use crate::managers::job_manager::{JobManager, JobManagerError}; use crate::managers::{job_manager, IdentityManager}; @@ -38,6 +38,7 @@ use super::node_api::APIError; use super::node_error::NodeError; pub enum NodeCommand { + Shutdown, // Command to make the node ping all the other nodes it knows about. PingAll, // Command to request the node's public keys for signing and encryption. The sender will receive the keys. @@ -189,7 +190,7 @@ pub enum NodeCommand { res: Sender, APIError>>, }, AvailableAgents { - full_profile_name: String, + full_profile_name: String, res: Sender, String>>, }, } @@ -222,7 +223,7 @@ pub struct Node { // The database connection for this node. pub db: Arc>, // The job manager - pub job_manager: Arc>, + pub job_manager: Option>>, } impl Node { @@ -268,9 +269,6 @@ impl Node { .await .unwrap(); let identity_manager = Arc::new(Mutex::new(subidentity_manager)); - let job_manager = Arc::new(Mutex::new( - JobManager::new(db_arc.clone(), identity_manager.clone()).await, - )); Node { node_profile_name, @@ -284,12 +282,21 @@ impl Node { commands, identity_manager, db: db_arc, - job_manager, + job_manager: None, } } // Start the node's operations. pub async fn start(&mut self) -> Result<(), NodeError> { + self.job_manager = Some(Arc::new(Mutex::new( + JobManager::new( + Arc::clone(&self.db), + Arc::clone(&self.identity_manager), + clone_signature_secret_key(&self.identity_secret_key), + self.node_profile_name.clone(), + ) + .await, + ))); let listen_future = self.listen_and_reconnect().fuse(); pin_mut!(listen_future); @@ -320,6 +327,11 @@ impl Node { // check_peers = check_peers_future => self.connect_new_peers().await?, command = commands_future => { match command { + Some(NodeCommand::Shutdown) => { + eprintln!("Shutdown command received. Stopping the node."); + info!("Shutdown command received. Stopping the node."); + break; + }, Some(NodeCommand::PingAll) => self.ping_all().await?, Some(NodeCommand::GetPeers(sender)) => self.send_peer_addresses(sender).await?, Some(NodeCommand::IdentityNameToExternalProfileData { name, res }) => self.handle_external_profile_data(name, res).await?, @@ -470,6 +482,7 @@ impl Node { peer: (SocketAddr, ProfileName), db: &mut ShinkaiDB, maybe_identity_manager: Arc>, + save_to_db_flag: bool, ) -> Result<(), NodeError> { println!("Sending {:?} to {:?}", message, peer); let address = peer.0; @@ -482,12 +495,15 @@ impl Node { stream.write_all(encoded_msg.as_ref()).await?; stream.flush().await?; info!("Sent message to {}", stream.peer_addr()?); - Node::save_to_db(true, message, my_encryption_sk, db, maybe_identity_manager).await?; + if save_to_db_flag { + Node::save_to_db(true, message, my_encryption_sk, db, maybe_identity_manager).await?; + } Ok(()) } Err(e) => { // handle the error println!("Failed to connect to {}: {}", address, e); + // TODO: it should save the message to db to retry every x^2 Ok(()) } } @@ -578,7 +594,10 @@ impl Node { maybe_db: Arc>, maybe_identity_manager: Arc>, ) -> Result<(), NodeError> { - info!("\n\n {} > Got message from {:?}", receiver_address, unsafe_sender_address); + info!( + "\n\n {} > Got message from {:?}", + receiver_address, unsafe_sender_address + ); // Extract and validate the message let message = extract_message(bytes, receiver_address)?; diff --git a/src/network/node_api_commands.rs b/src/network/node_api_commands.rs index 7779ab105..be92e5503 100644 --- a/src/network/node_api_commands.rs +++ b/src/network/node_api_commands.rs @@ -43,7 +43,7 @@ impl Node { potentially_encrypted_msg: ShinkaiMessage, schema_type: Option, ) -> Result<(ShinkaiMessage, Identity), APIError> { - println!("validate_message: {:?}", potentially_encrypted_msg); + // println!("validate_message: {:?}", potentially_encrypted_msg); // Decrypt the message body if needed let msg: ShinkaiMessage; { @@ -153,7 +153,6 @@ impl Node { }) } }; - println!("sender_name: {:?}", sender_name); // We (currently) don't proxy external messages from other nodes to other nodes if sender_name.get_node_name() != self.node_profile_name.get_node_name() { @@ -170,10 +169,10 @@ impl Node { let sender_subidentity = subidentity_manager.find_by_identity_name(sender_name).cloned(); std::mem::drop(subidentity_manager); - eprintln!( - "\n\nafter find_by_identity_name> sender_subidentity: {:?}", - sender_subidentity - ); + // eprintln!( + // "\n\nafter find_by_identity_name> sender_subidentity: {:?}", + // sender_subidentity + // ); // Check that the identity exists locally let sender_subidentity = match sender_subidentity.clone() { @@ -1282,6 +1281,7 @@ impl Node { (node_addr, recipient_profile_name_string), &mut db_guard, self.identity_manager.clone(), + true ) .await?; diff --git a/src/network/node_internal_commands.rs b/src/network/node_internal_commands.rs index 975a4173e..343976230 100644 --- a/src/network/node_internal_commands.rs +++ b/src/network/node_internal_commands.rs @@ -250,7 +250,8 @@ impl Node { sender: Identity, ) -> Result { println!("Creating new job"); - match self.job_manager.lock().await.process_job_message(shinkai_message).await { + let job_manager = self.job_manager.as_ref().expect("JobManager not initialized"); + match job_manager.lock().await.process_job_message(shinkai_message).await { Ok(job_id) => { { let inbox_name = InboxName::get_job_inbox_name_from_params(job_id.clone()).unwrap(); @@ -302,7 +303,8 @@ impl Node { } pub async fn internal_job_message(&self, shinkai_message: ShinkaiMessage) -> Result<(), NodeError> { - match self.job_manager.lock().await.process_job_message(shinkai_message).await { + let job_manager = self.job_manager.as_ref().expect("JobManager not initialized"); + match job_manager.lock().await.process_job_message(shinkai_message).await { Ok(_) => Ok(()), Err(err) => Err(NodeError { message: format!("Error with process job message: {}", err), diff --git a/src/network/node_message_handlers.rs b/src/network/node_message_handlers.rs index e7487001a..f4ecf4860 100644 --- a/src/network/node_message_handlers.rs +++ b/src/network/node_message_handlers.rs @@ -294,6 +294,7 @@ pub async fn send_ack( peer, db, maybe_identity_manager, + false ) .await?; Ok(()) @@ -338,6 +339,7 @@ pub async fn ping_pong( peer, db, maybe_identity_manager, + false, ) .await } diff --git a/src/utils/logging_helpers.rs b/src/utils/logging_helpers.rs new file mode 100644 index 000000000..8f2d32351 --- /dev/null +++ b/src/utils/logging_helpers.rs @@ -0,0 +1,17 @@ +use shinkai_message_wasm::shinkai_message::shinkai_message::{ShinkaiMessage, MessageBody, MessageData}; + +pub fn print_content_time_messages(messages: Vec) { + for message in &messages { + match &message.body { + MessageBody::Unencrypted(body) => { + match &body.message_data { + MessageData::Unencrypted(data) => { + println!("Content: {}, Time: {}", data.message_raw_content, message.external_metadata.scheduled_time); + }, + _ => println!("Message data is encrypted"), + } + }, + _ => println!("Message body is encrypted"), + } + } +} \ No newline at end of file diff --git a/src/utils/mod.rs b/src/utils/mod.rs index bb2e0ea50..e4414f950 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -3,4 +3,5 @@ pub mod environment; pub mod keys; pub mod printer; pub mod cli; -pub mod qr_code_setup; \ No newline at end of file +pub mod qr_code_setup; +pub mod logging_helpers; \ No newline at end of file diff --git a/tests/agent_integration_tests.rs b/tests/agent_integration_tests.rs index 81ecaf2a4..35d2473b1 100644 --- a/tests/agent_integration_tests.rs +++ b/tests/agent_integration_tests.rs @@ -2,7 +2,7 @@ use async_channel::{bounded, Receiver, Sender}; use shinkai_message_wasm::schemas::agents::serialized_agent::{AgentAPIModel, OpenAI, SerializedAgent}; use shinkai_message_wasm::schemas::inbox_name::InboxName; use shinkai_message_wasm::schemas::shinkai_name::ShinkaiName; -use shinkai_message_wasm::shinkai_message::shinkai_message_schemas::MessageSchemaType; +use shinkai_message_wasm::shinkai_message::shinkai_message_schemas::{JobMessage, MessageSchemaType}; use shinkai_message_wasm::shinkai_utils::encryption::{ clone_static_secret_key, unsafe_deterministic_encryption_keypair, EncryptionMethod, }; @@ -225,8 +225,15 @@ fn node_agent_registration() { .await .unwrap(); let node2_last_messages = res2_receiver.recv().await.unwrap().expect("Failed to receive messages"); - // println!("node2_last_messages: {:?}", node2_last_messages); - assert!(node2_last_messages.len() == 1); + println!("### node2_last_messages: {:?}", node2_last_messages); + let shinkai_message_content_agent = node2_last_messages[1].get_message_content().unwrap(); + let message_content_agent: JobMessage = serde_json::from_str(&shinkai_message_content_agent).unwrap(); + + assert_eq!( + message_content_agent.content, + "\n\nHello there, how may I assist you today?".to_string() + ); + assert!(node2_last_messages.len() == 2); } { // Check Profile inboxes (to confirm job's there) @@ -237,7 +244,7 @@ fn node_agent_registration() { clone_static_secret_key(&node1_profile_encryption_sk), clone_signature_secret_key(&node1_profile_identity_sk), node1_encryption_pk.clone(), - sender.clone().to_string(), + sender.clone().to_string(), "".to_string(), sender, node1_identity_name.clone().to_string(), @@ -253,6 +260,125 @@ fn node_agent_registration() { println!("node1_all_profiles: {:?}", node2_last_messages); assert!(node2_last_messages.len() == 1); } + + // Now we add more messages to properly test unread and pagination + { + // Send a Message to the Job for processing + let message = "Are you still there?".to_string(); + api_message_job( + node1_commands_sender.clone(), + clone_static_secret_key(&node1_profile_encryption_sk), + node1_encryption_pk.clone(), + clone_signature_secret_key(&node1_profile_identity_sk), + node1_identity_name.clone(), + node1_subidentity_name.clone(), + &agent_subidentity.clone(), + &job_id.clone().to_string(), + &message, + ) + .await; + + // Successfully read unread messages from job inbox + let inbox_name = InboxName::get_job_inbox_name_from_params(job_id.clone()).unwrap(); + let sender = format!("{}/{}", node1_identity_name.clone(), node1_subidentity_name.clone()); + + let msg = ShinkaiMessageBuilder::get_last_unread_messages_from_inbox( + clone_static_secret_key(&node1_profile_encryption_sk), + clone_signature_secret_key(&node1_profile_identity_sk), + node1_encryption_pk.clone(), + inbox_name.to_string(), + 3, + None, + "".to_string(), + sender.clone(), + node1_identity_name.clone().to_string(), + ) + .unwrap(); + let (res2_sender, res2_receiver) = async_channel::bounded(1); + node1_commands_sender + .send(NodeCommand::APIGetLastUnreadMessagesFromInbox { msg, res: res2_sender }) + .await + .unwrap(); + let node2_last_messages = res2_receiver.recv().await.unwrap().expect("Failed to receive messages"); + println!("### node2_last_messages: {:?}", node2_last_messages); + let shinkai_message_content_agent = node2_last_messages[2].get_message_content().unwrap(); + let message_content_agent: JobMessage = serde_json::from_str(&shinkai_message_content_agent).unwrap(); + + assert_eq!(message_content_agent.content, message.to_string()); + assert!(node2_last_messages.len() == 3); + + let offset = format!("{}:::{}", node2_last_messages[1].external_metadata.scheduled_time, node2_last_messages[1].calculate_message_hash()); + eprintln!("next_msg offset: {}", offset); + let next_msg = ShinkaiMessageBuilder::get_last_unread_messages_from_inbox( + clone_static_secret_key(&node1_profile_encryption_sk), + clone_signature_secret_key(&node1_profile_identity_sk), + node1_encryption_pk.clone(), + inbox_name.to_string(), + 4, + Some(offset.clone()), + "".to_string(), + sender.clone(), + node1_identity_name.clone().to_string(), + ) + .unwrap(); + let (res2_sender, res2_receiver) = async_channel::bounded(1); + node1_commands_sender + .send(NodeCommand::APIGetLastUnreadMessagesFromInbox { msg: next_msg, res: res2_sender }) + .await + .unwrap(); + let node2_last_messages = res2_receiver.recv().await.unwrap().expect("Failed to receive messages"); + println!("### node2_last_messages unread pagination: {:?}", node2_last_messages); + let shinkai_message_content_agent = node2_last_messages[0].get_message_content().unwrap(); + let message_content_agent: JobMessage = serde_json::from_str(&shinkai_message_content_agent).unwrap(); + + assert!(node2_last_messages.len() == 2); + assert_eq!(message_content_agent.content, message.to_string()); + + // we mark read until the offset + let read_msg = ShinkaiMessageBuilder::read_up_to_time( + clone_static_secret_key(&node1_profile_encryption_sk), + clone_signature_secret_key(&node1_profile_identity_sk), + node1_encryption_pk.clone(), + inbox_name.to_string(), + offset, + "".to_string(), + sender, + node1_identity_name.clone().to_string(), + ) + .unwrap(); + let (res2_sender, res2_receiver) = async_channel::bounded(1); + node1_commands_sender + .send(NodeCommand::APIMarkAsReadUpTo { msg: read_msg, res: res2_sender }) + .await + .unwrap(); + } + { + // check how many unread messages are left + let inbox_name = InboxName::get_job_inbox_name_from_params(job_id.clone()).unwrap(); + let sender = format!("{}/{}", node1_identity_name.clone(), node1_subidentity_name.clone()); + + let msg = ShinkaiMessageBuilder::get_last_unread_messages_from_inbox( + clone_static_secret_key(&node1_profile_encryption_sk), + clone_signature_secret_key(&node1_profile_identity_sk), + node1_encryption_pk.clone(), + inbox_name.to_string(), + 3, + None, + "".to_string(), + sender.clone(), + node1_identity_name.clone().to_string(), + ) + .unwrap(); + let (res2_sender, res2_receiver) = async_channel::bounded(1); + node1_commands_sender + .send(NodeCommand::APIGetLastUnreadMessagesFromInbox { msg, res: res2_sender }) + .await + .unwrap(); + let node2_last_messages = res2_receiver.recv().await.unwrap().expect("Failed to receive messages"); + println!("### unread after cleaning node2_last_messages: {:?}", node2_last_messages); + eprintln!("### unread after cleaning node2_last_messages len: {:?}", node2_last_messages.len()); + assert!(node2_last_messages.len() == 2); + } }); // Wait for all tasks to complete diff --git a/tests/db_inbox_tests.rs b/tests/db_inbox_tests.rs index 5681cb854..84c2b54c5 100644 --- a/tests/db_inbox_tests.rs +++ b/tests/db_inbox_tests.rs @@ -200,7 +200,7 @@ fn db_inbox() { assert_eq!(last_unread_messages_inbox.len(), 2); assert_eq!( last_unread_messages_inbox[0].clone().get_message_content().unwrap(), - "Hello World 3".to_string() + "Hello World".to_string() ); assert_eq!( last_unread_messages_inbox[1].clone().get_message_content().unwrap(), @@ -217,7 +217,7 @@ fn db_inbox() { assert_eq!(last_unread_messages_inbox_page2.len(), 1); assert_eq!( last_unread_messages_inbox_page2[0].clone().get_message_content().unwrap(), - "Hello World".to_string() + "Hello World 3".to_string() ); // check pagination for inbox messages diff --git a/tests/job_manager_tests.rs b/tests/job_manager_tests.rs index 889016bf0..afa3e1d74 100644 --- a/tests/job_manager_tests.rs +++ b/tests/job_manager_tests.rs @@ -130,7 +130,7 @@ mod tests { } // Create JobManager - let mut job_manager = JobManager::new(db_arc.clone(), identity_manager).await; + let mut job_manager = JobManager::new(db_arc.clone(), identity_manager, clone_signature_secret_key(&node1_identity_sk), node_profile_name.clone()).await; // Create a JobCreationMessage ShinkaiMessage let scope = JobScope { diff --git a/tests/node_integration_tests.rs b/tests/node_integration_tests.rs index 4c409191f..7f9d4933f 100644 --- a/tests/node_integration_tests.rs +++ b/tests/node_integration_tests.rs @@ -350,7 +350,7 @@ fn subidentity_registration() { ); assert_eq!( - node1_last_messages[1].external_metadata.clone().other, + node1_last_messages[0].external_metadata.clone().other, encryption_public_key_to_string(node2_subencryption_pk), "Node 2's profile send an encrypted message to Node 1. Node 1 has the other's public key" ); @@ -560,7 +560,7 @@ fn subidentity_registration() { // println!("Node 1 last messages: {:?}", node1_last_messages); // println!("\n\nNode 2 last messages: {:?}", node2_last_messages); - let message_to_check = node2_last_messages[1].clone(); + let message_to_check = node2_last_messages[0].clone(); // Check that the message is body encrypted assert_eq!( diff --git a/tests/node_retrying_tests.rs b/tests/node_retrying_tests.rs new file mode 100644 index 000000000..1628ed0ec --- /dev/null +++ b/tests/node_retrying_tests.rs @@ -0,0 +1,224 @@ +use async_channel::{bounded, Receiver, Sender}; +use shinkai_message_wasm::schemas::agents::serialized_agent::{AgentAPIModel, OpenAI, SerializedAgent}; +use shinkai_message_wasm::schemas::inbox_name::InboxName; +use shinkai_message_wasm::schemas::shinkai_name::ShinkaiName; +use shinkai_message_wasm::shinkai_message::shinkai_message_schemas::{JobMessage, MessageSchemaType}; +use shinkai_message_wasm::shinkai_utils::encryption::{ + clone_static_secret_key, unsafe_deterministic_encryption_keypair, EncryptionMethod, encryption_public_key_to_string, +}; +use shinkai_message_wasm::shinkai_utils::shinkai_message_builder::ShinkaiMessageBuilder; +use shinkai_message_wasm::shinkai_utils::signatures::{ + clone_signature_secret_key, unsafe_deterministic_signature_keypair, +}; +use shinkai_message_wasm::shinkai_utils::utils::hash_string; +use shinkai_node::managers::agent; +use shinkai_node::network::node::NodeCommand; +use shinkai_node::network::node_api::APIError; +use shinkai_node::network::Node; +use std::fs; +use std::net::{IpAddr, Ipv4Addr}; +use std::path::Path; +use std::{net::SocketAddr, time::Duration}; +use tokio::runtime::Runtime; + +mod utils; +use crate::utils::node_test_api::{ + api_agent_registration, api_create_job, api_message_job, api_registration_device_node_profile_main, +}; +use crate::utils::node_test_local::local_registration_profile_node; + +#[test] +fn setup() { + let path = Path::new("db_tests/"); + let _ = fs::remove_dir_all(&path); +} + +#[test] +fn node_retrying_test() { + setup(); + let rt = Runtime::new().unwrap(); + + rt.block_on(async { + let node1_identity_name = "@@node1_test.shinkai"; + let node2_identity_name = "@@node2_test.shinkai"; + let node1_profile_name = "main"; + let node1_device_name = "node1_device"; + + let node2_profile_name = "main_profile_node2"; + + let (node1_identity_sk, node1_identity_pk) = unsafe_deterministic_signature_keypair(0); + let (node1_encryption_sk, node1_encryption_pk) = unsafe_deterministic_encryption_keypair(0); + let node1_encryption_sk_clone = node1_encryption_sk.clone(); + let node1_encryption_sk_clone2 = node1_encryption_sk.clone(); + + let (node2_identity_sk, node2_identity_pk) = unsafe_deterministic_signature_keypair(1); + let (node2_encryption_sk, node2_encryption_pk) = unsafe_deterministic_encryption_keypair(1); + let node2_encryption_sk_clone = node2_encryption_sk.clone(); + + let node1_identity_sk_clone = clone_signature_secret_key(&node1_identity_sk); + let node2_identity_sk_clone = clone_signature_secret_key(&node2_identity_sk); + + let (node1_profile_identity_sk, node1_profile_identity_pk) = unsafe_deterministic_signature_keypair(100); + let (node1_profile_encryption_sk, node1_profile_encryption_pk) = unsafe_deterministic_encryption_keypair(100); + + let (node2_subidentity_sk, node2_subidentity_pk) = unsafe_deterministic_signature_keypair(101); + let (node2_subencryption_sk, node2_subencryption_pk) = unsafe_deterministic_encryption_keypair(101); + + let node1_subencryption_sk_clone = node1_profile_encryption_sk.clone(); + let node2_subencryption_sk_clone = node2_subencryption_sk.clone(); + + let node1_subidentity_sk_clone = clone_signature_secret_key(&node1_profile_identity_sk); + let node2_subidentity_sk_clone = clone_signature_secret_key(&node2_subidentity_sk); + + let (node1_device_identity_sk, node1_device_identity_pk) = unsafe_deterministic_signature_keypair(200); + let (node1_device_encryption_sk, node1_device_encryption_pk) = unsafe_deterministic_encryption_keypair(200); + + let (node1_commands_sender, node1_commands_receiver): (Sender, Receiver) = + bounded(100); + let (node2_commands_sender, node2_commands_receiver): (Sender, Receiver) = + bounded(100); + + let node1_db_path = format!("db_tests/{}", hash_string(node1_identity_name.clone())); + let node2_db_path = format!("db_tests/{}", hash_string(node2_identity_name.clone())); + + // Create node1 and node2 + let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); + let mut node1 = Node::new( + node1_identity_name.to_string(), + addr1, + node1_identity_sk, + node1_encryption_sk, + 0, + node1_commands_receiver, + node1_db_path, + ); + + let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8081); + let mut node2 = Node::new( + node2_identity_name.to_string(), + addr2, + node2_identity_sk, + node2_encryption_sk, + 0, + node2_commands_receiver, + node2_db_path, + ); + + eprintln!("Starting nodes"); + // Start node1 and node2 + let node1_handler = tokio::spawn(async move { + eprintln!("\n\n"); + eprintln!("Starting node 1"); + let _ = node1.await.start().await; + }); + + let node2_handler = tokio::spawn(async move { + eprintln!("\n\n"); + eprintln!("Starting node 2"); + let _ = node2.await.start().await; + }); + + let interactions_handler = tokio::spawn(async move { + eprintln!("Starting interactions"); + eprintln!("Registration of Subidentities"); + + // Register a Profile in Node1 and verifies it + { + eprintln!("Register a Device with main profile in Node1 and verify it"); + api_registration_device_node_profile_main( + node1_commands_sender.clone(), + node1_profile_name, + node1_identity_name, + node1_encryption_pk.clone(), + node1_device_encryption_sk.clone(), + clone_signature_secret_key(&node1_device_identity_sk), + node1_profile_encryption_sk.clone(), + clone_signature_secret_key(&node1_profile_identity_sk), + node1_device_name, + ) + .await; + } + + // Register a Profile in Node2 and verifies it + { + eprintln!("Register a Profile in Node2 and verify it"); + local_registration_profile_node( + node2_commands_sender.clone(), + node2_profile_name, + node2_identity_name, + node2_subencryption_sk_clone.clone(), + node2_encryption_pk, + clone_signature_secret_key(&node2_subidentity_sk), + 1, + ) + .await; + } + + // Send message from Node 2 subidentity to Node 1 + { + eprintln!("\n\n### Sending message from a node 2 profile to node 1 profile\n\n"); + + let message_content = "test body content".to_string(); + let unchanged_message = ShinkaiMessageBuilder::new( + node2_subencryption_sk.clone(), + clone_signature_secret_key(&node2_subidentity_sk), + node1_encryption_pk, + ) + .message_raw_content(message_content.clone()) + .no_body_encryption() + .message_schema_type(MessageSchemaType::TextContent) + .internal_metadata( + node2_profile_name.to_string().clone(), + node1_profile_name.to_string(), + EncryptionMethod::DiffieHellmanChaChaPoly1305, + ) + .external_metadata_with_other( + node1_identity_name.to_string(), + node2_identity_name.to_string().clone(), + encryption_public_key_to_string(node2_subencryption_pk.clone()), + ) + .build() + .unwrap(); + + eprintln!("\n\n unchanged message: {:?}", unchanged_message); + + // Shutdown Node 1 + node1_commands_sender + .send(NodeCommand::Shutdown) + .await + .unwrap(); + + let (res_send_msg_sender, res_send_msg_receiver): ( + async_channel::Sender>, + async_channel::Receiver>, + ) = async_channel::bounded(1); + + node2_commands_sender + .send(NodeCommand::SendOnionizedMessage { + msg: unchanged_message, + res: res_send_msg_sender, + }) + .await + .unwrap(); + + let send_result = res_send_msg_receiver.recv().await.unwrap(); + eprintln!("send_result: {:?}", send_result); + assert!(send_result.is_ok(), "Failed to send onionized message"); + tokio::time::sleep(Duration::from_secs(1)).await; + + // Get Node2 messages + let (res2_sender, res2_receiver) = async_channel::bounded(1); + node2_commands_sender + .send(NodeCommand::FetchLastMessages { + limit: 2, + res: res2_sender, + }) + .await + .unwrap(); + let node2_last_messages = res2_receiver.recv().await.unwrap(); + } + }); + + let _ = tokio::try_join!(node1_handler, node2_handler, interactions_handler).unwrap(); + }); +}