diff --git a/go-proio/fwk/inputstream.go b/go-proio/fwk/inputstream.go new file mode 100644 index 0000000..3ed67dd --- /dev/null +++ b/go-proio/fwk/inputstream.go @@ -0,0 +1,56 @@ +package fwk + +import ( + "fmt" + "io" + "reflect" + + "github.com/decibelcooper/proio/go-proio" + "go-hep.org/x/hep/fwk" +) + +// InputStream implements the go-hep.org/x/hep/fwk Task interface +type InputStream struct { + R io.Reader + + outputs []string + rdr *proio.Reader +} + +// Connect establishes output ports, returning an error for the wrong type +func (stream *InputStream) Connect(ports []fwk.Port) error { + eventType := reflect.TypeOf(&proio.Event{}) + for _, port := range ports { + switch port.Type { + case eventType: + stream.outputs = append(stream.outputs, port.Name) + default: + return fmt.Errorf("Invalid port type: %v", port.Type) + } + } + + stream.rdr = proio.NewReader(stream.R) + + return nil +} + +func (stream *InputStream) Read(ctx fwk.Context) error { + event, err := stream.rdr.Next() + if err != nil { + return err + } + + for _, output := range stream.outputs { + if err = ctx.Store().Put(output, event); err != nil { + return err + } + } + + return nil +} + +func (stream *InputStream) Disconnect() error { + stream.rdr.Close() + + return nil +} diff --git a/go-proio/fwk/streamer_test.go b/go-proio/fwk/streamer_test.go new file mode 100644 index 0000000..1d5e5d3 --- /dev/null +++ b/go-proio/fwk/streamer_test.go @@ -0,0 +1,124 @@ +package fwk + +import ( + "bytes" + "reflect" + "testing" + + "github.com/decibelcooper/proio/go-proio" + "github.com/decibelcooper/proio/go-proio/model/eic" + "go-hep.org/x/hep/fwk" + "go-hep.org/x/hep/fwk/job" +) + +func TestInputStream1(t *testing.T) { + buffer := &bytes.Buffer{} + + wrt := proio.NewWriter(buffer) + event := proio.NewEvent() + event.AddEntry("test", &eic.SimHit{}) + wrt.Push(event) + wrt.Push(event) + wrt.Push(event) + wrt.Push(event) + wrt.Close() + + app := job.New(job.P{ + "EvtMax": int64(4), + "NProcs": 2, + "MsgLevel": job.MsgLevel("ERROR"), + }) + + app.Create(job.C{ + Type: "github.com/decibelcooper/proio/go-proio/fwk.PrintEvents", + Name: "eventprinter", + Props: job.P{ + "Input": "toprint", + "Output": "printed", + }, + }) + + app.Create(job.C{ + Type: "go-hep.org/x/hep/fwk.InputStream", + Name: "input", + Props: job.P{ + "Ports": []fwk.Port{ + { + Name: "toprint", + Type: reflect.TypeOf(&proio.Event{}), + }, + }, + "Streamer": &InputStream{R: buffer}, + }, + }) + + app.Run() +} + +type PrintEvents struct { + fwk.TaskBase + + input string + output string +} + +func (tsk *PrintEvents) Configure(ctx fwk.Context) error { + if err := tsk.DeclInPort(tsk.input, reflect.TypeOf(&proio.Event{})); err != nil { + return err + } + + if err := tsk.DeclOutPort(tsk.output, reflect.TypeOf("")); err != nil { + return err + } + + return nil +} + +func (tsk *PrintEvents) StartTask(ctx fwk.Context) error { + return nil +} + +func (tsk *PrintEvents) Process(ctx fwk.Context) error { + store := ctx.Store() + + v, err := store.Get(tsk.input) + if err != nil { + return err + } + event := v.(*proio.Event) + + evtString := event.String() + err = store.Put(tsk.output, evtString) + if err != nil { + return err + } + + return nil +} + +func (tsk *PrintEvents) StopTask(ctx fwk.Context) error { + return nil +} + +func init() { + fwk.Register( + reflect.TypeOf(PrintEvents{}), + func(typ, name string, mgr fwk.App) (fwk.Component, error) { + tsk := &PrintEvents{ + TaskBase: fwk.NewTask(typ, name, mgr), + input: "printevents", + output: "printedevents", + } + + if err := tsk.DeclProp("Input", &tsk.input); err != nil { + return nil, err + } + + if err := tsk.DeclProp("Output", &tsk.output); err != nil { + return nil, err + } + + return tsk, nil + }, + ) +}