diff --git a/.github/workflows/nightlyIntegrationTests.yml b/.github/workflows/nightlyIntegrationTests.yml index 478299f63099..f8967c10426a 100644 --- a/.github/workflows/nightlyIntegrationTests.yml +++ b/.github/workflows/nightlyIntegrationTests.yml @@ -19,7 +19,7 @@ jobs: # John and re-run the job. runs-on: ["self-hosted", "1ES.Pool=1ES-CIRCT-builds", "linux"] container: - image: ghcr.io/circt/images/circt-integration-test:v13.1 + image: ghcr.io/circt/images/circt-integration-test:v15.0 volumes: - /mnt:/__w/circt strategy: diff --git a/.github/workflows/shortIntegrationTests.yml b/.github/workflows/shortIntegrationTests.yml index 0af6f7e252f8..78b85ed60db5 100644 --- a/.github/workflows/shortIntegrationTests.yml +++ b/.github/workflows/shortIntegrationTests.yml @@ -29,7 +29,7 @@ jobs: # John and re-run the job. runs-on: ["self-hosted", "1ES.Pool=1ES-CIRCT-builds", "linux"] container: - image: ghcr.io/circt/images/circt-integration-test:v13.1 + image: ghcr.io/circt/images/circt-integration-test:v15.0 volumes: - /mnt:/__w/circt strategy: diff --git a/docs/Dialects/ESI/cosim.md b/docs/Dialects/ESI/cosim.md index 2c0ac2b03906..2cdf1c1ac232 100644 --- a/docs/Dialects/ESI/cosim.md +++ b/docs/Dialects/ESI/cosim.md @@ -15,7 +15,9 @@ variety of languages: the Cap'nProto website lists C++, C#, Erlang, Go, Haskell, JavaScript, Ocaml, Python, and Rust as languages which support messages and RPC! -Status: **prototype** +Status: **production** +Documentation status: **out of date** needs to be updated to reflect the new RPC +interface and library. ## Usage diff --git a/frontends/PyCDE/integration_test/esitester.py b/frontends/PyCDE/integration_test/esitester.py index 95fea9cc1ffe..c7bce36538bb 100644 --- a/frontends/PyCDE/integration_test/esitester.py +++ b/frontends/PyCDE/integration_test/esitester.py @@ -17,7 +17,7 @@ # RUN: rm -rf %t # RUN: mkdir %t && cd %t # RUN: %PYTHON% %s %t 2>&1 -# RUN: esi-cosim.py -- esitester cosim env | FileCheck %s +# RUN: esi-cosim.py --source %t -- esitester cosim env wait | FileCheck %s import pycde from pycde import AppID, Clock, Module, Reset, generator diff --git a/integration_test/Dialect/ESI/runtime/basic_mmio.sv b/integration_test/Dialect/ESI/runtime/basic_mmio.sv index 51e0e0f8e4d1..91456b678e7b 100644 --- a/integration_test/Dialect/ESI/runtime/basic_mmio.sv +++ b/integration_test/Dialect/ESI/runtime/basic_mmio.sv @@ -1,4 +1,5 @@ // REQUIRES: esi-cosim, esi-runtime, rtl-sim +// XFAIL: * // RUN: rm -rf %t && mkdir %t && cp %s %t // RUN: esi-cosim.py --source %t --top top -- %python %s.py cosim env diff --git a/lib/Dialect/ESI/runtime/CMakeLists.txt b/lib/Dialect/ESI/runtime/CMakeLists.txt index 123a0d7aecc9..8b240f627995 100644 --- a/lib/Dialect/ESI/runtime/CMakeLists.txt +++ b/lib/Dialect/ESI/runtime/CMakeLists.txt @@ -37,7 +37,7 @@ find_package(ZLIB REQUIRED) # JSON parser for the manifest. if (NOT TARGET nlohmann_json) - message(" -- ESI runtime pulling down json") + message("-- ESI runtime pulling down json") FetchContent_Declare(json GIT_REPOSITORY https://github.com/nlohmann/json.git GIT_TAG v3.11.3 @@ -94,49 +94,37 @@ ENDIF(MSVC) option(ESI_COSIM "Enable ESI cosimulation." ON) if(ESI_COSIM) + # gRPC for cosimulation. Local install required. + option(GRPC_PATH "Location of gRPC install.") + if (${GRPC_PATH}) + find_package(Protobuf REQUIRED CONFIG HINTS ${GRPC_PATH}) + find_package(gRPC REQUIRED CONFIG HINTS ${GRPC_PATH}) + else() + find_package(Protobuf REQUIRED CONFIG) + find_package(gRPC REQUIRED CONFIG) + endif() + + add_subdirectory(cosim) # Inform the runtime code that Cosimulation is enabled. Kinda hacky since all # backends should only need to be linked in. # TODO: Once the hack in the python bindings is remidied, remove this. add_compile_definitions(ESI_COSIM) - - # Try to find Cap'nProto. If the user has set CAPNP_PATH, use that. - if(DEFINED CAPNP_PATH) - set(ENV{PKG_CONFIG_PATH} - "${CAPNP_PATH}/lib/pkgconfig:$ENV{PKG_CONFIG_PATH}") - find_package(CapnProto CONFIG PATHS ${CAPNP_PATH}) - else() - set(ENV{PKG_CONFIG_PATH} - "${CMAKE_CURRENT_SOURCE_DIR}/ext/lib/pkgconfig:$ENV{PKG_CONFIG_PATH}") - find_package(CapnProto CONFIG PATHS "${CMAKE_CURRENT_SOURCE_DIR}/ext") - endif() - - # If Cap'nProto has been found, generate the headers and definitions. - if(CapnProto_FOUND) - message("-- ESI cosim enabled") - - message(STATUS "Found Cap'nProto at ${CapnProto_DIR}.") - add_subdirectory(cosim) - - set(ESIRuntimeSources - ${ESIRuntimeSources} - ${CMAKE_CURRENT_SOURCE_DIR}/cpp/lib/backends/Cosim.cpp - ) - set(ESIRuntimeBackendHeaders - ${ESIRuntimeBackendHeaders} - ${CMAKE_CURRENT_SOURCE_DIR}/cpp/include/esi/backends/Cosim.h - ) - set(ESIRuntimeLinkLibraries - ${ESIRuntimeLinkLibraries} - EsiCosimCapnp - ) - set(ESIRuntimeIncludeDirs - ${ESIRuntimeIncludeDirs} - ${CMAKE_CURRENT_SOURCE_DIR}/cosim/include - ) - else() - message(FATAL_ERROR "ESI cosimulation requires Cap'nProto. Either install - Cap'nProto or disable ESI cosim with -DESI_COSIM=OFF.") - endif() + set(ESIRuntimeSources + ${ESIRuntimeSources} + ${CMAKE_CURRENT_SOURCE_DIR}/cpp/lib/backends/Cosim.cpp + ) + set(ESIRuntimeBackendHeaders + ${ESIRuntimeBackendHeaders} + ${CMAKE_CURRENT_SOURCE_DIR}/cpp/include/esi/backends/Cosim.h + ) + set(ESIRuntimeLinkLibraries + ${ESIRuntimeLinkLibraries} + EsiCosimGRPC + ) + set(ESIRuntimeIncludeDirs + ${ESIRuntimeIncludeDirs} + ${CMAKE_CURRENT_SOURCE_DIR}/cosim/include + ) else() message("-- ESI cosim disabled") endif() @@ -193,17 +181,6 @@ install(TARGETS ESIRuntime DESTINATION lib COMPONENT ESIRuntime ) -install(IMPORTED_RUNTIME_ARTIFACTS ESIRuntime - RUNTIME_DEPENDENCY_SET ESIRuntime_RUNTIME_DEPS - DESTINATION lib - COMPONENT ESIRuntime -) -install(RUNTIME_DEPENDENCY_SET ESIRuntime_RUNTIME_DEPS - DESTINATION lib - PRE_EXCLUDE_REGEXES .* - PRE_INCLUDE_REGEXES capnp kj - COMPONENT ESIRuntime -) install(FILES ${ESIRuntimeHeaders} DESTINATION include/esi COMPONENT ESIRuntime-dev diff --git a/lib/Dialect/ESI/runtime/cosim/CMakeLists.txt b/lib/Dialect/ESI/runtime/cosim/CMakeLists.txt index 5c55946ca82f..0572f350a5d4 100644 --- a/lib/Dialect/ESI/runtime/cosim/CMakeLists.txt +++ b/lib/Dialect/ESI/runtime/cosim/CMakeLists.txt @@ -2,41 +2,6 @@ ## ##===----------------------------------------------------------------------===// -# Compile Capnp file. -add_definitions(${CAPNP_DEFINITIONS}) -set(CAPNPC_OUTPUT_DIR ${CMAKE_CURRENT_BINARY_DIR}/../cpp/include/backends) -include_directories(${CAPNPC_OUTPUT_DIR}) -file(MAKE_DIRECTORY ${CAPNPC_OUTPUT_DIR}) -capnp_generate_cpp( - COSIM_CAPNP_SRCS COSIM_CANPN_HDRS - "CosimDpi.capnp" -) - -# Compile a library for ESI cosim capnp for both the API runtime backend and the -# cosim DPI server to use. -add_library(EsiCosimCapnp SHARED - ${COSIM_CAPNP_HDRS} - ${COSIM_CAPNP_SRCS} - ${COSIM_SCHEMA_HDR} - - lib/CapnpThreads.cpp - lib/Client.cpp - lib/Endpoint.cpp - lib/Server.cpp -) -target_include_directories(EsiCosimCapnp PUBLIC ${CAPNPC_OUTPUT_DIR}) -target_include_directories(EsiCosimCapnp PUBLIC ${CAPNP_INCLUDE_DIRS}) -target_include_directories(EsiCosimCapnp PUBLIC - ${CMAKE_CURRENT_SOURCE_DIR}/../cpp/include) -target_link_libraries(EsiCosimCapnp PUBLIC - CapnProto::kj CapnProto::kj-async CapnProto::kj-gzip - CapnProto::capnp CapnProto::capnp-rpc -) -install(TARGETS EsiCosimCapnp - DESTINATION lib - COMPONENT ESIRuntime -) - set(cosim_collateral Cosim_DpiPkg.sv Cosim_Endpoint.sv @@ -73,13 +38,31 @@ install(FILES COMPONENT ESIRuntime ) -# Cap'nProto MUST be built with exceptions enabled. -if (MSVC) - target_compile_options(EsiCosimCapnp PRIVATE /EHsc) -else() - target_compile_options(EsiCosimCapnp PRIVATE -fexceptions) -endif() +add_library(EsiCosimGRPC OBJECT "${CMAKE_CURRENT_LIST_DIR}/cosim.proto") +target_link_libraries(EsiCosimGRPC PUBLIC protobuf::libprotobuf gRPC::grpc++) +set(PROTO_BINARY_DIR "${CMAKE_CURRENT_BINARY_DIR}/generated") +target_include_directories(EsiCosimGRPC PUBLIC "$") + +protobuf_generate( + TARGET EsiCosimGRPC + PROTOC_OUT_DIR "${PROTO_BINARY_DIR}") +protobuf_generate( + TARGET EsiCosimGRPC + LANGUAGE grpc + GENERATE_EXTENSIONS .grpc.pb.h .grpc.pb.cc + PLUGIN "protoc-gen-grpc=\$" + PROTOC_OUT_DIR "${PROTO_BINARY_DIR}") include_directories("${CMAKE_CURRENT_SOURCE_DIR}/include") + +add_library(CosimRpcServer + lib/RpcServer.cpp +) +target_link_libraries(CosimRpcServer + PUBLIC + EsiCosimGRPC + ESIRuntime +) + add_subdirectory(cosim_dpi_server) add_subdirectory(MtiPliStub) diff --git a/lib/Dialect/ESI/runtime/cosim/CosimDpi.capnp b/lib/Dialect/ESI/runtime/cosim/CosimDpi.capnp deleted file mode 100644 index 9f61dca05be5..000000000000 --- a/lib/Dialect/ESI/runtime/cosim/CosimDpi.capnp +++ /dev/null @@ -1,60 +0,0 @@ -##===- CosimDpi.capnp - ESI cosim RPC schema ------------------*- CAPNP -*-===// -## -## Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. -## See https://llvm.org/LICENSE.txt for license information. -## SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception -## -##===----------------------------------------------------------------------===// -## -## The ESI cosimulation RPC Cap'nProto schema. Documentation is in -## docs/ESI/cosim.md. TL;DR: Run the simulation, then connect to its RPC server -## with a client generated by the Cap'nProto implementation for your language of -## choice! (https://capnproto.org/otherlang.html) -## -##===----------------------------------------------------------------------===// - -@0x9fd65fec6e2d2779; - -# The primary interface exposed by an ESI cosim simulation. -interface CosimDpiServer @0xe3d7f70c7065c46a { - # List all the registered endpoints. - list @0 () -> (ifaces :List(EsiDpiInterfaceDesc)); - # Open one of them. Specify both the send and recv data types if want type - # safety and your language supports it. - open @1 (iface :EsiDpiInterfaceDesc) -> (endpoint :EsiDpiEndpoint); - - # Get the zlib-compressed JSON system manifest. - getCompressedManifest @2 () -> (version :Int32, compressedManifest :Data); - - # Create a low level interface into the simulation. - openLowLevel @3 () -> (lowLevel :EsiLowLevel); -} - -# Description of a registered endpoint. -struct EsiDpiInterfaceDesc @0xd2584d2506f01c8c { - # Capn'Proto ID of the struct type being sent _to_ the simulator. - fromHostType @0 :Text; - # Capn'Proto ID of the struct type being sent _from_ the simulator. - toHostType @1 :Text; - # Numerical identifier of the endpoint. Defined in the design. - endpointID @2 :Text; -} - -# Interactions with an open endpoint. Optionally typed. -interface EsiDpiEndpoint @0xfb0a36bf859be47b { - # Send a message to the endpoint. - sendFromHost @0 (msg :Data); - # Recieve a message from the endpoint. Non-blocking. - recvToHost @1 () -> (hasData :Bool, resp :Data); - # Close the connect to this endpoint. - close @2 (); -} - -# A low level interface simply provides MMIO and host memory access. In all -# cases, hardware errors become exceptions. -interface EsiLowLevel @0xae716100ef82f6d6 { - # Write to an MMIO register. - writeMMIO @0 (address :UInt32, data :UInt32) -> (); - # Read from an MMIO register. - readMMIO @1 (address :UInt32) -> (data :UInt32); -} diff --git a/lib/Dialect/ESI/runtime/cosim/Cosim_DpiPkg.sv b/lib/Dialect/ESI/runtime/cosim/Cosim_DpiPkg.sv index e352afb781b7..0612ab50a9c9 100644 --- a/lib/Dialect/ESI/runtime/cosim/Cosim_DpiPkg.sv +++ b/lib/Dialect/ESI/runtime/cosim/Cosim_DpiPkg.sv @@ -78,7 +78,7 @@ import "DPI-C" sv2cCosimserverEpTryGet = import "DPI-C" sv2cCosimserverSetManifest = function void cosim_set_manifest( - input int unsigned esi_version, + input int signed esi_version, input byte unsigned compressed_manifest[] ); diff --git a/lib/Dialect/ESI/runtime/cosim/cosim.proto b/lib/Dialect/ESI/runtime/cosim/cosim.proto new file mode 100644 index 000000000000..67683cd5d13f --- /dev/null +++ b/lib/Dialect/ESI/runtime/cosim/cosim.proto @@ -0,0 +1,68 @@ +//===- cosim.proto - ESI cosim RPC definitions ------------------*- C++ -*-===// +// +// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. +// See https://llvm.org/LICENSE.txt for license information. +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +// +//===----------------------------------------------------------------------===// +// +// The ESI cosimulation gRPC schema. If something (client or server) wants to +// talk to an ESI runtime, it mergely needs to implement this schema. If +// possible, however, it is encouraged to use the C++ esiruntime API as that is +// expected to be more portable and supports more than just cosim. +// +//===----------------------------------------------------------------------===// + +syntax = "proto3"; + +package esi.cosim; + +// Description of a channel that can be connected to by the client. +message ChannelDesc { + string name = 1; + + enum Direction { + TO_SERVER = 0; + TO_CLIENT = 1; + } + Direction dir = 2; + + string type = 3; +} + +// List of channels that the client can connect to. +message ListOfChannels { repeated ChannelDesc channels = 1; } + +// Empty message since gRPC only supports exactly one argument and return. +message VoidMessage {} + +// The manifest package. +message Manifest { + int32 esi_version = 1; + bytes compressed_manifest = 2; +} + +// An ESI message. +message Message { bytes data = 1; } + +// An ESI message and the channel to which is should be directed. +message AddressedMessage { + string channel_name = 1; + Message message = 2; +} + +// The server interface provided by the ESI cosim server. +service ChannelServer { + // Get the manifest embedded in the accelertor. + rpc GetManifest(VoidMessage) returns (Manifest) {} + + // List the channels that the client can connect to. + rpc ListChannels(VoidMessage) returns (ListOfChannels) {} + + // Send a message to the server. + rpc SendToServer(AddressedMessage) returns (VoidMessage) {} + + // Connect to a client channel and return a stream of messages coming from + // that channel. + rpc ConnectToClientChannel(ChannelDesc) returns (stream Message) {} +} diff --git a/lib/Dialect/ESI/runtime/cosim/cosim_dpi_server/CMakeLists.txt b/lib/Dialect/ESI/runtime/cosim/cosim_dpi_server/CMakeLists.txt index 562f05fec9fc..bea992f0ca62 100644 --- a/lib/Dialect/ESI/runtime/cosim/cosim_dpi_server/CMakeLists.txt +++ b/lib/Dialect/ESI/runtime/cosim/cosim_dpi_server/CMakeLists.txt @@ -13,8 +13,14 @@ set_target_properties(EsiCosimDpiServer RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib CXX_VISIBILITY_PRESET "default" ) -add_dependencies(EsiCosimDpiServer EsiCosimCapnp MtiPli) -target_link_libraries(EsiCosimDpiServer PRIVATE EsiCosimCapnp MtiPli) +add_dependencies(EsiCosimDpiServer ESIRuntime MtiPli) +target_link_libraries(EsiCosimDpiServer + PRIVATE + ESIRuntime + CosimRpcServer + MtiPli +) + install(TARGETS EsiCosimDpiServer DESTINATION lib COMPONENT ESIRuntime diff --git a/lib/Dialect/ESI/runtime/cosim/cosim_dpi_server/DpiEntryPoints.cpp b/lib/Dialect/ESI/runtime/cosim/cosim_dpi_server/DpiEntryPoints.cpp index c7170228142e..bad8afefa05b 100644 --- a/lib/Dialect/ESI/runtime/cosim/cosim_dpi_server/DpiEntryPoints.cpp +++ b/lib/Dialect/ESI/runtime/cosim/cosim_dpi_server/DpiEntryPoints.cpp @@ -16,32 +16,33 @@ // //===----------------------------------------------------------------------===// -#include "cosim/CapnpThreads.h" #include "dpi.h" +#include "esi/Ports.h" +#include "esi/cosim/RpcServer.h" #include #include #include +using namespace esi; using namespace esi::cosim; /// If non-null, log to this file. Protected by 'serverMutex`. static FILE *logFile; -static RpcServer *server = nullptr; +static std::unique_ptr server = nullptr; static std::mutex serverMutex; // ---- Helper functions ---- /// Emit the contents of 'msg' to the log file in hex. -static void log(char *epId, bool toClient, - const Endpoint::MessageDataPtr &msg) { +static void log(char *epId, bool toClient, const MessageData &msg) { std::lock_guard g(serverMutex); if (!logFile) return; fprintf(logFile, "[ep: %50s to: %4s]", epId, toClient ? "host" : "sim"); - size_t msgSize = msg->getSize(); - auto bytes = msg->getBytes(); + size_t msgSize = msg.getSize(); + auto bytes = msg.getBytes(); for (size_t i = 0; i < msgSize; ++i) { auto b = bytes[i]; // Separate 32-bit words. @@ -61,7 +62,8 @@ static void log(char *epId, bool toClient, static int findPort() { const char *portEnv = getenv("COSIM_PORT"); if (portEnv == nullptr) { - printf("[COSIM] RPC server port not found. Letting CapnpRPC select one\n"); + printf( + "[COSIM] RPC server port not found. Letting RPC server select one\n"); return 0; } printf("[COSIM] Opening RPC server on port %s\n", portEnv); @@ -101,17 +103,39 @@ static int validateSvOpenArray(const svOpenArrayHandle data, // ---- Traditional cosim DPI entry points ---- +// Lookups for registered ports. As a future optimization, change the DPI API to +// return a handle when registering wherein said handle is a pointer to a port. +std::map readPorts; +std::map writePorts; + // Register simulated device endpoints. // - return 0 on success, non-zero on failure (duplicate EP registered). -DPI int sv2cCosimserverEpRegister(char *endpointId, char *fromHostTypeId, - int fromHostTypeSize, char *toHostTypeId, +// TODO: Change this by breaking it in two functions, one for read and one for +// write. Also return the pointer as a handle. +DPI int sv2cCosimserverEpRegister(char *endpointId, char *fromHostTypeIdC, + int fromHostTypeSize, char *toHostTypeIdC, int toHostTypeSize) { // Ensure the server has been constructed. sv2cCosimserverInit(); - // Then register with it. - if (server->registerEndpoint(endpointId, fromHostTypeId, toHostTypeId)) - return 0; - return -1; + std::string fromHostTypeId(fromHostTypeIdC), toHostTypeId(toHostTypeIdC); + + // Both only one type allowed. + if (!(fromHostTypeId.empty() ^ toHostTypeId.empty())) { + printf("ERROR: Only one of fromHostTypeId and toHostTypeId can be set!\n"); + return -2; + } + if (readPorts.contains(endpointId)) { + printf("ERROR: Endpoint already registered!\n"); + return -3; + } + + if (!fromHostTypeId.empty()) + readPorts.emplace(endpointId, + server->registerReadPort(endpointId, fromHostTypeId)); + else + writePorts.emplace(endpointId, + server->registerWritePort(endpointId, toHostTypeId)); + return 0; } // Attempt to recieve data from a client. @@ -127,25 +151,25 @@ DPI int sv2cCosimserverEpTryGet(char *endpointId, if (server == nullptr) return -1; - Endpoint *ep = server->getEndpoint(endpointId); - if (!ep) { + auto portIt = readPorts.find(endpointId); + if (portIt == readPorts.end()) { fprintf(stderr, "Endpoint not found in registry!\n"); return -4; } - Endpoint::MessageDataPtr msg; + ReadChannelPort &port = portIt->second; + MessageData msg; // Poll for a message. - if (!ep->getMessageToSim(msg)) { + if (!port.read(msg)) { // No message. *dataSize = 0; return 0; } + log(endpointId, false, msg); + // Do the validation only if there's a message available. Since the // simulator is going to poll up to every tick and there's not going to be // a message most of the time, this is important for performance. - - log(endpointId, false, msg); - if (validateSvOpenArray(data, sizeof(int8_t)) != 0) { printf("ERROR: DPI-func=%s line=%d event=invalid-sv-array\n", __func__, __LINE__); @@ -161,7 +185,7 @@ DPI int sv2cCosimserverEpTryGet(char *endpointId, return -3; } // Verify it'll fit. - size_t msgSize = msg->getSize(); + size_t msgSize = msg.getSize(); if (msgSize > *dataSize) { printf("ERROR: Message size too big to fit in HW buffer\n"); return -5; @@ -169,7 +193,7 @@ DPI int sv2cCosimserverEpTryGet(char *endpointId, // Copy the message data. size_t i; - auto bytes = msg->getBytes(); + auto bytes = msg.getBytes(); for (i = 0; i < msgSize; ++i) { auto b = bytes[i]; *(char *)svGetArrElemPtr1(data, i) = b; @@ -179,7 +203,7 @@ DPI int sv2cCosimserverEpTryGet(char *endpointId, *(char *)svGetArrElemPtr1(data, i) = 0; } // Set the output data size. - *dataSize = msg->getSize(); + *dataSize = msg.getSize(); return 0; } @@ -213,16 +237,17 @@ DPI int sv2cCosimserverEpTryPut(char *endpointId, for (int i = 0; i < dataSize; ++i) { dataVec[i] = *(char *)svGetArrElemPtr1(data, i); } - Endpoint::MessageDataPtr blob = std::make_unique(dataVec); + auto blob = std::make_unique(dataVec); // Queue the blob. - Endpoint *ep = server->getEndpoint(endpointId); - if (!ep) { + auto portIt = writePorts.find(endpointId); + if (portIt == writePorts.end()) { fprintf(stderr, "Endpoint not found in registry!\n"); return -4; } - log(endpointId, true, blob); - ep->pushMessageToClient(std::move(blob)); + log(endpointId, true, *blob); + WriteChannelPort &port = portIt->second; + port.write(*blob); return 0; } @@ -254,7 +279,7 @@ DPI int sv2cCosimserverInit() { // Find the port and run. printf("[cosim] Starting RPC server.\n"); - server = new RpcServer(); + server = std::make_unique(); server->run(findPort()); } return 0; @@ -263,7 +288,7 @@ DPI int sv2cCosimserverInit() { // ---- Manifest DPI entry points ---- DPI void -sv2cCosimserverSetManifest(unsigned int esiVersion, +sv2cCosimserverSetManifest(int esiVersion, const svOpenArrayHandle compressedManifest) { if (server == nullptr) sv2cCosimserverInit(); @@ -287,6 +312,10 @@ sv2cCosimserverSetManifest(unsigned int esiVersion, // ---- Low-level cosim DPI entry points ---- +// TODO: These had the shit broken outta them in the gRPC conversion. We're not +// actively using them at the moment, but they'll have to be revived again in +// the future. + static bool mmioRegistered = false; DPI int sv2cCosimserverMMIORegister() { if (mmioRegistered) { @@ -299,47 +328,50 @@ DPI int sv2cCosimserverMMIORegister() { } DPI int sv2cCosimserverMMIOReadTryGet(uint32_t *address) { - assert(server); - LowLevel *ll = server->getLowLevel(); - std::optional reqAddress = ll->readReqs.pop(); - if (!reqAddress.has_value()) - return -1; - *address = reqAddress.value(); - ll->readsOutstanding++; - return 0; + // assert(server); + // LowLevel *ll = server->getLowLevel(); + // std::optional reqAddress = ll->readReqs.pop(); + // if (!reqAddress.has_value()) + return -1; + // *address = reqAddress.value(); + // ll->readsOutstanding++; + // return 0; } DPI void sv2cCosimserverMMIOReadRespond(uint32_t data, char error) { - assert(server); - LowLevel *ll = server->getLowLevel(); - if (ll->readsOutstanding == 0) { - printf("ERROR: More read responses than requests! Not queuing response.\n"); - return; - } - ll->readsOutstanding--; - ll->readResps.push(data, error); + assert(false && "unimplemented"); + // assert(server); + // LowLevel *ll = server->getLowLevel(); + // if (ll->readsOutstanding == 0) { + // printf("ERROR: More read responses than requests! Not queuing + // response.\n"); return; + // } + // ll->readsOutstanding--; + // ll->readResps.push(data, error); } DPI void sv2cCosimserverMMIOWriteRespond(char error) { - assert(server); - LowLevel *ll = server->getLowLevel(); - if (ll->writesOutstanding == 0) { - printf( - "ERROR: More write responses than requests! Not queuing response.\n"); - return; - } - ll->writesOutstanding--; - ll->writeResps.push(error); + assert(false && "unimplemented"); + // assert(server); + // LowLevel *ll = server->getLowLevel(); + // if (ll->writesOutstanding == 0) { + // printf( + // "ERROR: More write responses than requests! Not queuing + // response.\n"); + // return; + // } + // ll->writesOutstanding--; + // ll->writeResps.push(error); } DPI int sv2cCosimserverMMIOWriteTryGet(uint32_t *address, uint32_t *data) { - assert(server); - LowLevel *ll = server->getLowLevel(); - auto req = ll->writeReqs.pop(); - if (!req.has_value()) - return -1; - *address = req.value().first; - *data = req.value().second; - ll->writesOutstanding++; - return 0; + // assert(server); + // LowLevel *ll = server->getLowLevel(); + // auto req = ll->writeReqs.pop(); + // if (!req.has_value()) + return -1; + // *address = req.value().first; + // *data = req.value().second; + // ll->writesOutstanding++; + // return 0; } diff --git a/lib/Dialect/ESI/runtime/cosim/cosim_dpi_server/dpi.h b/lib/Dialect/ESI/runtime/cosim/cosim_dpi_server/dpi.h index 9ebf4146c35b..2713746527d6 100644 --- a/lib/Dialect/ESI/runtime/cosim/cosim_dpi_server/dpi.h +++ b/lib/Dialect/ESI/runtime/cosim/cosim_dpi_server/dpi.h @@ -47,7 +47,7 @@ DPI int sv2cCosimserverInit(); DPI void sv2cCosimserverFinish(); /// Set the system zlib-compressed manifest. -DPI void sv2cCosimserverSetManifest(unsigned int esiVersion, +DPI void sv2cCosimserverSetManifest(int esiVersion, const svOpenArrayHandle compressedManifest); /// Register an MMIO module. Just checks that there is only one instantiated. diff --git a/lib/Dialect/ESI/runtime/cosim/include/cosim/CapnpThreads.h b/lib/Dialect/ESI/runtime/cosim/include/cosim/CapnpThreads.h deleted file mode 100644 index 736a90443d2c..000000000000 --- a/lib/Dialect/ESI/runtime/cosim/include/cosim/CapnpThreads.h +++ /dev/null @@ -1,121 +0,0 @@ -//===- CapnpThreads.h - ESI cosim RPC ---------------------------*- C++ -*-===// -// -// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. -// See https://llvm.org/LICENSE.txt for license information. -// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception -// -//===----------------------------------------------------------------------===// -// -// Various classes used to implement the RPC server classes generated by -// CapnProto. Capnp C++ RPC servers are based on 'libkj' and its asynchrony -// model, which is very foreign. This is what the 'kj' namespace is along with -// alternate collections and other utility code. -// -//===----------------------------------------------------------------------===// - -#ifndef COSIM_SERVER_H -#define COSIM_SERVER_H - -#include "cosim/Endpoint.h" -#include "cosim/LowLevel.h" - -#include -#include - -namespace kj { -class WaitScope; -} // namespace kj - -namespace esi { -namespace cosim { - -/// Since Capnp is not thread-safe, client and server must be run in their own -/// threads and communicate with the outside world through thread safe channels. -class CapnpCosimThread { -public: - CapnpCosimThread(); - ~CapnpCosimThread(); - - /// Stop the thread. This is a blocking call -- it will not return until the - /// capnp thread has stopped. - void stop(); - - // Get an endpoint by its ID. - Endpoint *getEndpoint(std::string epId); - // Get the low level bridge. - LowLevel *getLowLevel() { return &lowLevelBridge; } - - // Get the ESI version and compressed manifest. Returns false if the manifest - // has yet to be loaded. - bool getCompressedManifest(unsigned int &esiVersion, - std::vector &manifest) { - esiVersion = this->esiVersion; - manifest = compressedManifest; - return this->esiVersion >= 0; - } - -protected: - /// Start capnp polling loop. Does not return until stop() is called. Must be - /// called in the same thread the RPC server/client was created. 'poll' is - /// called on each iteration of the loop. - void loop(kj::WaitScope &waitScope, std::function poll); - - using Lock = std::lock_guard; - - EndpointRegistry endpoints; - LowLevel lowLevelBridge; - - std::thread *myThread; - volatile bool stopSig; - std::mutex m; - - unsigned int esiVersion = -1; - std::vector compressedManifest; -}; - -/// The main RpcServer. Does not implement any capnp RPC interfaces but contains -/// the capnp main RPC server. We run the capnp server in its own thread to be -/// more responsive to network traffic and so as to not slow down the -/// simulation. -class RpcServer : public CapnpCosimThread { -public: - /// Start and stop the server thread. - void run(uint16_t port); - - void setManifest(unsigned int esiVersion, - const std::vector &manifest) { - this->esiVersion = esiVersion; - compressedManifest = manifest; - } - - bool registerEndpoint(std::string epId, std::string fromHostTypeId, - std::string toHostTypeId) { - return endpoints.registerEndpoint(epId, fromHostTypeId, toHostTypeId); - } - -private: - /// The thread's main loop function. Exits on shutdown. - void mainLoop(uint16_t port); -}; - -/// The Capnp RpcClient. -class RpcClient : public CapnpCosimThread { - // To hide the ugly details of the capnp headers. - struct Impl; - friend struct Impl; - -public: - /// Start client thread. - void run(std::string host, uint16_t port); - -private: - void mainLoop(std::string host, uint16_t port); - - /// The 'capnp' sets this to true when it is ready to go. - std::atomic started; -}; - -} // namespace cosim -} // namespace esi - -#endif diff --git a/lib/Dialect/ESI/runtime/cosim/include/cosim/Endpoint.h b/lib/Dialect/ESI/runtime/cosim/include/cosim/Endpoint.h deleted file mode 100644 index 21d36b87a464..000000000000 --- a/lib/Dialect/ESI/runtime/cosim/include/cosim/Endpoint.h +++ /dev/null @@ -1,149 +0,0 @@ -//===- Endpoint.h - Cosim endpoint server -----------------------*- C++ -*-===// -// -// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. -// See https://llvm.org/LICENSE.txt for license information. -// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception -// -//===----------------------------------------------------------------------===// -// -// Declare the class which is used to model DPI endpoints. -// -//===----------------------------------------------------------------------===// - -#ifndef COSIM_ENDPOINT_H -#define COSIM_ENDPOINT_H - -#include "esi/Common.h" - -#include -#include -#include -#include -#include -#include - -namespace esi { -namespace cosim { - -/// Implements a bi-directional, thread-safe bridge between the RPC server and -/// DPI functions. -/// -/// Several of the methods below are inline with the declaration to make them -/// candidates for inlining during compilation. This is particularly important -/// on the simulation side since polling happens at each clock and we do not -/// want to slow down the simulation any more than necessary. -class Endpoint { -public: - /// Representing messages as shared pointers to vectors may be a performance - /// issue in the future but it is the easiest way to ensure memory - /// correctness. - using MessageDataPtr = std::unique_ptr; - - /// Construct an endpoint which knows and the type IDs in both directions. - Endpoint(std::string fromHostTypeId, std::string toHostTypeId); - ~Endpoint(); - /// Disallow copying. There is only ONE endpoint object per logical endpoint - /// so copying is almost always a bug. - Endpoint(const Endpoint &) = delete; - - std::string getSendTypeId() const { return fromHostTypeId; } - std::string getRecvTypeId() const { return toHostTypeId; } - - /// These two are used to set and unset the inUse flag, to ensure that an open - /// endpoint is not opened again. - bool setInUse(); - void returnForUse(); - - /// Queue message to the simulation. - void pushMessageToSim(MessageDataPtr msg) { - Lock g(m); - toCosim.push(std::move(msg)); - } - - /// Pop from the to-simulator queue. Return true if there was a message in the - /// queue. - bool getMessageToSim(MessageDataPtr &msg) { - Lock g(m); - if (toCosim.empty()) - return false; - msg = std::move(toCosim.front()); - toCosim.pop(); - return true; - } - - /// Queue message to the RPC client. - void pushMessageToClient(MessageDataPtr msg) { - Lock g(m); - toClient.push(std::move(msg)); - } - - /// Pop from the to-RPC-client queue. Return true if there was a message in - /// the queue. - bool getMessageToClient(MessageDataPtr &msg) { - Lock g(m); - if (toClient.empty()) - return false; - msg = std::move(toClient.front()); - toClient.pop(); - return true; - } - -private: - const std::string fromHostTypeId; - const std::string toHostTypeId; - bool inUse; - - using Lock = std::lock_guard; - - /// This class needs to be thread-safe. All of the mutable member variables - /// are protected with this object-wide lock. This may be a performance issue - /// in the future. - std::mutex m; - /// Message queue from RPC client to the simulation. - std::queue toCosim; - /// Message queue to RPC client from the simulation. - std::queue toClient; -}; - -/// The Endpoint registry is where Endpoints report their existence (register) -/// and they are looked up by RPC clients. -class EndpointRegistry { -public: - /// Register an Endpoint. Creates the Endpoint object and owns it. Returns - /// false if unsuccessful. - bool registerEndpoint(std::string epId, std::string fromHostTypeId, - std::string toHostTypeId); - - /// Get the specified endpoint. Return nullptr if it does not exist. This - /// method is defined inline so it can be inlined at compile time. Performance - /// is important here since this method is used in the polling call from the - /// simulator. Returns nullptr if the endpoint cannot be found. - Endpoint *operator[](const std::string &epId) { - Lock g(m); - auto it = endpoints.find(epId); - if (it == endpoints.end()) - return nullptr; - return &it->second; - } - - /// Iterate over the list of endpoints, calling the provided function for each - /// endpoint. - void iterateEndpoints( - const std::function &f) const; - /// Return the number of endpoints. - size_t size() const; - -private: - using Lock = std::lock_guard; - - /// This object needs to be thread-safe. An object-wide mutex is sufficient. - std::mutex m; - - /// Endpoint ID to object pointer mapping. - std::map endpoints; -}; - -} // namespace cosim -} // namespace esi - -#endif diff --git a/lib/Dialect/ESI/runtime/cosim/include/cosim/LowLevel.h b/lib/Dialect/ESI/runtime/cosim/include/cosim/LowLevel.h deleted file mode 100644 index 390651fee666..000000000000 --- a/lib/Dialect/ESI/runtime/cosim/include/cosim/LowLevel.h +++ /dev/null @@ -1,41 +0,0 @@ -//===- LowLevel.h - Cosim low level implementation --------------*- C++ -*-===// -// -// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. -// See https://llvm.org/LICENSE.txt for license information. -// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception -// -//===----------------------------------------------------------------------===// - -#ifndef COSIM_LOWLEVEL_H -#define COSIM_LOWLEVEL_H - -#include - -#include "cosim/Utils.h" - -namespace esi { -namespace cosim { - -// Implements a bi-directional, thread-safe bridge between the RPC server and -// DPI functions for low level functionality. -class LowLevel { -public: - LowLevel() = default; - ~LowLevel() = default; - /// Disallow copying. There is only ONE low level object per RPC server, so - /// copying is almost always a bug. - LowLevel(const LowLevel &) = delete; - - TSQueue readReqs; - TSQueue> readResps; - std::atomic readsOutstanding = 0; - - TSQueue> writeReqs; - TSQueue writeResps; - std::atomic writesOutstanding = 0; -}; - -} // namespace cosim -} // namespace esi - -#endif diff --git a/lib/Dialect/ESI/runtime/cosim/include/cosim/Utils.h b/lib/Dialect/ESI/runtime/cosim/include/cosim/Utils.h deleted file mode 100644 index 4ef3c84d5539..000000000000 --- a/lib/Dialect/ESI/runtime/cosim/include/cosim/Utils.h +++ /dev/null @@ -1,50 +0,0 @@ -//===- Utils.h - utility code for cosim -------------------------*- C++ -*-===// -// -// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. -// See https://llvm.org/LICENSE.txt for license information. -// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception -// -//===----------------------------------------------------------------------===// - -#ifndef COSIM_UTILS_H -#define COSIM_UTILS_H - -#include -#include -#include - -namespace esi { -namespace cosim { - -/// Thread safe queue. Just wraps std::queue protected with a lock. -template -class TSQueue { - using Lock = std::lock_guard; - - std::mutex m; - std::queue q; - -public: - /// Push onto the queue. - template - void push(E... t) { - Lock l(m); - q.emplace(t...); - } - - /// Pop something off the queue but return nullopt if the queue is empty. Why - /// doesn't std::queue have anything like this? - std::optional pop() { - Lock l(m); - if (q.size() == 0) - return std::nullopt; - auto t = q.front(); - q.pop(); - return t; - } -}; - -} // namespace cosim -} // namespace esi - -#endif diff --git a/lib/Dialect/ESI/runtime/cosim/include/esi/cosim/RpcServer.h b/lib/Dialect/ESI/runtime/cosim/include/esi/cosim/RpcServer.h new file mode 100644 index 000000000000..c92671641c47 --- /dev/null +++ b/lib/Dialect/ESI/runtime/cosim/include/esi/cosim/RpcServer.h @@ -0,0 +1,54 @@ +//===- RpcServer.h - Run a cosim server -------------------------*- C++ -*-===// +// +// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. +// See https://llvm.org/LICENSE.txt for license information. +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +// +//===----------------------------------------------------------------------===// +// +// Setup and run a server accepting connections via the 'cosim' RPC protocol. +// Then, one can request ports to and from the clients. +// +// Abstract this out to support multi-party communication in the future. +// +//===----------------------------------------------------------------------===// + +#ifndef ESI_COSIM_RPCSERVER_H +#define ESI_COSIM_RPCSERVER_H + +#include "esi/Ports.h" + +namespace esi { +namespace cosim { + +class RpcServer { +public: + ~RpcServer(); + + /// Set the manifest and version. There is a race condition here in that the + /// RPC server can be started and a connection from the client could happen + /// before the manifest is set. TODO: rework the DPI API to require that the + /// manifest gets set first. + void setManifest(int esiVersion, + const std::vector &compressedManifest); + + /// Register a read or write port which communicates over RPC. + ReadChannelPort ®isterReadPort(const std::string &name, + const std::string &type); + WriteChannelPort ®isterWritePort(const std::string &name, + const std::string &type); + + void stop(); + void run(int port); + + /// Hide the implementation details from this header file. + class Impl; + +private: + Impl *impl; +}; + +} // namespace cosim +} // namespace esi + +#endif // ESI_COSIM_RPCSERVER_H diff --git a/lib/Dialect/ESI/runtime/cosim/lib/CapnpThreads.cpp b/lib/Dialect/ESI/runtime/cosim/lib/CapnpThreads.cpp deleted file mode 100644 index ac4a40d5eeb4..000000000000 --- a/lib/Dialect/ESI/runtime/cosim/lib/CapnpThreads.cpp +++ /dev/null @@ -1,58 +0,0 @@ -//===- CapnpThreads.cpp - Cosim RPC common code -----------------*- C++ -*-===// -// -// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. -// See https://llvm.org/LICENSE.txt for license information. -// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception -// -//===----------------------------------------------------------------------===// - -#include "cosim/CapnpThreads.h" -#include "CosimDpi.capnp.h" -#include -#include -#ifdef _WIN32 -#include -#else -#include -#endif - -using namespace capnp; -using namespace esi::cosim; - -CapnpCosimThread::CapnpCosimThread() : myThread(nullptr), stopSig(false) {} -CapnpCosimThread::~CapnpCosimThread() { stop(); } - -void CapnpCosimThread::loop(kj::WaitScope &waitScope, - std::function poll) { - // OK, this is uber hacky, but it unblocks me and isn't _too_ inefficient. The - // problem is that I can't figure out how read the stop signal from libkj - // asyncrony land. - // - // IIRC the main libkj wait loop uses `select()` (or something similar on - // Windows) on its FDs. As a result, any code which checks the stop variable - // doesn't run until there is some I/O. Probably the right way is to set up a - // pipe to deliver a shutdown signal. - // - // TODO: Figure out how to do this properly, if possible. - while (!stopSig) { - waitScope.poll(); - poll(); - waitScope.poll(); - std::this_thread::sleep_for(std::chrono::microseconds(100)); - } -} - -/// Signal the RPC server thread to stop. Wait for it to exit. -void CapnpCosimThread::stop() { - Lock g(m); - if (myThread == nullptr) { - fprintf(stderr, "CapnpCosimThread not Run()\n"); - } else if (!stopSig) { - stopSig = true; - myThread->join(); - } -} - -Endpoint *CapnpCosimThread::getEndpoint(std::string epId) { - return endpoints[epId]; -} diff --git a/lib/Dialect/ESI/runtime/cosim/lib/Client.cpp b/lib/Dialect/ESI/runtime/cosim/lib/Client.cpp deleted file mode 100644 index 27f3ffd4d20d..000000000000 --- a/lib/Dialect/ESI/runtime/cosim/lib/Client.cpp +++ /dev/null @@ -1,160 +0,0 @@ -//===- Client.cpp - Cosim RPC client ----------------------------*- C++ -*-===// -// -// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. -// See https://llvm.org/LICENSE.txt for license information. -// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception -// -//===----------------------------------------------------------------------===// - -#include "CosimDpi.capnp.h" -#include "cosim/CapnpThreads.h" -#include - -#include -#include -#ifdef _WIN32 -#include -#else -#include -#endif - -using namespace capnp; -using namespace esi::cosim; - -/// Internal implementation to hide all the capnp details. -struct esi::cosim::RpcClient::Impl { - - Impl(RpcClient &client, capnp::EzRpcClient &rpcClient) - : client(client), waitScope(rpcClient.getWaitScope()), cosim(nullptr), - lowLevel(nullptr) { - // Get the main interface. - cosim = rpcClient.getMain(); - - // Grab a reference to the low level interface. - auto llReq = cosim.openLowLevelRequest(); - auto llPromise = llReq.send(); - lowLevel = llPromise.wait(waitScope).getLowLevel(); - - // Get the ESI version and compressed manifest. - auto maniResp = cosim.getCompressedManifestRequest().send().wait(waitScope); - capnp::Data::Reader data = maniResp.getCompressedManifest(); - client.esiVersion = maniResp.getVersion(); - client.compressedManifest = std::vector(data.begin(), data.end()); - - // Iterate through the endpoints and register them. - auto capnpEndpointsResp = cosim.listRequest().send().wait(waitScope); - for (const auto &capnpEndpoint : capnpEndpointsResp.getIfaces()) { - assert(capnpEndpoint.hasEndpointID() && - "Response did not contain endpoint ID not found!"); - std::string fromHostType, toHostType; - if (capnpEndpoint.hasFromHostType()) - fromHostType = capnpEndpoint.getFromHostType(); - if (capnpEndpoint.hasToHostType()) - toHostType = capnpEndpoint.getToHostType(); - bool rc = client.endpoints.registerEndpoint(capnpEndpoint.getEndpointID(), - fromHostType, toHostType); - assert(rc && "Endpoint ID already exists!"); - Endpoint *ep = client.endpoints[capnpEndpoint.getEndpointID()]; - // TODO: delay opening until client calls connect(). - auto openReq = cosim.openRequest(); - openReq.setIface(capnpEndpoint); - EsiDpiEndpoint::Client dpiEp = - openReq.send().wait(waitScope).getEndpoint(); - endpointMap.emplace(ep, dpiEp); - } - } - - RpcClient &client; - kj::WaitScope &waitScope; - CosimDpiServer::Client cosim; - EsiLowLevel::Client lowLevel; - std::map endpointMap; - - /// Called from the event loop periodically. - // TODO: try to reduce work in here. Ideally, eliminate polling altogether - // though I can't figure out how with libkj's event loop. - void pollInternal(); -}; - -void esi::cosim::RpcClient::Impl::pollInternal() { - // Iterate through the endpoints checking for messages. - for (auto &[ep, capnpEp] : endpointMap) { - // Process writes to the simulation. - Endpoint::MessageDataPtr msg; - if (!ep->getSendTypeId().empty() && ep->getMessageToSim(msg)) { - auto req = capnpEp.sendFromHostRequest(); - req.setMsg(capnp::Data::Reader(msg->getBytes(), msg->getSize())); - req.send().detach([](kj::Exception &&e) -> void { - throw std::runtime_error("Error sending message to simulation: " + - std::string(e.getDescription().cStr())); - }); - } - - // Process reads from the simulation. - // TODO: polling for a response is horribly slow and inefficient. Rework - // the capnp protocol to avoid it. - if (!ep->getRecvTypeId().empty()) { - auto resp = capnpEp.recvToHostRequest().send().wait(waitScope); - if (resp.getHasData()) { - auto data = resp.getResp(); - ep->pushMessageToClient( - std::make_unique(data.begin(), data.size())); - } - } - } - - // Process MMIO read requests. - if (auto readReq = client.lowLevelBridge.readReqs.pop()) { - auto req = lowLevel.readMMIORequest(); - req.setAddress(*readReq); - auto respPromise = req.send(); - respPromise - .then([&](auto resp) -> void { - client.lowLevelBridge.readResps.push( - std::make_pair(resp.getData(), 0)); - }) - .detach([&](kj::Exception &&e) -> void { - client.lowLevelBridge.readResps.push(std::make_pair(0, 1)); - }); - } - - // Process MMIO write requests. - if (auto writeReq = client.lowLevelBridge.writeReqs.pop()) { - auto req = lowLevel.writeMMIORequest(); - req.setAddress(writeReq->first); - req.setData(writeReq->second); - req.send() - .then([&](auto resp) -> void { - client.lowLevelBridge.writeResps.push(0); - }) - .detach([&](kj::Exception &&e) -> void { - client.lowLevelBridge.writeResps.push(1); - }); - } -} - -void RpcClient::mainLoop(std::string host, uint16_t port) { - capnp::EzRpcClient rpcClient(host, port); - kj::WaitScope &waitScope = rpcClient.getWaitScope(); - Impl impl(*this, rpcClient); - - // Signal that we're good to go. - started.store(true); - - // Start the event loop. Does not return until stop() is called. - loop(waitScope, [&]() { impl.pollInternal(); }); -} - -/// Start the client if not already started. -void RpcClient::run(std::string host, uint16_t port) { - Lock g(m); - if (myThread == nullptr) { - started.store(false); - myThread = new std::thread(&RpcClient::mainLoop, this, host, port); - // Spin until the capnp thread is started and ready to go. - while (!started.load()) - std::this_thread::sleep_for(std::chrono::microseconds(10)); - } else { - fprintf(stderr, "Warning: cannot Run() RPC client more than once!"); - } -} diff --git a/lib/Dialect/ESI/runtime/cosim/lib/Endpoint.cpp b/lib/Dialect/ESI/runtime/cosim/lib/Endpoint.cpp deleted file mode 100644 index cd1c871c52b6..000000000000 --- a/lib/Dialect/ESI/runtime/cosim/lib/Endpoint.cpp +++ /dev/null @@ -1,70 +0,0 @@ -//===- EndPoint.cpp - Definitions for EndPointRegistry ----------*- C++ -*-===// -// -// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. -// See https://llvm.org/LICENSE.txt for license information. -// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception -// -//===----------------------------------------------------------------------===// -// -// Definitions for Cosim EndPoint and EndPointRegistry. -// -//===----------------------------------------------------------------------===// - -#include "cosim/Endpoint.h" - -using namespace esi::cosim; - -Endpoint::Endpoint(std::string fromHostTypeId, std::string toHostTypeId) - : fromHostTypeId(fromHostTypeId), toHostTypeId(toHostTypeId), inUse(false) { -} -Endpoint::~Endpoint() {} - -bool Endpoint::setInUse() { - Lock g(m); - if (inUse) - return false; - inUse = true; - return true; -} - -void Endpoint::returnForUse() { - Lock g(m); - if (!inUse) - fprintf(stderr, "Warning: Returning an endpoint which was not in use.\n"); - inUse = false; -} - -bool EndpointRegistry::registerEndpoint(std::string epId, - std::string fromHostTypeId, - std::string toHostTypeId) { - Lock g(m); - if (endpoints.find(epId) != endpoints.end()) { - fprintf(stderr, "Endpoint ID already exists!\n"); - return false; - } - // The following ugliness adds an Endpoint to the map of Endpoints. The - // Endpoint class has its copy constructor deleted, thus the metaprogramming. - endpoints.emplace(std::piecewise_construct, - // Map key. - std::forward_as_tuple(epId), - // Endpoint constructor args. - std::forward_as_tuple(fromHostTypeId, toHostTypeId)); - return true; -} - -void EndpointRegistry::iterateEndpoints( - const std::function &f) const { - // This function is logically const, but modification is needed to obtain a - // lock. - Lock g(const_cast(this)->m); - for (const auto &ep : endpoints) { - f(ep.first, ep.second); - } -} - -size_t EndpointRegistry::size() const { - // This function is logically const, but modification is needed to obtain a - // lock. - Lock g(const_cast(this)->m); - return endpoints.size(); -} diff --git a/lib/Dialect/ESI/runtime/cosim/lib/RpcServer.cpp b/lib/Dialect/ESI/runtime/cosim/lib/RpcServer.cpp new file mode 100644 index 000000000000..dd1658ce740b --- /dev/null +++ b/lib/Dialect/ESI/runtime/cosim/lib/RpcServer.cpp @@ -0,0 +1,365 @@ +//===- RpcServer.cpp - Run a cosim server ---------------------------------===// +// +// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. +// See https://llvm.org/LICENSE.txt for license information. +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +// +//===----------------------------------------------------------------------===// + +#include "esi/cosim/RpcServer.h" +#include "esi/Utils.h" + +#include "cosim.grpc.pb.h" + +#include +#include +#include +#include +#include + +#include +#include +#include + +using namespace esi; +using namespace esi::cosim; + +using grpc::CallbackServerContext; +using grpc::Server; +using grpc::ServerUnaryReactor; +using grpc::ServerWriteReactor; +using grpc::Status; +using grpc::StatusCode; + +/// Write the port number to a file. Necessary when we are allowed to select our +/// own port. We can't use stdout/stderr because the flushing semantics are +/// undefined (as in `flush()` doesn't work on all simulators). +static void writePort(uint16_t port) { + // "cosim.cfg" since we may want to include other info in the future. + FILE *fd = fopen("cosim.cfg", "w"); + fprintf(fd, "port: %u\n", static_cast(port)); + fclose(fd); +} + +namespace { +class RpcServerReadPort; +class RpcServerWritePort; +} // namespace + +class esi::cosim::RpcServer::Impl + : public esi::cosim::ChannelServer::CallbackService { +public: + Impl(int port); + ~Impl(); + + //===--------------------------------------------------------------------===// + // Internal API + //===--------------------------------------------------------------------===// + + void setManifest(int esiVersion, + const std::vector &compressedManifest) { + this->compressedManifest = compressedManifest; + this->esiVersion = esiVersion; + } + + ReadChannelPort ®isterReadPort(const std::string &name, + const std::string &type); + WriteChannelPort ®isterWritePort(const std::string &name, + const std::string &type); + + void stop(); + + //===--------------------------------------------------------------------===// + // RPC API implementations. See the .proto file for the API documentation. + //===--------------------------------------------------------------------===// + + ServerUnaryReactor *GetManifest(CallbackServerContext *context, + const VoidMessage *, + Manifest *response) override; + ServerUnaryReactor *ListChannels(CallbackServerContext *, const VoidMessage *, + ListOfChannels *channelsOut) override; + ServerWriteReactor * + ConnectToClientChannel(CallbackServerContext *context, + const ChannelDesc *request) override; + ServerUnaryReactor *SendToServer(CallbackServerContext *context, + const esi::cosim::AddressedMessage *request, + esi::cosim::VoidMessage *response) override; + +private: + int esiVersion; + std::vector compressedManifest; + std::map> readPorts; + std::map> writePorts; + + std::unique_ptr server; +}; +using Impl = esi::cosim::RpcServer::Impl; + +//===----------------------------------------------------------------------===// +// Read and write ports +// +// Implemented as simple queues which the RPC server writes to and reads from. +//===----------------------------------------------------------------------===// + +namespace { +/// Implements a simple read queue. The RPC server will push messages into this +/// as appropriate. +class RpcServerReadPort : public ReadChannelPort { +public: + RpcServerReadPort(Type *type) : ReadChannelPort(type) {} + + bool read(MessageData &data) override { + std::optional msg = readQueue.pop(); + if (!msg) + return false; + data = std::move(*msg); + return true; + } + + utils::TSQueue readQueue; +}; + +/// Implements a simple write queue. The RPC server will pull messages from this +/// as appropriate. Note that this could be more performant if a callback is +/// used. This would have more complexity as when a client disconnects the +/// outstanding messages will need somewhere to be held until the next client +/// connects. For now, it's simpler to just have the server poll the queue. +class RpcServerWritePort : public WriteChannelPort { +public: + RpcServerWritePort(Type *type) : WriteChannelPort(type) {} + void write(const MessageData &data) override { writeQueue.push(data); } + + utils::TSQueue writeQueue; +}; +} // namespace + +//===----------------------------------------------------------------------===// +// RPC server implementations +//===----------------------------------------------------------------------===// + +/// Start a server on the given port. -1 means to let the OS pick a port. +Impl::Impl(int port) : esiVersion(-1) { + grpc::ServerBuilder builder; + std::string server_address("127.0.0.1:" + std::to_string(port)); + // TODO: use secure credentials. Not so bad for now since we only accept + // connections on localhost. + builder.AddListeningPort(server_address, grpc::InsecureServerCredentials(), + &port); + builder.RegisterService(this); + server = builder.BuildAndStart(); + if (!server) + throw std::runtime_error("Failed to start server on " + server_address); + writePort(port); + std::cout << "Server listening on 127.0.0.1:" << port << std::endl; +} + +void Impl::stop() { + /// Shutdown the server and wait for it to finish. + server->Shutdown(); + server->Wait(); + server = nullptr; +} + +Impl::~Impl() { + if (server) + stop(); +} + +ReadChannelPort &Impl::registerReadPort(const std::string &name, + const std::string &type) { + auto port = new RpcServerReadPort(new Type(type)); + readPorts.emplace(name, port); + return *port; +} +WriteChannelPort &Impl::registerWritePort(const std::string &name, + const std::string &type) { + auto port = new RpcServerWritePort(new Type(type)); + writePorts.emplace(name, port); + return *port; +} + +ServerUnaryReactor *Impl::GetManifest(CallbackServerContext *context, + const VoidMessage *, Manifest *response) { + response->set_esi_version(esiVersion); + response->set_compressed_manifest(compressedManifest.data(), + compressedManifest.size()); + ServerUnaryReactor *reactor = context->DefaultReactor(); + reactor->Finish(Status::OK); + return reactor; +} + +/// Load the list of channels into the response and fire it off. +ServerUnaryReactor *Impl::ListChannels(CallbackServerContext *context, + const VoidMessage *, + ListOfChannels *channelsOut) { + for (auto &[name, port] : readPorts) { + auto *channel = channelsOut->add_channels(); + channel->set_name(name); + channel->set_type(port->getType()->getID()); + channel->set_dir(ChannelDesc::Direction::ChannelDesc_Direction_TO_SERVER); + } + for (auto &[name, port] : writePorts) { + auto *channel = channelsOut->add_channels(); + channel->set_name(name); + channel->set_type(port->getType()->getID()); + channel->set_dir(ChannelDesc::Direction::ChannelDesc_Direction_TO_CLIENT); + } + + // The default reactor is basically to just finish the RPC call as if we're + // implementing the RPC function as a blocking call. + auto reactor = context->DefaultReactor(); + reactor->Finish(Status::OK); + return reactor; +} + +namespace { +/// When a client connects to a read port (on its end, a write port on this +/// end), construct one of these to poll the corresponding write port on this +/// side and forward the messages. +class RpcServerWriteReactor : public ServerWriteReactor { +public: + RpcServerWriteReactor(RpcServerWritePort *writePort) + : writePort(writePort), sentSuccessfully(SendStatus::UnknownStatus), + shutdown(false) { + myThread = std::thread(&RpcServerWriteReactor::threadLoop, this); + } + ~RpcServerWriteReactor() { + shutdown = true; + // Wake up the potentially sleeping thread. + sentSuccessfullyCV.notify_one(); + myThread.join(); + } + + // Deleting 'this' from within a callback is safe since this is how gRPC tells + // us that it's released the reference. This pattern lets gRPC manage this + // object. (Though a shared pointer would be better.) It was actually copied + // from one of the gRPC examples: + // https://github.com/grpc/grpc/blob/4795c5e69b25e8c767b498bea784da0ef8c96fd5/examples/cpp/route_guide/route_guide_callback_server.cc#L120 + // The alternative is to have something else (e.g. Impl) manage this object + // and have this method tell it that gRPC is done with it and it should be + // deleted. As of now, there's no specific need for that and it adds + // additional complexity. If there is at some point in the future, change + // this. + void OnDone() override { delete this; } + void OnWriteDone(bool ok) override { + std::scoped_lock lock(sentMutex); + sentSuccessfully = ok ? SendStatus::Success : SendStatus::Failure; + sentSuccessfullyCV.notify_one(); + } + void OnCancel() override { + std::scoped_lock lock(sentMutex); + sentSuccessfully = SendStatus::Disconnect; + sentSuccessfullyCV.notify_one(); + } + +private: + /// The polling loop. + void threadLoop(); + /// The polling thread. + std::thread myThread; + + /// Assoicated write port on this side. (Read port on the client side.) + RpcServerWritePort *writePort; + + /// Mutex to protect the sentSuccessfully flag. + std::mutex sentMutex; + enum SendStatus { UnknownStatus, Success, Failure, Disconnect }; + volatile SendStatus sentSuccessfully; + std::condition_variable sentSuccessfullyCV; + + std::atomic shutdown; +}; + +} // namespace + +void RpcServerWriteReactor::threadLoop() { + while (!shutdown && sentSuccessfully != SendStatus::Disconnect) { + // TODO: adapt this to a new notification mechanism which is forthcoming. + if (writePort->writeQueue.empty()) + std::this_thread::sleep_for(std::chrono::microseconds(100)); + + // This lambda will get called with the message at the front of the queue. + // If the send is successful, return true to pop it. We don't know, however, + // if the message was sent successfully in this thread. It's only when the + // `OnWriteDone` method is called by gRPC that we know. Use locking and + // condition variables to orchestrate this confirmation. + writePort->writeQueue.pop([this](const MessageData &data) -> bool { + esi::cosim::Message msg; + msg.set_data(reinterpret_cast(data.getBytes()), + data.getSize()); + + // Get a lock, reset the flag, start sending the message, and wait for the + // write to complete or fail. Be mindful of the shutdown flag. + std::unique_lock lock(sentMutex); + sentSuccessfully = SendStatus::UnknownStatus; + StartWrite(&msg); + sentSuccessfullyCV.wait(lock, [&]() { + return shutdown || sentSuccessfully != SendStatus::UnknownStatus; + }); + bool ret = sentSuccessfully == SendStatus::Success; + lock.unlock(); + return ret; + }); + } +} + +/// When a client sends a message to a read port (write port on this end), start +/// streaming messages until the client calls uncle and requests a cancellation. +ServerWriteReactor * +Impl::ConnectToClientChannel(CallbackServerContext *context, + const ChannelDesc *request) { + printf("connect to client channel\n"); + auto it = writePorts.find(request->name()); + if (it == writePorts.end()) { + auto reactor = new RpcServerWriteReactor(nullptr); + reactor->Finish(Status(StatusCode::NOT_FOUND, "Unknown channel")); + return reactor; + } + return new RpcServerWriteReactor(it->second.get()); +} + +/// When a client sends a message to a write port (a read port on this end), +/// simply locate the associated port, and write that message into its queue. +ServerUnaryReactor * +Impl::SendToServer(CallbackServerContext *context, + const esi::cosim::AddressedMessage *request, + esi::cosim::VoidMessage *response) { + auto reactor = context->DefaultReactor(); + auto it = readPorts.find(request->channel_name()); + if (it == readPorts.end()) { + reactor->Finish(Status(StatusCode::NOT_FOUND, "Unknown channel")); + return reactor; + } + + std::string msgDataString = request->message().data(); + MessageData data(reinterpret_cast(msgDataString.data()), + msgDataString.size()); + it->second->readQueue.push(std::move(data)); + reactor->Finish(Status::OK); + return reactor; +} + +//===----------------------------------------------------------------------===// +// RpcServer pass throughs to the actual implementations above. +//===----------------------------------------------------------------------===// +RpcServer::~RpcServer() { + if (impl) + delete impl; +} +void RpcServer::setManifest(int esiVersion, + const std::vector &compressedManifest) { + impl->setManifest(esiVersion, compressedManifest); +} +ReadChannelPort &RpcServer::registerReadPort(const std::string &name, + const std::string &type) { + return impl->registerReadPort(name, type); +} +WriteChannelPort &RpcServer::registerWritePort(const std::string &name, + const std::string &type) { + return impl->registerWritePort(name, type); +} +void RpcServer::run(int port) { impl = new Impl(port); } +void RpcServer::stop() { + assert(impl && "Server not running"); + impl->stop(); +} diff --git a/lib/Dialect/ESI/runtime/cosim/lib/Server.cpp b/lib/Dialect/ESI/runtime/cosim/lib/Server.cpp deleted file mode 100644 index 57c34204c7c8..000000000000 --- a/lib/Dialect/ESI/runtime/cosim/lib/Server.cpp +++ /dev/null @@ -1,279 +0,0 @@ -//===- Server.cpp - Cosim RPC server ----------------------------*- C++ -*-===// -// -// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. -// See https://llvm.org/LICENSE.txt for license information. -// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception -// -//===----------------------------------------------------------------------===// -// -// Definitions for the RPC server class. Capnp C++ RPC servers are based on -// 'libkj' and its asyncrony model plus the capnp C++ API, both of which feel -// very foreign. In general, both RPC arguments and returns are passed as a C++ -// object. In order to return data, the capnp message must be constructed inside -// that object. -// -// A [capnp encoded message](https://capnproto.org/encoding.html) can have -// multiple 'segments', which is a pain to deal with. (See comments below.) -// -//===----------------------------------------------------------------------===// - -#include "CosimDpi.capnp.h" -#include "cosim/CapnpThreads.h" -#include -#include -#ifdef _WIN32 -#include -#else -#include -#endif - -using namespace capnp; -using namespace esi::cosim; - -namespace { -/// Implements the `EsiDpiEndpoint` interface from the RPC schema. Mostly a -/// wrapper around an `Endpoint` object. Whereas the `Endpoint`s are long-lived -/// (associated with the HW endpoint), this class is constructed/destructed -/// when the client open()s it. -class EndpointServer final : public EsiDpiEndpoint::Server { - /// The wrapped endpoint. - Endpoint &endpoint; - /// Signals that this endpoint has been opened by a client and hasn't been - /// closed by said client. - bool open; - -public: - EndpointServer(Endpoint &ep); - /// Release the Endpoint should the client disconnect without properly closing - /// it. - ~EndpointServer(); - /// Disallow copying as the 'open' variable needs to track the endpoint. - EndpointServer(const EndpointServer &) = delete; - - /// Implement the EsiDpiEndpoint RPC interface. - kj::Promise sendFromHost(SendFromHostContext) override; - kj::Promise recvToHost(RecvToHostContext) override; - kj::Promise close(CloseContext) override; -}; - -/// Implement the low level cosim RPC protocol. -class LowLevelServer final : public EsiLowLevel::Server { - // Queues to and from the simulation. - LowLevel &bridge; - - // Functions which poll for responses without blocking the main loop. Polling - // ain't great, but it's the only way (AFAICT) to do inter-thread - // communication between a libkj concurrent thread and other threads. There is - // a non-polling way to do it by setting up a queue over a OS-level pipe - // (since the libkj event loop uses 'select'). - kj::Promise pollReadResp(ReadMMIOContext context); - kj::Promise pollWriteResp(WriteMMIOContext context); - -public: - LowLevelServer(LowLevel &bridge); - /// Release the Endpoint should the client disconnect without properly closing - /// it. - ~LowLevelServer(); - /// Disallow copying as the 'open' variable needs to track the endpoint. - LowLevelServer(const LowLevelServer &) = delete; - - // Implement the protocol methods. - kj::Promise readMMIO(ReadMMIOContext) override; - kj::Promise writeMMIO(WriteMMIOContext) override; -}; - -/// Implements the `CosimDpiServer` interface from the RPC schema. -class CosimServer final : public CosimDpiServer::Server { - /// The registry of endpoints. The RpcServer class owns this. - EndpointRegistry ® - LowLevel &lowLevelBridge; - const unsigned int &esiVersion; - const std::vector &compressedManifest; - -public: - CosimServer(EndpointRegistry ®, LowLevel &lowLevelBridge, - const unsigned int &esiVersion, - const std::vector &compressedManifest); - - /// List all the registered interfaces. - kj::Promise list(ListContext ctxt) override; - /// Open a specific interface, locking it in the process. - kj::Promise open(OpenContext ctxt) override; - - kj::Promise - getCompressedManifest(GetCompressedManifestContext) override; - - kj::Promise openLowLevel(OpenLowLevelContext ctxt) override; -}; -} // anonymous namespace - -/// ------ EndpointServer definitions. - -EndpointServer::EndpointServer(Endpoint &ep) : endpoint(ep), open(true) {} -EndpointServer::~EndpointServer() { - if (open) - endpoint.returnForUse(); -} - -/// This is the client polling for a message. If one is available, send it. -/// TODO: implement a blocking call with a timeout. -kj::Promise EndpointServer::recvToHost(RecvToHostContext context) { - KJ_REQUIRE(open, "EndPoint closed already"); - - // Try to pop a message. - Endpoint::MessageDataPtr blob; - auto msgPresent = endpoint.getMessageToClient(blob); - context.getResults().setHasData(msgPresent); - if (msgPresent) { - Data::Builder data(const_cast(blob->getBytes()), blob->getSize()); - context.getResults().setResp(data.asReader()); - } - return kj::READY_NOW; -} - -/// 'Send' is from the client perspective, so this is a message we are -/// recieving. The only way I could figure out to copy the raw message is a -/// double copy. I was have issues getting libkj's arrays to play nice with -/// others. -kj::Promise EndpointServer::sendFromHost(SendFromHostContext context) { - KJ_REQUIRE(open, "EndPoint closed already"); - KJ_REQUIRE(context.getParams().hasMsg(), "Send request must have a message."); - kj::ArrayPtr data = context.getParams().getMsg().asBytes(); - Endpoint::MessageDataPtr blob = std::make_unique( - (const uint8_t *)data.begin(), data.size()); - endpoint.pushMessageToSim(std::move(blob)); - return kj::READY_NOW; -} - -kj::Promise EndpointServer::close(CloseContext context) { - KJ_REQUIRE(open, "EndPoint closed already"); - open = false; - endpoint.returnForUse(); - return kj::READY_NOW; -} - -/// ------ LowLevelServer definitions. - -LowLevelServer::LowLevelServer(LowLevel &bridge) : bridge(bridge) {} -LowLevelServer::~LowLevelServer() {} - -kj::Promise LowLevelServer::pollReadResp(ReadMMIOContext context) { - auto respMaybe = bridge.readResps.pop(); - if (!respMaybe.has_value()) { - return kj::evalLast( - [this, KJ_CPCAP(context)]() mutable { return pollReadResp(context); }); - } - auto resp = respMaybe.value(); - KJ_REQUIRE(resp.second == 0, "Read MMIO register encountered an error"); - context.getResults().setData(resp.first); - return kj::READY_NOW; -} - -kj::Promise LowLevelServer::readMMIO(ReadMMIOContext context) { - bridge.readReqs.push(context.getParams().getAddress()); - return kj::evalLast( - [this, KJ_CPCAP(context)]() mutable { return pollReadResp(context); }); -} - -kj::Promise LowLevelServer::pollWriteResp(WriteMMIOContext context) { - auto respMaybe = bridge.writeResps.pop(); - if (!respMaybe.has_value()) { - return kj::evalLast( - [this, KJ_CPCAP(context)]() mutable { return pollWriteResp(context); }); - } - auto resp = respMaybe.value(); - KJ_REQUIRE(resp == 0, "write MMIO register encountered an error"); - return kj::READY_NOW; -} - -kj::Promise LowLevelServer::writeMMIO(WriteMMIOContext context) { - bridge.writeReqs.push(context.getParams().getAddress(), - context.getParams().getData()); - return kj::evalLast( - [this, KJ_CPCAP(context)]() mutable { return pollWriteResp(context); }); -} - -/// ----- CosimServer definitions. - -CosimServer::CosimServer(EndpointRegistry ®, LowLevel &lowLevelBridge, - const unsigned int &esiVersion, - const std::vector &compressedManifest) - : reg(reg), lowLevelBridge(lowLevelBridge), esiVersion(esiVersion), - compressedManifest(compressedManifest) { - printf("version: %d\n", esiVersion); -} - -kj::Promise CosimServer::list(ListContext context) { - auto ifaces = context.getResults().initIfaces((unsigned int)reg.size()); - unsigned int ctr = 0u; - reg.iterateEndpoints([&](std::string id, const Endpoint &ep) { - ifaces[ctr].setEndpointID(id); - ifaces[ctr].setFromHostType(ep.getSendTypeId()); - ifaces[ctr].setToHostType(ep.getRecvTypeId()); - ++ctr; - }); - return kj::READY_NOW; -} - -kj::Promise CosimServer::open(OpenContext ctxt) { - Endpoint *ep = reg[ctxt.getParams().getIface().getEndpointID()]; - KJ_REQUIRE(ep != nullptr, "Could not find endpoint"); - - auto gotLock = ep->setInUse(); - KJ_REQUIRE(gotLock, "Endpoint in use"); - - ctxt.getResults().setEndpoint( - EsiDpiEndpoint::Client(kj::heap(*ep))); - return kj::READY_NOW; -} - -kj::Promise -CosimServer::getCompressedManifest(GetCompressedManifestContext ctxt) { - ctxt.getResults().setVersion(esiVersion); - ctxt.getResults().setCompressedManifest( - Data::Reader(compressedManifest.data(), compressedManifest.size())); - return kj::READY_NOW; -} - -kj::Promise CosimServer::openLowLevel(OpenLowLevelContext ctxt) { - ctxt.getResults().setLowLevel(kj::heap(lowLevelBridge)); - return kj::READY_NOW; -} - -/// ----- RpcServer definitions. - -/// Write the port number to a file. Necessary when we allow 'EzRpcServer' to -/// select its own port. We can't use stdout/stderr because the flushing -/// semantics are undefined (as in `flush()` doesn't work on all simulators). -static void writePort(uint16_t port) { - // "cosim.cfg" since we may want to include other info in the future. - FILE *fd = fopen("cosim.cfg", "w"); - fprintf(fd, "port: %u\n", (unsigned int)port); - fclose(fd); -} - -void RpcServer::mainLoop(uint16_t port) { - capnp::EzRpcServer rpcServer(kj::heap(endpoints, lowLevelBridge, - esiVersion, - compressedManifest), - /* bindAddress */ "*", port); - auto &waitScope = rpcServer.getWaitScope(); - // If port is 0, ExRpcSever selects one and we have to wait to get the port. - if (port == 0) { - auto portPromise = rpcServer.getPort(); - port = portPromise.wait(waitScope); - } - writePort(port); - printf("[COSIM] Listening on port: %u\n", (unsigned int)port); - loop(waitScope, []() {}); -} - -/// Start the server if not already started. -void RpcServer::run(uint16_t port) { - Lock g(m); - if (myThread == nullptr) { - myThread = new std::thread(&RpcServer::mainLoop, this, port); - } else { - fprintf(stderr, "Warning: cannot Run() RPC server more than once!"); - } -} diff --git a/lib/Dialect/ESI/runtime/cpp/include/esi/Utils.h b/lib/Dialect/ESI/runtime/cpp/include/esi/Utils.h index fe59c806f39e..0101095c84f5 100644 --- a/lib/Dialect/ESI/runtime/cpp/include/esi/Utils.h +++ b/lib/Dialect/ESI/runtime/cpp/include/esi/Utils.h @@ -17,12 +17,80 @@ #define ESI_UTILS_H #include +#include +#include +#include +#include #include namespace esi { namespace utils { // Very basic base64 encoding. void encodeBase64(const void *data, size_t size, std::string &out); + +/// Thread safe queue. Just wraps std::queue protected with a lock. Long term, +/// we need to avoid copying data. It has a lot of data copies currently. +template +class TSQueue { + using Lock = std::lock_guard; + + /// The queue and its mutex. + mutable std::mutex qM; + std::queue q; + + /// A mutex to ensure that only one 'pop' operation is happening at a time. It + /// is critical that locks be obtained on this and `qM` same order in both pop + /// methods. This lock should be obtained first since one of the pop methods + /// must unlock `qM` then relock it. + std::mutex popM; + +public: + /// Push onto the queue. + template + void push(E... t) { + Lock l(qM); + q.emplace(t...); + } + + /// Pop something off the queue but return nullopt if the queue is empty. Why + /// doesn't std::queue have anything like this? + std::optional pop() { + Lock pl(popM); + Lock ql(qM); + if (q.size() == 0) + return std::nullopt; + auto t = q.front(); + q.pop(); + return t; + } + + /// Call the callback for the front of the queue (if anything is there). Only + /// pop it off the queue if the callback returns true. + void pop(std::function callback) { + // Since we need to unlock the mutex to call the callback, the queue + // could be pushed on to and its memory layout could thusly change, + // invalidating the reference returned by `.front()`. The easy solution here + // is to copy the data. TODO: Avoid copying the data. + Lock pl(popM); + T t; + { + Lock l(qM); + if (q.size() == 0) + return; + t = q.front(); + } + if (callback(t)) { + Lock l(qM); + q.pop(); + } + } + + /// Is the queue empty? + bool empty() const { + Lock l(qM); + return q.empty(); + } +}; } // namespace utils } // namespace esi diff --git a/lib/Dialect/ESI/runtime/cpp/include/esi/backends/Cosim.h b/lib/Dialect/ESI/runtime/cpp/include/esi/backends/Cosim.h index 14610a726a60..e78ff4cb5559 100644 --- a/lib/Dialect/ESI/runtime/cpp/include/esi/backends/Cosim.h +++ b/lib/Dialect/ESI/runtime/cpp/include/esi/backends/Cosim.h @@ -31,8 +31,8 @@ namespace esi { namespace cosim { -class RpcClient; -} // namespace cosim +class ChannelDesc; +} namespace backends { namespace cosim { @@ -41,6 +41,8 @@ namespace cosim { class CosimAccelerator : public esi::AcceleratorConnection { public: CosimAccelerator(Context &, std::string hostname, uint16_t port); + ~CosimAccelerator(); + static std::unique_ptr connect(Context &, std::string connectionString); @@ -58,6 +60,11 @@ class CosimAccelerator : public esi::AcceleratorConnection { virtual std::map requestChannelsFor(AppIDPath, const BundleType *) override; + // C++ doesn't have a mechanism to forward declare a nested class and we don't + // want to include the generated header here. So we have to wrap it in a + // forward-declared struct we write ourselves. + struct StubContainer; + protected: virtual Service *createService(Service::Type service, AppIDPath path, std::string implName, @@ -65,7 +72,11 @@ class CosimAccelerator : public esi::AcceleratorConnection { const HWClientDetails &clients) override; private: - std::unique_ptr rpcClient; + StubContainer *rpcClient; + + /// Get the type ID for a channel name. + bool getChannelDesc(const std::string &channelName, + esi::cosim::ChannelDesc &desc); // We own all channels connected to rpcClient since their lifetime is tied to // rpcClient. diff --git a/lib/Dialect/ESI/runtime/cpp/lib/backends/Cosim.cpp b/lib/Dialect/ESI/runtime/cpp/lib/backends/Cosim.cpp index 8ec6493be790..9f6c75a4c614 100644 --- a/lib/Dialect/ESI/runtime/cpp/lib/backends/Cosim.cpp +++ b/lib/Dialect/ESI/runtime/cpp/lib/backends/Cosim.cpp @@ -15,8 +15,15 @@ #include "esi/backends/Cosim.h" #include "esi/Services.h" +#include "esi/Utils.h" -#include "cosim/CapnpThreads.h" +#include "cosim.grpc.pb.h" + +#include +#include +#include +#include +#include #include #include @@ -25,9 +32,30 @@ using namespace std; using namespace esi; +using namespace esi::cosim; using namespace esi::services; using namespace esi::backends::cosim; +using grpc::Channel; +using grpc::ClientContext; +using grpc::ClientReader; +using grpc::ClientReaderWriter; +using grpc::ClientWriter; +using grpc::Status; + +static void checkStatus(Status s, const string &msg) { + if (!s.ok()) + throw runtime_error(msg + ". Code " + to_string(s.error_code()) + ": " + + s.error_message() + " (" + s.error_details() + ")"); +} + +/// Hack around C++ not having a way to forward declare a nested class. +struct esi::backends::cosim::CosimAccelerator::StubContainer { + StubContainer(std::unique_ptr stub) + : stub(std::move(stub)) {} + std::unique_ptr stub; +}; + /// Parse the connection string and instantiate the accelerator. Support the /// traditional 'host:port' syntax and a path to 'cosim.cfg' which is output by /// the cosimulation when it starts (which is useful when it chooses its own @@ -80,147 +108,213 @@ CosimAccelerator::CosimAccelerator(Context &ctxt, string hostname, uint16_t port) : AcceleratorConnection(ctxt) { // Connect to the simulation. - rpcClient = std::make_unique(); - rpcClient->run(hostname, port); + auto channel = grpc::CreateChannel(hostname + ":" + to_string(port), + grpc::InsecureChannelCredentials()); + rpcClient = new StubContainer(ChannelServer::NewStub(channel)); +} +CosimAccelerator::~CosimAccelerator() { + if (rpcClient) + delete rpcClient; + channels.clear(); } -namespace { -class CosimMMIO : public MMIO { -public: - CosimMMIO(esi::cosim::LowLevel *lowLevel) : lowLevel(lowLevel) {} - - // Push the read request into the LowLevel capnp bridge and wait for the - // response. - uint32_t read(uint32_t addr) const override { - lowLevel->readReqs.push(addr); - - std::optional> resp; - while (resp = lowLevel->readResps.pop(), !resp.has_value()) - std::this_thread::sleep_for(std::chrono::microseconds(10)); - if (resp->second != 0) - throw runtime_error("MMIO read error" + to_string(resp->second)); - return resp->first; - } - - // Push the write request into the LowLevel capnp bridge and wait for the ack - // or error. - void write(uint32_t addr, uint32_t data) override { - lowLevel->writeReqs.push(make_pair(addr, data)); - - std::optional resp; - while (resp = lowLevel->writeResps.pop(), !resp.has_value()) - std::this_thread::sleep_for(std::chrono::microseconds(10)); - if (*resp != 0) - throw runtime_error("MMIO write error" + to_string(*resp)); - } - -private: - esi::cosim::LowLevel *lowLevel; -}; -} // namespace +// TODO: Fix MMIO! +// namespace { +// class CosimMMIO : public MMIO { +// public: +// CosimMMIO(esi::cosim::LowLevel *lowLevel) : lowLevel(lowLevel) {} + +// // Push the read request into the LowLevel capnp bridge and wait for the +// // response. +// uint32_t read(uint32_t addr) const override { +// lowLevel->readReqs.push(addr); + +// std::optional> resp; +// while (resp = lowLevel->readResps.pop(), !resp.has_value()) +// std::this_thread::sleep_for(std::chrono::microseconds(10)); +// if (resp->second != 0) +// throw runtime_error("MMIO read error" + to_string(resp->second)); +// return resp->first; +// } + +// // Push the write request into the LowLevel capnp bridge and wait for the +// ack +// // or error. +// void write(uint32_t addr, uint32_t data) override { +// lowLevel->writeReqs.push(make_pair(addr, data)); + +// std::optional resp; +// while (resp = lowLevel->writeResps.pop(), !resp.has_value()) +// std::this_thread::sleep_for(std::chrono::microseconds(10)); +// if (*resp != 0) +// throw runtime_error("MMIO write error" + to_string(*resp)); +// } + +// private: +// esi::cosim::LowLevel *lowLevel; +// }; +// } // namespace namespace { class CosimSysInfo : public SysInfo { public: - CosimSysInfo(const std::unique_ptr &rpcClient) - : rpcClient(rpcClient) {} + CosimSysInfo(ChannelServer::Stub *rpcClient) : rpcClient(rpcClient) {} uint32_t getEsiVersion() const override { - unsigned int esiVersion; - std::vector compressedManifest; - if (!rpcClient->getCompressedManifest(esiVersion, compressedManifest)) - throw runtime_error("Could not get ESI version from cosim"); - return esiVersion; + ::esi::cosim::Manifest response = getManifest(); + return response.esi_version(); } vector getCompressedManifest() const override { - unsigned int esiVersion; - std::vector compressedManifest; - if (!rpcClient->getCompressedManifest(esiVersion, compressedManifest)) - throw runtime_error("Could not get ESI version from cosim"); - return compressedManifest; + ::esi::cosim::Manifest response = getManifest(); + std::string compressedManifestStr = response.compressed_manifest(); + return std::vector(compressedManifestStr.begin(), + compressedManifestStr.end()); } private: - const std::unique_ptr &rpcClient; + ::esi::cosim::Manifest getManifest() const { + ::esi::cosim::Manifest response; + // To get around the a race condition where the manifest may not be set yet, + // loop until it is. TODO: fix this with the DPI API change. + do { + ClientContext context; + VoidMessage arg; + Status s = rpcClient->GetManifest(&context, arg, &response); + checkStatus(s, "Failed to get manifest"); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } while (response.esi_version() < 0); + return response; + } + + esi::cosim::ChannelServer::Stub *rpcClient; }; } // namespace namespace { +/// Cosim client implementation of a write channel port. class WriteCosimChannelPort : public WriteChannelPort { public: - WriteCosimChannelPort(esi::cosim::Endpoint *ep, const Type *type, string name) - : WriteChannelPort(type), ep(ep), name(name) {} - virtual ~WriteCosimChannelPort() = default; - - // TODO: Replace this with a request to connect to the capnp thread. - virtual void connect() override { - if (!ep) - throw runtime_error("Could not find channel '" + name + - "' in cosimulation"); - if (ep->getSendTypeId() == "") - throw runtime_error("Channel '" + name + "' is not a read channel"); - if (ep->getSendTypeId() != getType()->getID()) + WriteCosimChannelPort(ChannelServer::Stub *rpcClient, const ChannelDesc &desc, + const Type *type, string name) + : WriteChannelPort(type), rpcClient(rpcClient), desc(desc), name(name) {} + ~WriteCosimChannelPort() = default; + + void connect() override { + WriteChannelPort::connect(); + if (desc.type() != getType()->getID()) throw runtime_error("Channel '" + name + "' has wrong type. Expected " + - getType()->getID() + ", got " + ep->getSendTypeId()); - ep->setInUse(); + getType()->getID() + ", got " + desc.type()); + if (desc.dir() != ChannelDesc::Direction::ChannelDesc_Direction_TO_SERVER) + throw runtime_error("Channel '" + name + "' is not a to server channel"); + assert(desc.name() == name); } - virtual void disconnect() override { - if (ep) - ep->returnForUse(); + + /// Send a write message to the server. + void write(const MessageData &data) override { + ClientContext context; + AddressedMessage msg; + msg.set_channel_name(name); + msg.mutable_message()->set_data(data.getBytes(), data.getSize()); + VoidMessage response; + grpc::Status sendStatus = rpcClient->SendToServer(&context, msg, &response); + if (!sendStatus.ok()) + throw runtime_error("Failed to write to channel '" + name + + "': " + sendStatus.error_message() + + ". Details: " + sendStatus.error_details()); } - virtual void write(const MessageData &) override; protected: - esi::cosim::Endpoint *ep; + ChannelServer::Stub *rpcClient; + /// The channel description as provided by the server. + ChannelDesc desc; + /// The name of the channel from the manifest. string name; }; } // namespace -void WriteCosimChannelPort::write(const MessageData &data) { - ep->pushMessageToSim(make_unique(data)); -} - namespace { -class ReadCosimChannelPort : public ReadChannelPort { +/// Cosim client implementation of a read channel port. Since gRPC read protocol +/// streams messages back, this implementation is quite complex. +class ReadCosimChannelPort + : public ReadChannelPort, + public grpc::ClientReadReactor { public: - ReadCosimChannelPort(esi::cosim::Endpoint *ep, const Type *type, string name) - : ReadChannelPort(type), ep(ep), name(name) {} - virtual ~ReadCosimChannelPort() = default; + ReadCosimChannelPort(ChannelServer::Stub *rpcClient, const ChannelDesc &desc, + const Type *type, string name) + : ReadChannelPort(type), rpcClient(rpcClient), desc(desc), name(name), + context(nullptr) {} + virtual ~ReadCosimChannelPort() { disconnect(); } - // TODO: Replace this with a request to connect to the capnp thread. virtual void connect() override { - if (!ep) - throw runtime_error("Could not find channel '" + name + - "' in cosimulation"); - if (ep->getRecvTypeId() == "") - throw runtime_error("Channel '" + name + "' is not a read channel"); - if (ep->getRecvTypeId() != getType()->getID()) + // Sanity checking. + ReadChannelPort::connect(); + if (desc.type() != getType()->getID()) throw runtime_error("Channel '" + name + "' has wrong type. Expected " + - getType()->getID() + ", got " + ep->getRecvTypeId()); - ep->setInUse(); + getType()->getID() + ", got " + desc.type()); + if (desc.dir() != ChannelDesc::Direction::ChannelDesc_Direction_TO_CLIENT) + throw runtime_error("Channel '" + name + "' is not a to server channel"); + assert(desc.name() == name); + + // Initiate a stream of messages from the server. + context = std::make_unique(); + rpcClient->async()->ConnectToClientChannel(context.get(), &desc, this); + StartCall(); + StartRead(&incomingMessage); } - virtual void disconnect() override { - if (ep) - ep->returnForUse(); + + /// Gets called when there's a new message from the server. It'll be stored in + /// `incomingMessage`. + void OnReadDone(bool ok) override { + if (!ok) { + // TODO: should we do something here? + std::cerr << "Internal error: read failed due to not `ok`." << std::endl; + return; + } + + // Read the delivered message and push it onto the queue. + const std::string &messageString = incomingMessage.data(); + MessageData data(reinterpret_cast(messageString.data()), + messageString.size()); + messageQueue.push(data); + + // Initiate the next read. + StartRead(&incomingMessage); + } + + /// Disconnect this channel from the server. + void disconnect() override { + if (!context) + return; + context->TryCancel(); + context.reset(); + } + + /// Poll the queue. + bool read(MessageData &data) override { + std::optional msg = messageQueue.pop(); + if (!msg.has_value()) + return false; + data = std::move(*msg); + return true; } - virtual bool read(MessageData &) override; protected: - esi::cosim::Endpoint *ep; + ChannelServer::Stub *rpcClient; + /// The channel description as provided by the server. + ChannelDesc desc; + /// The name of the channel from the manifest. string name; + + std::unique_ptr context; + /// Storage location for the incoming message. + esi::cosim::Message incomingMessage; + /// Queue of messages read from the server. + esi::utils::TSQueue messageQueue; }; } // namespace -bool ReadCosimChannelPort::read(MessageData &data) { - esi::cosim::Endpoint::MessageDataPtr msg; - if (!ep->getMessageToClient(msg)) - return false; - data = *msg; - return true; -} - map CosimAccelerator::requestChannelsFor(AppIDPath idPath, const BundleType *bundleType) { @@ -242,18 +336,43 @@ CosimAccelerator::requestChannelsFor(AppIDPath idPath, // Get the endpoint, which may or may not exist. Construct the port. // Everything is validated when the client calls 'connect()' on the port. - esi::cosim::Endpoint *ep = rpcClient->getEndpoint(channelName); + ChannelDesc chDesc; + if (!getChannelDesc(channelName, chDesc)) + throw runtime_error("Could not find channel '" + channelName + + "' in cosimulation"); + ChannelPort *port; - if (BundlePort::isWrite(dir)) - port = new WriteCosimChannelPort(ep, type, channelName); - else - port = new ReadCosimChannelPort(ep, type, channelName); + if (BundlePort::isWrite(dir)) { + port = new WriteCosimChannelPort(rpcClient->stub.get(), chDesc, type, + channelName); + } else { + port = new ReadCosimChannelPort(rpcClient->stub.get(), chDesc, type, + channelName); + } channels.emplace(port); channelResults.emplace(name, *port); } return channelResults; } +/// Get the channel description for a channel name. Iterate through the list +/// each time. Since this will only be called a small number of times on a small +/// list, it's not worth doing anything fancy. +bool CosimAccelerator::getChannelDesc(const string &channelName, + ChannelDesc &desc) { + ClientContext context; + VoidMessage arg; + ListOfChannels response; + Status s = rpcClient->stub->ListChannels(&context, arg, &response); + checkStatus(s, "Failed to list channels"); + for (const auto &channel : response.channels()) + if (channel.name() == channelName) { + desc = channel; + return true; + } + return false; +} + Service *CosimAccelerator::createService(Service::Type svcType, AppIDPath idPath, std::string implName, const ServiceImplDetails &details, @@ -277,11 +396,11 @@ Service *CosimAccelerator::createService(Service::Type svcType, } if (svcType == typeid(services::MMIO)) { - return new CosimMMIO(rpcClient->getLowLevel()); + // return new CosimMMIO(rpcClient->getLowLevel()); } else if (svcType == typeid(SysInfo)) { switch (manifestMethod) { case ManifestMethod::Cosim: - return new CosimSysInfo(rpcClient); + return new CosimSysInfo(rpcClient->stub.get()); case ManifestMethod::MMIO: return new MMIOSysInfo(getService()); } diff --git a/lib/Dialect/ESI/runtime/cpp/tools/esitester.cpp b/lib/Dialect/ESI/runtime/cpp/tools/esitester.cpp index 1d4fc64df4f9..f54af9f27ffd 100644 --- a/lib/Dialect/ESI/runtime/cpp/tools/esitester.cpp +++ b/lib/Dialect/ESI/runtime/cpp/tools/esitester.cpp @@ -60,6 +60,8 @@ int main(int argc, const char *argv[]) { while (true) { this_thread::sleep_for(chrono::milliseconds(100)); } + } else if (cmd == "wait") { + this_thread::sleep_for(chrono::seconds(1)); } acc->disconnect(); diff --git a/utils/get-capnp.sh b/utils/get-capnp.sh deleted file mode 100755 index 18cf72248423..000000000000 --- a/utils/get-capnp.sh +++ /dev/null @@ -1,42 +0,0 @@ -#!/usr/bin/env bash -##===- utils/get-capnp.sh - Install CapnProto ----------------*- Script -*-===## -# -# Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. -# See https://llvm.org/LICENSE.txt for license information. -# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception -# -##===----------------------------------------------------------------------===## -# -# This script downloads, compiles, and installs CapnProto into $/ext. -# Cap'nProto is use by ESI (Elastic Silicon Interfaces) cosimulation as a -# message format and RPC client/server. -# -# It will also optionally install pycapnp, which is used for testing. -# -##===----------------------------------------------------------------------===## - -echo "Do you wish to install pycapnp? Cosim integration tests require pycapnp." -read -p "Yes to confirm: " yn -case $yn in - [Yy]* ) pip3 install pycapnp;; - * ) echo "Skipping.";; -esac - -mkdir -p "$(dirname "$BASH_SOURCE[0]")/../ext" -EXT_DIR=$(cd "$(dirname "$BASH_SOURCE[0]")/../ext" && pwd) -CAPNP_VER=0.9.1 -echo "Installing capnproto..." - -echo $EXT_DIR -cd $EXT_DIR - -wget https://capnproto.org/capnproto-c++-$CAPNP_VER.tar.gz -tar -zxf capnproto-c++-$CAPNP_VER.tar.gz -cd capnproto-c++-$CAPNP_VER -./configure --prefix=$EXT_DIR -make -j$(nproc) -make install -cd ../ -rm -r capnproto-c++-$CAPNP_VER capnproto-c++-$CAPNP_VER.tar.gz - -echo "Done." diff --git a/utils/get-grpc.sh b/utils/get-grpc.sh new file mode 100755 index 000000000000..c7391ba32bbc --- /dev/null +++ b/utils/get-grpc.sh @@ -0,0 +1,37 @@ +#!/usr/bin/env bash +##===- utils/get-grpc.sh - Install gRPC (for ESI runtime) ----*- Script -*-===## +# +# Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. +# See https://llvm.org/LICENSE.txt for license information. +# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +# +##===----------------------------------------------------------------------===## +# +# +##===----------------------------------------------------------------------===## + +mkdir -p "$(dirname "$BASH_SOURCE[0]")/../ext" +EXT_DIR=$(cd "$(dirname "$BASH_SOURCE[0]")/../ext" && pwd) +# v1.54.2 is the version in Ubuntu 22.04 +GRPC_VER=1.54.2 +echo "Installing gRPC..." + +echo $EXT_DIR +cd $EXT_DIR + +if [ ! -d grpc ]; then + git clone --recurse-submodules -b v$GRPC_VER https://github.com/grpc/grpc +fi +cd grpc +mkdir -p cmake/build +cd cmake/build +cmake -S ../.. -B . -DCMAKE_INSTALL_PREFIX=$EXT_DIR \ + -DgRPC_INSTALL=ON \ + -DCMAKE_BUILD_TYPE=Debug +make -j$(nproc) +make install + +cd ../../../ +rm -rf grpc + +echo "Done."