Early Access: Task Worker proposal #2

Open · wants to merge 15 commits into master
137 changes: 137 additions & 0 deletions model/fn-execution/src/main/proto/beam_task_worker.proto
@@ -0,0 +1,137 @@
/*
 * Protocol Buffers describing a custom bundle processor used by Task Worker.
 */

syntax = "proto3";

package org.apache.beam.model.fn_execution.v1;

option go_package = "fnexecution_v1";
option java_package = "org.apache.beam.model.fnexecution.v1";

import "beam_fn_api.proto";
import "endpoints.proto";
import "google/protobuf/struct.proto";
import "metrics.proto";


//
// Control Plane
//

// An API that describes the work that a bundle processor task worker is meant to do.
service TaskControl {

  // Instructions sent by the SDK to the task worker requesting different types
  // of work.
  rpc Control (
    // A stream of responses to instructions the task worker was asked to
    // perform.
    stream TaskInstructionResponse)
  returns (
    // A stream of instructions the task worker is requested to perform.
    stream TaskInstructionRequest);
}

// A request sent by the SDK which the task worker is asked to fulfill.
message TaskInstructionRequest {
  // (Required) A unique identifier provided by the SDK which represents this
  // request's execution. The TaskInstructionResponse MUST have the matching id.
  string instruction_id = 1;

  // (Required) A request that the task worker needs to interpret.
  oneof request {
    CreateRequest create = 1000;
    ProcessorProcessBundleRequest process_bundle = 1001;
    ShutdownRequest shutdown = 1002;
  }
}

// The response for an associated request the task worker was asked to fulfill.
message TaskInstructionResponse {

  // (Required) A reference provided by the SDK which represents a request's
  // execution. The TaskInstructionResponse MUST have the matching id when
  // responding to the SDK.
  string instruction_id = 1;

  // The response type corresponding to the request this matches.
  oneof response {
    CreateResponse create = 1000;
    ProcessorProcessBundleResponse process_bundle = 1001;
    ShutdownResponse shutdown = 1002;
  }

  // (Optional) An error message, populated if there was an error processing
  // the request.
  string error = 2;
}

message ChannelCredentials {
  string _credentials = 1;
}

message GrpcClientDataChannelFactory {
  ChannelCredentials credentials = 1;
  string worker_id = 2;
  string transmitter_url = 3;
}

message CreateRequest {
  org.apache.beam.model.fn_execution.v1.ProcessBundleDescriptor process_bundle_descriptor = 1;  // (required)
  org.apache.beam.model.pipeline.v1.ApiServiceDescriptor state_handler_endpoint = 2;  // (required)
  GrpcClientDataChannelFactory data_factory = 3;  // (required)
}

message CreateResponse {
}

message ProcessorProcessBundleRequest {
  // (Optional) The cache token that can be used by an SDK to reuse
  // cached data returned by the State API across multiple bundles.
  string cache_token = 1;
}

message ProcessorProcessBundleResponse {
  repeated org.apache.beam.model.fn_execution.v1.DelayedBundleApplication delayed_applications = 1;
  bool require_finalization = 2;
}

message ShutdownRequest {
}

message ShutdownResponse {
}


//
// Data Plane
//

service TaskFnData {
  // Handles data transfer between the TaskWorkerHandler and the task worker.
  rpc Receive (
    ReceiveRequest)
  returns (
    // A stream of data representing output.
    stream Elements.Data);

  // Used to send data from the proxy bundle processor to the SDK harness.
  rpc Send (SendRequest) returns (SendResponse);
}

message ReceiveRequest {
  string instruction_id = 1;
  string client_data_endpoint = 2;
}

message SendRequest {
  string instruction_id = 1;
  string client_data_endpoint = 2;
  Elements.Data data = 3;
}

message SendResponse {
  string error = 1;
}
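
Reviewer note: to make the control-plane contract above concrete, here is a minimal sketch of the task-worker side of the TaskControl stream. It assumes the proto compiles into beam_task_worker_pb2 / beam_task_worker_pb2_grpc under apache_beam.portability.api (those module paths are an assumption, not part of this PR), and it only echoes empty responses rather than actually creating or running a bundle processor.

import queue

import grpc

from apache_beam.portability.api import beam_task_worker_pb2
from apache_beam.portability.api import beam_task_worker_pb2_grpc


def run_control(control_address):
  # Responses are fed to gRPC through a queue so they can be produced as
  # instructions are handled; None is a local sentinel meaning "hang up".
  responses = queue.Queue()

  def response_stream():
    while True:
      response = responses.get()
      if response is None:
        return
      yield response

  stub = beam_task_worker_pb2_grpc.TaskControlStub(
      grpc.insecure_channel(control_address))

  # The SDK streams TaskInstructionRequests; the worker streams back a
  # TaskInstructionResponse carrying the matching instruction_id.
  for request in stub.Control(response_stream()):
    kind = request.WhichOneof('request')
    if kind == 'create':
      responses.put(beam_task_worker_pb2.TaskInstructionResponse(
          instruction_id=request.instruction_id,
          create=beam_task_worker_pb2.CreateResponse()))
    elif kind == 'process_bundle':
      responses.put(beam_task_worker_pb2.TaskInstructionResponse(
          instruction_id=request.instruction_id,
          process_bundle=beam_task_worker_pb2.ProcessorProcessBundleResponse()))
    elif kind == 'shutdown':
      responses.put(beam_task_worker_pb2.TaskInstructionResponse(
          instruction_id=request.instruction_id,
          shutdown=beam_task_worker_pb2.ShutdownResponse()))
      responses.put(None)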
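
The data plane is simpler: Send is a unary call, so pushing one chunk of output back to the SDK harness looks roughly like the following sketch (same hypothetical generated modules as above; passing data_address as client_data_endpoint is likewise an assumption).

def send_data(data_address, instruction_id, data):
  # data is an Elements.Data message from beam_fn_api.proto.
  stub = beam_task_worker_pb2_grpc.TaskFnDataStub(
      grpc.insecure_channel(data_address))
  response = stub.Send(beam_task_worker_pb2.SendRequest(
      instruction_id=instruction_id,
      client_data_endpoint=data_address,
      data=data))
  # SendResponse reports failures in-band rather than via gRPC status.
  if response.error:
    raise RuntimeError(response.error)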
82 changes: 82 additions & 0 deletions sdks/python/apache_beam/examples/taskworker/k8s.py
@@ -0,0 +1,82 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Sample pipeline showcasing how to use the Kubernetes task worker.
"""

from __future__ import absolute_import

import argparse
import logging
from typing import TYPE_CHECKING

import apache_beam as beam
from apache_beam.options.pipeline_options import DirectOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.task.task_worker.kubejob.transforms import KubeTask

if TYPE_CHECKING:
from typing import Any
from apache_beam.task.task_worker.kubejob.handler import KubePayload


def process(element):
  # type: (int) -> int
  import time
  start = time.time()
  print('processing...')
  # represents an expensive process of some sort
  time.sleep(element)
  print('processing took {:0.6f} s'.format(time.time() - start))
  return element


def _per_element_wrapper(element, payload):
  # type: (Any, KubePayload) -> KubePayload
  """
  Callback to modify the Kubernetes job properties per-element.
  """
  payload.job_name += '-{}'.format(element)
  return payload


def run(argv=None, save_main_session=True):
  """
  Run a pipeline that submits two Kubernetes jobs, each of which simply sleeps.
  """
  parser = argparse.ArgumentParser()
  known_args, pipeline_args = parser.parse_known_args(argv)

  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
  pipeline_options.view_as(DirectOptions).direct_running_mode = 'multi_processing'

  with beam.Pipeline(options=pipeline_options) as pipe:
    (
        pipe
        | beam.Create([20, 42])
        | 'kubetask' >> KubeTask(
            beam.Map(process),
            wrapper=_per_element_wrapper)
    )


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()
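
As an aside for reviewers, the wrapper contract here is just (element, payload) -> payload. A hypothetical variant that also timestamps the job name, relying only on the job_name attribute already used above (any other KubePayload attribute would be a guess about the API), could look like:

import datetime


def _timestamped_wrapper(element, payload):
  # type: (Any, KubePayload) -> KubePayload
  # Mirrors _per_element_wrapper above, but also appends a UTC timestamp so
  # repeated runs submit distinctly named Kubernetes jobs.
  stamp = datetime.datetime.utcnow().strftime('%Y%m%d%H%M%S')
  payload.job_name += '-{}-{}'.format(element, stamp)
  return payload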
79 changes: 79 additions & 0 deletions sdks/python/apache_beam/examples/taskworker/local.py
@@ -0,0 +1,79 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Basic pipeline to test using TaskWorkers.
"""

from __future__ import absolute_import

import argparse
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import DirectOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

from apache_beam.runners.worker.task_worker.core import BeamTask


class TestFn(beam.DoFn):

  def process(self, element, side):
    from apache_beam.runners.worker.task_worker.handlers import TaskableValue

    for s in side:
      if isinstance(element, TaskableValue):
        value = element.value
      else:
        value = element
      print(value + s)
      yield value + s


def run(argv=None, save_main_session=True):
  """
  Run a pipeline that executes each element using the local task worker.
  """
  parser = argparse.ArgumentParser()
  known_args, pipeline_args = parser.parse_known_args(argv)

  # We use the save_main_session option because one or more DoFn's in this
  # workflow rely on global context (e.g., a module imported at module level).
  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
  pipeline_options.view_as(DirectOptions).direct_running_mode = 'multi_processing'

  with beam.Pipeline(options=pipeline_options) as pipe:

    A = (
        pipe
        | 'A' >> beam.Create(range(3))
    )

    B = (
        pipe
        | beam.Create(range(2))
        | BeamTask(beam.ParDo(TestFn(), beam.pvalue.AsList(A)),
                   wrapper=lambda x, _: x)
    )


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()
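
One small suggestion prompted by TestFn above: since elements may or may not arrive wrapped in a TaskableValue, a tiny helper (sketched here, not part of this PR) would keep that branching out of each DoFn:

def unwrap(element):
  # TaskableValue wraps the raw value when an element is routed through a
  # task worker; plain elements pass through unchanged.
  from apache_beam.runners.worker.task_worker.handlers import TaskableValue
  if isinstance(element, TaskableValue):
    return element.value
  return element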