Skip to content
This repository has been archived by the owner on Aug 30, 2018. It is now read-only.

Commit

Permalink
go-proio: added InputStreamer for go-hep.org/x/hep/fwk integration
Browse files Browse the repository at this point in the history
  • Loading branch information
decibelcooper committed Apr 3, 2018
1 parent 68bed7d commit 73417ec
Show file tree
Hide file tree
Showing 2 changed files with 178 additions and 0 deletions.
55 changes: 55 additions & 0 deletions go-proio/fwkStream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package proio

import (
"fmt"
"io"
"reflect"

"go-hep.org/x/hep/fwk"
)

// InputStream implements the go-hep.org/x/hep/fwk Task interface
type InputStream struct {
R io.Reader

outputs []string
rdr *Reader
}

// Connect establishes output ports, returning an error for the wrong type
func (stream *InputStream) Connect(ports []fwk.Port) error {
eventType := reflect.TypeOf(&Event{})
for _, port := range ports {
switch port.Type {
case eventType:
stream.outputs = append(stream.outputs, port.Name)
default:
return fmt.Errorf("Invalid port type: %v", port.Type)
}
}

stream.rdr = NewReader(stream.R)

return nil
}

func (stream *InputStream) Read(ctx fwk.Context) error {
event, err := stream.rdr.Next()
if err != nil {
return err
}

for _, output := range stream.outputs {
if err = ctx.Store().Put(output, event); err != nil {
return err
}
}

return nil
}

func (stream *InputStream) Disconnect() error {
stream.rdr.Close()

return nil
}
123 changes: 123 additions & 0 deletions go-proio/fwk_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package proio

import (
"bytes"
"reflect"
"testing"

"github.com/decibelcooper/proio/go-proio/model/eic"
"go-hep.org/x/hep/fwk"
"go-hep.org/x/hep/fwk/job"
)

func TestInputStream1(t *testing.T) {
buffer := &bytes.Buffer{}

wrt := NewWriter(buffer)
event := NewEvent()
event.AddEntry("test", &eic.SimHit{})
wrt.Push(event)
wrt.Push(event)
wrt.Push(event)
wrt.Push(event)
wrt.Close()

app := job.New(job.P{
"EvtMax": int64(4),
"NProcs": 2,
"MsgLevel": job.MsgLevel("ERROR"),
})

app.Create(job.C{
Type: "github.com/decibelcooper/proio/go-proio.PrintEvents",
Name: "eventprinter",
Props: job.P{
"Input": "toprint",
"Output": "printed",
},
})

app.Create(job.C{
Type: "go-hep.org/x/hep/fwk.InputStream",
Name: "input",
Props: job.P{
"Ports": []fwk.Port{
{
Name: "toprint",
Type: reflect.TypeOf(&Event{}),
},
},
"Streamer": &InputStream{R: buffer},
},
})

app.Run()
}

type PrintEvents struct {
fwk.TaskBase

input string
output string
}

func (tsk *PrintEvents) Configure(ctx fwk.Context) error {
if err := tsk.DeclInPort(tsk.input, reflect.TypeOf(&Event{})); err != nil {
return err
}

if err := tsk.DeclOutPort(tsk.output, reflect.TypeOf("")); err != nil {
return err
}

return nil
}

func (tsk *PrintEvents) StartTask(ctx fwk.Context) error {
return nil
}

func (tsk *PrintEvents) Process(ctx fwk.Context) error {
store := ctx.Store()

v, err := store.Get(tsk.input)
if err != nil {
return err
}
event := v.(*Event)

evtString := event.String()
err = store.Put(tsk.output, evtString)
if err != nil {
return err
}

return nil
}

func (tsk *PrintEvents) StopTask(ctx fwk.Context) error {
return nil
}

func init() {
fwk.Register(
reflect.TypeOf(PrintEvents{}),
func(typ, name string, mgr fwk.App) (fwk.Component, error) {
tsk := &PrintEvents{
TaskBase: fwk.NewTask(typ, name, mgr),
input: "printevents",
output: "printedevents",
}

if err := tsk.DeclProp("Input", &tsk.input); err != nil {
return nil, err
}

if err := tsk.DeclProp("Output", &tsk.output); err != nil {
return nil, err
}

return tsk, nil
},
)
}

0 comments on commit 73417ec

Please sign in to comment.