From 9a58193076e9b7b14990aa3fe09b5336b5c3db4c Mon Sep 17 00:00:00 2001 From: gavin-aguiar <80794152+gavin-aguiar@users.noreply.github.com> Date: Wed, 16 Nov 2022 17:11:47 -0600 Subject: [PATCH] Handling multi worker scenerio for pystein (#1142) * Handling multi worker scenerio for pystein * flake8 fixes * Fixed unit tests * Flake8 fix Co-authored-by: Gavin Aguiar --- azure_functions_worker/dispatcher.py | 104 +++++++------ azure_functions_worker/functions.py | 8 +- azure_functions_worker/loader.py | 8 +- .../protos/_src/src/proto/FunctionRpc.proto | 146 ++++++++++++------ tests/endtoend/test_multi_worker_functions.py | 43 ++++++ tests/unittests/test_functions_registry.py | 6 +- 6 files changed, 216 insertions(+), 99 deletions(-) create mode 100644 tests/endtoend/test_multi_worker_functions.py diff --git a/azure_functions_worker/dispatcher.py b/azure_functions_worker/dispatcher.py index f0cc832f..24c25ece 100644 --- a/azure_functions_worker/dispatcher.py +++ b/azure_functions_worker/dispatcher.py @@ -312,30 +312,7 @@ async def _handle__functions_metadata_request(self, request): status=protos.StatusResult.Success))) try: - indexed_functions = loader.index_function_app(function_path) - logger.info('Indexed function app and found %s functions', - len(indexed_functions)) - - fx_metadata_results = [] - if indexed_functions: - indexed_function_logs: List[str] = [] - for func in indexed_functions: - function_log = "Function Name: {}, Function Binding: {}" \ - .format(func.get_function_name(), - [(binding.type, binding.name) for binding in - func.get_bindings()]) - indexed_function_logs.append(function_log) - - logger.info( - 'Successfully processed FunctionMetadataRequest for ' - 'functions: %s', " ".join(indexed_function_logs)) - - fx_metadata_results = loader.process_indexed_function( - self._functions, - indexed_functions) - else: - logger.warning("No functions indexed. Please refer to " - "aka.ms/pythonprogrammingmodel for more info.") + fx_metadata_results = self.index_functions(function_path) return protos.StreamingMessage( request_id=request.request_id, @@ -355,7 +332,10 @@ async def _handle__functions_metadata_request(self, request): async def _handle__function_load_request(self, request): func_request = request.function_load_request function_id = func_request.function_id - function_name = func_request.metadata.name + function_metadata = func_request.metadata + function_name = function_metadata.name + function_path = os.path.join(function_metadata.directory, + SCRIPT_FILE_NAME) logger.info( 'Received WorkerLoadRequest, request ID %s, function_id: %s,' @@ -363,25 +343,37 @@ async def _handle__function_load_request(self, request): try: if not self._functions.get_function(function_id): - func = loader.load_function( - func_request.metadata.name, - func_request.metadata.directory, - func_request.metadata.script_file, - func_request.metadata.entry_point) - - self._functions.add_function( - function_id, func, func_request.metadata) - - ExtensionManager.function_load_extension( - function_name, - func_request.metadata.directory - ) - - logger.info('Successfully processed FunctionLoadRequest, ' - 'request ID: %s, ' - 'function ID: %s,' - 'function Name: %s', self.request_id, function_id, - function_name) + if function_metadata.properties.get("worker_indexed", False) \ + or os.path.exists(function_path): + # This is for the second worker and above where the worker + # indexing is enabled and load request is called without + # calling the metadata request. In this case we index the + # function and update the workers registry + logger.info(f"Indexing function {function_name} in the " + f"load request") + _ = self.index_functions(function_path) + else: + # legacy function + func = loader.load_function( + func_request.metadata.name, + func_request.metadata.directory, + func_request.metadata.script_file, + func_request.metadata.entry_point) + + self._functions.add_function( + function_id, func, func_request.metadata) + + ExtensionManager.function_load_extension( + function_name, + func_request.metadata.directory + ) + + logger.info('Successfully processed FunctionLoadRequest, ' + 'request ID: %s, ' + 'function ID: %s,' + 'function Name: %s', self.request_id, + function_id, + function_name) return protos.StreamingMessage( request_id=self.request_id, @@ -577,6 +569,30 @@ async def _handle__function_environment_reload_request(self, request): request_id=self.request_id, function_environment_reload_response=failure_response) + def index_functions(self, function_path: str): + indexed_functions = loader.index_function_app(function_path) + logger.info('Indexed function app and found %s functions', + len(indexed_functions)) + + if indexed_functions: + indexed_function_logs: List[str] = [] + for func in indexed_functions: + function_log = "Function Name: {}, Function Binding: {}" \ + .format(func.get_function_name(), + [(binding.type, binding.name) for binding in + func.get_bindings()]) + indexed_function_logs.append(function_log) + + logger.info( + 'Successfully processed FunctionMetadataRequest for ' + 'functions: %s', " ".join(indexed_function_logs)) + + fx_metadata_results = loader.process_indexed_function( + self._functions, + indexed_functions) + + return fx_metadata_results + async def _handle__close_shared_memory_resources_request(self, request): """ Frees any memory maps that were produced as output for a given diff --git a/azure_functions_worker/functions.py b/azure_functions_worker/functions.py index 100bdb06..f6d59122 100644 --- a/azure_functions_worker/functions.py +++ b/azure_functions_worker/functions.py @@ -4,6 +4,7 @@ import operator import pathlib import typing +import uuid from . import bindings as bindings_utils from . import protos @@ -21,6 +22,7 @@ class FunctionInfo(typing.NamedTuple): name: str directory: str + function_id: str requires_context: bool is_async: bool has_return: bool @@ -311,6 +313,7 @@ def add_func_to_registry_and_return_funcinfo(self, function, func=function, name=function_name, directory=directory, + function_id=function_id, requires_context=requires_context, is_async=inspect.iscoroutinefunction(function), has_return=has_explicit_return or has_implicit_return, @@ -371,11 +374,12 @@ def add_function(self, function_id: str, input_types, output_types, return_type) - def add_indexed_function(self, function_id: str, - function): + def add_indexed_function(self, function): func = function.get_user_function() func_name = function.get_function_name() func_type = function.http_type + function_id = str(uuid.uuid5(namespace=uuid.NAMESPACE_OID, + name=func_name)) return_binding_name: typing.Optional[str] = None has_explicit_return = False has_implicit_return = False diff --git a/azure_functions_worker/loader.py b/azure_functions_worker/loader.py index b3831b4f..8f545691 100644 --- a/azure_functions_worker/loader.py +++ b/azure_functions_worker/loader.py @@ -8,7 +8,6 @@ import os.path import pathlib import sys -import uuid from os import PathLike, fspath from typing import Optional, Dict @@ -65,16 +64,14 @@ def process_indexed_function(functions_registry: functions.Registry, indexed_functions): fx_metadata_results = [] for indexed_function in indexed_functions: - function_id = str(uuid.uuid4()) function_info = functions_registry.add_indexed_function( - function_id, function=indexed_function) binding_protos = build_binding_protos(indexed_function) function_metadata = protos.RpcFunctionMetadata( name=function_info.name, - function_id=function_id, + function_id=function_info.function_id, managed_dependency_enabled=False, # only enabled for PowerShell directory=function_info.directory, script_file=indexed_function.function_script_file, @@ -82,7 +79,8 @@ def process_indexed_function(functions_registry: functions.Registry, is_proxy=False, # not supported in V4 language=PYTHON_LANGUAGE_RUNTIME, bindings=binding_protos, - raw_bindings=indexed_function.get_raw_bindings()) + raw_bindings=indexed_function.get_raw_bindings(), + properties={"worker_indexed": "True"}) fx_metadata_results.append(function_metadata) diff --git a/azure_functions_worker/protos/_src/src/proto/FunctionRpc.proto b/azure_functions_worker/protos/_src/src/proto/FunctionRpc.proto index 734211c4..155af606 100644 --- a/azure_functions_worker/protos/_src/src/proto/FunctionRpc.proto +++ b/azure_functions_worker/protos/_src/src/proto/FunctionRpc.proto @@ -26,13 +26,14 @@ message StreamingMessage { oneof content { // Worker initiates stream - StartStream start_stream = 20; + StartStream start_stream = 20; // Host sends capabilities/init data to worker WorkerInitRequest worker_init_request = 17; // Worker responds after initializing with its capabilities & status WorkerInitResponse worker_init_response = 16; + // MESSAGE NOT USED // Worker periodically sends empty heartbeat message to host WorkerHeartbeat worker_heartbeat = 15; @@ -40,7 +41,7 @@ message StreamingMessage { // Worker terminates if it can, otherwise host terminates after a grace period WorkerTerminate worker_terminate = 14; - // Add any worker relevant status to response + // Host periodically sends status request to the worker WorkerStatusRequest worker_status_request = 12; WorkerStatusResponse worker_status_response = 13; @@ -49,25 +50,25 @@ message StreamingMessage { // Worker requests a desired action (restart worker, reload function) WorkerActionResponse worker_action_response = 7; - + // Host sends required metadata to worker to load function FunctionLoadRequest function_load_request = 8; // Worker responds after loading with the load result FunctionLoadResponse function_load_response = 9; - + // Host requests a given invocation InvocationRequest invocation_request = 4; // Worker responds to a given invocation InvocationResponse invocation_response = 5; - // Host sends cancel message to attempt to cancel an invocation. + // Host sends cancel message to attempt to cancel an invocation. // If an invocation is cancelled, host will receive an invocation response with status cancelled. InvocationCancel invocation_cancel = 21; // Worker logs a message back to the host RpcLog rpc_log = 2; - + FunctionEnvironmentReloadRequest function_environment_reload_request = 25; FunctionEnvironmentReloadResponse function_environment_reload_response = 26; @@ -91,7 +92,7 @@ message StreamingMessage { // Process.Start required info // connection details // protocol type -// protocol version +// protocol version // Worker sends the host information identifying itself message StartStream { @@ -99,7 +100,7 @@ message StartStream { string worker_id = 2; } -// Host requests the worker to initialize itself +// Host requests the worker to initialize itself message WorkerInitRequest { // version of the host sending init request string host_version = 1; @@ -120,13 +121,35 @@ message WorkerInitRequest { // Worker responds with the result of initializing itself message WorkerInitResponse { - // Version of worker + // PROPERTY NOT USED + // TODO: Remove from protobuf during next breaking change release string worker_version = 1; + // A map of worker supported features/capabilities map capabilities = 2; // Status of the response StatusResult result = 3; + + // Worker metadata captured for telemetry purposes + WorkerMetadata worker_metadata = 4; +} + +message WorkerMetadata { + // The runtime/stack name + string runtime_name = 1; + + // The version of the runtime/stack + string runtime_version = 2; + + // The version of the worker + string worker_version = 3; + + // The worker bitness/architecture + string worker_bitness = 4; + + // Optional additional custom properties + map custom_properties = 5; } // Used by the host to determine success/failure/cancellation @@ -137,6 +160,7 @@ message StatusResult { Success = 1; Cancelled = 2; } + // Status for the given result Status status = 4; @@ -150,9 +174,8 @@ message StatusResult { repeated RpcLog logs = 3; } -// TODO: investigate grpc heartbeat - don't limit to grpc implemention - -// Message is empty by design - Will add more fields in future if needed +// MESSAGE NOT USED +// TODO: Remove from protobuf during next breaking change release message WorkerHeartbeat {} // Warning before killing the process after grace_period @@ -165,7 +188,7 @@ message WorkerTerminate { message FileChangeEventRequest { // Types of File change operations (See link for more info: https://msdn.microsoft.com/en-us/library/t6xf43e0(v=vs.110).aspx) enum Type { - Unknown = 0; + Unknown = 0; Created = 1; Deleted = 2; Changed = 4; @@ -185,12 +208,12 @@ message FileChangeEventRequest { // Indicates whether worker reloaded successfully or needs a restart message WorkerActionResponse { - // indicates whether a restart is needed, or reload succesfully + // indicates whether a restart is needed, or reload successfully enum Action { Restart = 0; Reload = 1; } - + // action for this response Action action = 1; @@ -198,11 +221,12 @@ message WorkerActionResponse { string reason = 2; } -// NOT USED -message WorkerStatusRequest{ +// Used by the host to determine worker health +message WorkerStatusRequest { } -// NOT USED +// Worker responds with status message +// TODO: Add any worker relevant status to response message WorkerStatusResponse { } @@ -271,7 +295,7 @@ message RpcFunctionMetadata { // base directory for the Function string directory = 1; - + // Script file specified string script_file = 2; @@ -298,6 +322,11 @@ message RpcFunctionMetadata { // A flag indicating if managed dependency is enabled or not bool managed_dependency_enabled = 14; + + // Properties for function metadata + // They're usually specific to a worker and largely passed along to the controller API for use + // outside the host + map properties = 16; } // Host tells worker it is ready to receive metadata @@ -341,14 +370,14 @@ message InvocationRequest { // Host sends ActivityId, traceStateString and Tags from host message RpcTraceContext { - // This corresponds to Activity.Current?.Id - string trace_parent = 1; + // This corresponds to Activity.Current?.Id + string trace_parent = 1; - // This corresponds to Activity.Current?.TraceStateString - string trace_state = 2; + // This corresponds to Activity.Current?.TraceStateString + string trace_state = 2; - // This corresponds to Activity.Current?.Tags - map attributes = 3; + // This corresponds to Activity.Current?.Tags + map attributes = 3; } // Host sends retry context for a function invocation @@ -368,8 +397,8 @@ message InvocationCancel { // Unique id for invocation string invocation_id = 2; - // Time period before force shutdown - google.protobuf.Duration grace_period = 1; // could also use absolute time + // PROPERTY NOT USED + google.protobuf.Duration grace_period = 1; } // Worker responds with status of Invocation @@ -401,6 +430,7 @@ message TypedData { CollectionString collection_string = 9; CollectionDouble collection_double = 10; CollectionSInt64 collection_sint64 = 11; + ModelBindingData model_binding_data = 12; } } @@ -468,20 +498,20 @@ message ParameterBinding { // Used to describe a given binding on load message BindingInfo { - // Indicates whether it is an input or output binding (or a fancy inout binding) - enum Direction { - in = 0; - out = 1; - inout = 2; - } - - // Indicates the type of the data for the binding - enum DataType { - undefined = 0; - string = 1; - binary = 2; - stream = 3; - } + // Indicates whether it is an input or output binding (or a fancy inout binding) + enum Direction { + in = 0; + out = 1; + inout = 2; + } + + // Indicates the type of the data for the binding + enum DataType { + undefined = 0; + string = 1; + binary = 2; + stream = 3; + } // Type of binding (e.g. HttpTrigger) string type = 2; @@ -490,9 +520,12 @@ message BindingInfo { Direction direction = 3; DataType data_type = 4; + + // Properties for binding metadata + map properties = 5; } -// Used to send logs back to the Host +// Used to send logs back to the Host message RpcLog { // Matching ILogger semantics // https://github.com/aspnet/Logging/blob/9506ccc3f3491488fe88010ef8b9eb64594abf95/src/Microsoft.Extensions.Logging/Logger.cs @@ -543,7 +576,7 @@ message RpcLog { map propertiesMap = 9; } -// Encapsulates an Exception +// Encapsulates an Exception message RpcException { // Source of the exception string source = 3; @@ -553,6 +586,14 @@ message RpcException { // Textual message describing the exception string message = 2; + + // Worker specifies whether exception is a user exception, + // for purpose of application insights logging. Defaults to false. + bool is_user_exception = 4; + + // Type of exception. If it's a user exception, the type is passed along to app insights. + // Otherwise, it's ignored for now. + string type = 5; } // Http cookie type. Note that only name and value are used for Http requests @@ -597,7 +638,7 @@ message RpcHttpCookie { // TODO - solidify this or remove it message RpcHttp { string method = 1; - string url = 2; + string url = 2; map headers = 3; TypedData body = 4; map params = 10; @@ -610,4 +651,21 @@ message RpcHttp { map nullable_headers = 20; map nullable_params = 21; map nullable_query = 22; +} + +// Message representing Microsoft.Azure.WebJobs.ParameterBindingData +// Used for hydrating SDK-type bindings in out-of-proc workers +message ModelBindingData +{ + // The version of the binding data content + string version = 1; + + // The extension source of the binding data + string source = 2; + + // The content type of the binding data content + string content_type = 3; + + // The binding data content + bytes content = 4; } \ No newline at end of file diff --git a/tests/endtoend/test_multi_worker_functions.py b/tests/endtoend/test_multi_worker_functions.py new file mode 100644 index 00000000..828030c8 --- /dev/null +++ b/tests/endtoend/test_multi_worker_functions.py @@ -0,0 +1,43 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +import os +from threading import Thread +from unittest.mock import patch + +from tests.utils import testutils + + +class TestWorkerProcessCountStein(testutils.WebHostTestCase): + """Test the Http Trigger with setting up the python worker process count to 2. + This tests will check if worker1 indexes the function in metadata request + and worker2 indexes the function in the load request since worker2 does not + call metadata request. + """ + @classmethod + def setUpClass(cls): + os_environ = os.environ.copy() + os_environ['FUNCTIONS_WORKER_PROCESS_COUNT'] = '2' + cls._patch_environ = patch.dict('os.environ', os_environ) + cls._patch_environ.start() + super().setUpClass() + + @classmethod + def get_script_dir(cls): + return testutils.E2E_TESTS_FOLDER / 'http_functions' / \ + 'http_functions_stein' + + def test_http_func_with_worker_process_count(self): + """Test if the default template of Http trigger in Python Function app + will return OK + """ + def http_req(): + r = self.webhost.request('GET', 'default_template') + self.assertTrue(r.ok) + + # creating 2 different threads to send HTTP request + trd1 = Thread(target=http_req, args=(0,)) + trd2 = Thread(target=http_req, args=(1,)) + trd1.start() + trd2.start() + trd1.join() + trd2.join() diff --git a/tests/unittests/test_functions_registry.py b/tests/unittests/test_functions_registry.py index 2682ba37..06d6af2e 100644 --- a/tests/unittests/test_functions_registry.py +++ b/tests/unittests/test_functions_registry.py @@ -26,8 +26,7 @@ def test_add_indexed_function_invalid_route(self): self.func.add_trigger(trigger=trigger1) with self.assertRaises(FunctionLoadError) as ex: - self.function_registry.add_indexed_function(function_id='123', - function=self.func) + self.function_registry.add_indexed_function(function=self.func) self.assertEqual(str(ex.exception), 'cannot load the dummy function: Invalid route name: ' @@ -41,8 +40,7 @@ def test_add_indexed_function_invalid_direction(self): self.func.add_binding(binding=binding) with self.assertRaises(FunctionLoadError) as ex: - self.function_registry.add_indexed_function(function_id='123', - function=self.func) + self.function_registry.add_indexed_function(function=self.func) self.assertEqual(str(ex.exception), 'cannot load the dummy function: \"$return\" '