Skip to content
This repository has been archived by the owner on Aug 30, 2018. It is now read-only.

Commit

Permalink
go-proio: added fwk subpackage for go-hep.org/x/hep/fwk integration
Browse files Browse the repository at this point in the history
  • Loading branch information
decibelcooper committed Apr 3, 2018
1 parent 68bed7d commit c083e01
Show file tree
Hide file tree
Showing 2 changed files with 180 additions and 0 deletions.
56 changes: 56 additions & 0 deletions go-proio/fwk/inputstream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package fwk

import (
"fmt"
"io"
"reflect"

"github.com/decibelcooper/proio/go-proio"
"go-hep.org/x/hep/fwk"
)

// InputStream implements the go-hep.org/x/hep/fwk Task interface
type InputStream struct {
R io.Reader

outputs []string
rdr *proio.Reader
}

// Connect establishes output ports, returning an error for the wrong type
func (stream *InputStream) Connect(ports []fwk.Port) error {
eventType := reflect.TypeOf(&proio.Event{})
for _, port := range ports {
switch port.Type {
case eventType:
stream.outputs = append(stream.outputs, port.Name)
default:
return fmt.Errorf("Invalid port type: %v", port.Type)
}
}

stream.rdr = proio.NewReader(stream.R)

return nil
}

func (stream *InputStream) Read(ctx fwk.Context) error {
event, err := stream.rdr.Next()
if err != nil {
return err
}

for _, output := range stream.outputs {
if err = ctx.Store().Put(output, event); err != nil {
return err
}
}

return nil
}

func (stream *InputStream) Disconnect() error {
stream.rdr.Close()

return nil
}
124 changes: 124 additions & 0 deletions go-proio/fwk/streamer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package fwk

import (
"bytes"
"reflect"
"testing"

"github.com/decibelcooper/proio/go-proio"
"github.com/decibelcooper/proio/go-proio/model/eic"
"go-hep.org/x/hep/fwk"
"go-hep.org/x/hep/fwk/job"
)

func TestInputStream1(t *testing.T) {
buffer := &bytes.Buffer{}

wrt := proio.NewWriter(buffer)
event := proio.NewEvent()
event.AddEntry("test", &eic.SimHit{})
wrt.Push(event)
wrt.Push(event)
wrt.Push(event)
wrt.Push(event)
wrt.Close()

app := job.New(job.P{
"EvtMax": int64(4),
"NProcs": 2,
"MsgLevel": job.MsgLevel("ERROR"),
})

app.Create(job.C{
Type: "github.com/decibelcooper/proio/go-proio/fwk.PrintEvents",
Name: "eventprinter",
Props: job.P{
"Input": "toprint",
"Output": "printed",
},
})

app.Create(job.C{
Type: "go-hep.org/x/hep/fwk.InputStream",
Name: "input",
Props: job.P{
"Ports": []fwk.Port{
{
Name: "toprint",
Type: reflect.TypeOf(&proio.Event{}),
},
},
"Streamer": &InputStream{R: buffer},
},
})

app.Run()
}

type PrintEvents struct {
fwk.TaskBase

input string
output string
}

func (tsk *PrintEvents) Configure(ctx fwk.Context) error {
if err := tsk.DeclInPort(tsk.input, reflect.TypeOf(&proio.Event{})); err != nil {
return err
}

if err := tsk.DeclOutPort(tsk.output, reflect.TypeOf("")); err != nil {
return err
}

return nil
}

func (tsk *PrintEvents) StartTask(ctx fwk.Context) error {
return nil
}

func (tsk *PrintEvents) Process(ctx fwk.Context) error {
store := ctx.Store()

v, err := store.Get(tsk.input)
if err != nil {
return err
}
event := v.(*proio.Event)

evtString := event.String()
err = store.Put(tsk.output, evtString)
if err != nil {
return err
}

return nil
}

func (tsk *PrintEvents) StopTask(ctx fwk.Context) error {
return nil
}

func init() {
fwk.Register(
reflect.TypeOf(PrintEvents{}),
func(typ, name string, mgr fwk.App) (fwk.Component, error) {
tsk := &PrintEvents{
TaskBase: fwk.NewTask(typ, name, mgr),
input: "printevents",
output: "printedevents",
}

if err := tsk.DeclProp("Input", &tsk.input); err != nil {
return nil, err
}

if err := tsk.DeclProp("Output", &tsk.output); err != nil {
return nil, err
}

return tsk, nil
},
)
}

0 comments on commit c083e01

Please sign in to comment.