Skip to content

Commit

Permalink
Handling multi worker scenerio for pystein (#1142)
Browse files Browse the repository at this point in the history
* Handling multi worker scenerio for pystein

* flake8 fixes

* Fixed unit tests

* Flake8 fix

Co-authored-by: Gavin Aguiar <[email protected]>
  • Loading branch information
gavin-aguiar and Gavin Aguiar authored Nov 16, 2022
1 parent 28e5fdb commit 9a58193
Show file tree
Hide file tree
Showing 6 changed files with 216 additions and 99 deletions.
104 changes: 60 additions & 44 deletions azure_functions_worker/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,30 +312,7 @@ async def _handle__functions_metadata_request(self, request):
status=protos.StatusResult.Success)))

try:
indexed_functions = loader.index_function_app(function_path)
logger.info('Indexed function app and found %s functions',
len(indexed_functions))

fx_metadata_results = []
if indexed_functions:
indexed_function_logs: List[str] = []
for func in indexed_functions:
function_log = "Function Name: {}, Function Binding: {}" \
.format(func.get_function_name(),
[(binding.type, binding.name) for binding in
func.get_bindings()])
indexed_function_logs.append(function_log)

logger.info(
'Successfully processed FunctionMetadataRequest for '
'functions: %s', " ".join(indexed_function_logs))

fx_metadata_results = loader.process_indexed_function(
self._functions,
indexed_functions)
else:
logger.warning("No functions indexed. Please refer to "
"aka.ms/pythonprogrammingmodel for more info.")
fx_metadata_results = self.index_functions(function_path)

return protos.StreamingMessage(
request_id=request.request_id,
Expand All @@ -355,33 +332,48 @@ async def _handle__functions_metadata_request(self, request):
async def _handle__function_load_request(self, request):
func_request = request.function_load_request
function_id = func_request.function_id
function_name = func_request.metadata.name
function_metadata = func_request.metadata
function_name = function_metadata.name
function_path = os.path.join(function_metadata.directory,
SCRIPT_FILE_NAME)

logger.info(
'Received WorkerLoadRequest, request ID %s, function_id: %s,'
'function_name: %s,', self.request_id, function_id, function_name)

try:
if not self._functions.get_function(function_id):
func = loader.load_function(
func_request.metadata.name,
func_request.metadata.directory,
func_request.metadata.script_file,
func_request.metadata.entry_point)

self._functions.add_function(
function_id, func, func_request.metadata)

ExtensionManager.function_load_extension(
function_name,
func_request.metadata.directory
)

logger.info('Successfully processed FunctionLoadRequest, '
'request ID: %s, '
'function ID: %s,'
'function Name: %s', self.request_id, function_id,
function_name)
if function_metadata.properties.get("worker_indexed", False) \
or os.path.exists(function_path):
# This is for the second worker and above where the worker
# indexing is enabled and load request is called without
# calling the metadata request. In this case we index the
# function and update the workers registry
logger.info(f"Indexing function {function_name} in the "
f"load request")
_ = self.index_functions(function_path)
else:
# legacy function
func = loader.load_function(
func_request.metadata.name,
func_request.metadata.directory,
func_request.metadata.script_file,
func_request.metadata.entry_point)

self._functions.add_function(
function_id, func, func_request.metadata)

ExtensionManager.function_load_extension(
function_name,
func_request.metadata.directory
)

logger.info('Successfully processed FunctionLoadRequest, '
'request ID: %s, '
'function ID: %s,'
'function Name: %s', self.request_id,
function_id,
function_name)

return protos.StreamingMessage(
request_id=self.request_id,
Expand Down Expand Up @@ -577,6 +569,30 @@ async def _handle__function_environment_reload_request(self, request):
request_id=self.request_id,
function_environment_reload_response=failure_response)

def index_functions(self, function_path: str):
indexed_functions = loader.index_function_app(function_path)
logger.info('Indexed function app and found %s functions',
len(indexed_functions))

if indexed_functions:
indexed_function_logs: List[str] = []
for func in indexed_functions:
function_log = "Function Name: {}, Function Binding: {}" \
.format(func.get_function_name(),
[(binding.type, binding.name) for binding in
func.get_bindings()])
indexed_function_logs.append(function_log)

logger.info(
'Successfully processed FunctionMetadataRequest for '
'functions: %s', " ".join(indexed_function_logs))

fx_metadata_results = loader.process_indexed_function(
self._functions,
indexed_functions)

return fx_metadata_results

async def _handle__close_shared_memory_resources_request(self, request):
"""
Frees any memory maps that were produced as output for a given
Expand Down
8 changes: 6 additions & 2 deletions azure_functions_worker/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import operator
import pathlib
import typing
import uuid

from . import bindings as bindings_utils
from . import protos
Expand All @@ -21,6 +22,7 @@ class FunctionInfo(typing.NamedTuple):

name: str
directory: str
function_id: str
requires_context: bool
is_async: bool
has_return: bool
Expand Down Expand Up @@ -311,6 +313,7 @@ def add_func_to_registry_and_return_funcinfo(self, function,
func=function,
name=function_name,
directory=directory,
function_id=function_id,
requires_context=requires_context,
is_async=inspect.iscoroutinefunction(function),
has_return=has_explicit_return or has_implicit_return,
Expand Down Expand Up @@ -371,11 +374,12 @@ def add_function(self, function_id: str,
input_types,
output_types, return_type)

def add_indexed_function(self, function_id: str,
function):
def add_indexed_function(self, function):
func = function.get_user_function()
func_name = function.get_function_name()
func_type = function.http_type
function_id = str(uuid.uuid5(namespace=uuid.NAMESPACE_OID,
name=func_name))
return_binding_name: typing.Optional[str] = None
has_explicit_return = False
has_implicit_return = False
Expand Down
8 changes: 3 additions & 5 deletions azure_functions_worker/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import os.path
import pathlib
import sys
import uuid
from os import PathLike, fspath
from typing import Optional, Dict

Expand Down Expand Up @@ -65,24 +64,23 @@ def process_indexed_function(functions_registry: functions.Registry,
indexed_functions):
fx_metadata_results = []
for indexed_function in indexed_functions:
function_id = str(uuid.uuid4())
function_info = functions_registry.add_indexed_function(
function_id,
function=indexed_function)

binding_protos = build_binding_protos(indexed_function)

function_metadata = protos.RpcFunctionMetadata(
name=function_info.name,
function_id=function_id,
function_id=function_info.function_id,
managed_dependency_enabled=False, # only enabled for PowerShell
directory=function_info.directory,
script_file=indexed_function.function_script_file,
entry_point=function_info.name,
is_proxy=False, # not supported in V4
language=PYTHON_LANGUAGE_RUNTIME,
bindings=binding_protos,
raw_bindings=indexed_function.get_raw_bindings())
raw_bindings=indexed_function.get_raw_bindings(),
properties={"worker_indexed": "True"})

fx_metadata_results.append(function_metadata)

Expand Down
Loading

0 comments on commit 9a58193

Please sign in to comment.