Skip to content
This repository has been archived by the owner on Aug 30, 2018. It is now read-only.

[WIP] go-hep.org/x/hep/fwk integration #98

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions go-proio/fwk.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package proio

import (
"io"

"go-hep.org/x/hep/fwk"
)

// InputStream implements the go-hep.org/x/hep/fwk Task interface
type InputStream struct {
R io.Reader

outputs []string
rdr *Reader
}

// Connect establishes output ports (returning an error for the wrong type),
// and sets up a Reader with R (io.Reader) as the raw input.
func (stream *InputStream) Connect(ports []fwk.Port) error {
for _, port := range ports {
stream.outputs = append(stream.outputs, port.Name)
}

stream.rdr = NewReader(stream.R)

return nil
}

// Read grabs the next Event from the underlying Reader.
func (stream *InputStream) Read(ctx fwk.Context) error {
event, err := stream.rdr.Next()
if err != nil {
return err
}

for _, output := range stream.outputs {
if err = ctx.Store().Put(output, event); err != nil {
return err
}
}

return nil
}

// Disconnect closes the underlying Reader, but leaves R open.
func (stream *InputStream) Disconnect() error {
stream.rdr.Close()

return nil
}
127 changes: 127 additions & 0 deletions go-proio/fwk_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package proio

import (
"bytes"
"reflect"
"testing"

"github.com/decibelcooper/proio/go-proio/model/eic"
"go-hep.org/x/hep/fwk"
"go-hep.org/x/hep/fwk/job"
)

func TestInputStream1(t *testing.T) {
buffer := &bytes.Buffer{}

wrt := NewWriter(buffer)
event := NewEvent()
event.AddEntry("test", &eic.SimHit{})
wrt.Push(event)
wrt.Push(event)
wrt.Push(event)
wrt.Push(event)
wrt.Close()

app := job.New(job.P{
"EvtMax": int64(5),
"NProcs": 2,
"MsgLevel": job.MsgLevel("ERROR"),
})

app.Create(job.C{
Type: "github.com/decibelcooper/proio/go-proio.PrintEvents",
Name: "eventprinter",
Props: job.P{
"Input": "toprint",
"Output": "printed",
},
})

app.Create(job.C{
Type: "go-hep.org/x/hep/fwk.InputStream",
Name: "input",
Props: job.P{
"Ports": []fwk.Port{
{
Name: "toprint",
Type: reflect.TypeOf(&Event{}),
},
{
Name: "badport",
Type: reflect.TypeOf(0),
},
},
"Streamer": &InputStream{R: buffer},
},
})

app.Run()
}

type PrintEvents struct {
fwk.TaskBase

input string
output string
}

func (tsk *PrintEvents) Configure(ctx fwk.Context) error {
if err := tsk.DeclInPort(tsk.input, reflect.TypeOf(&Event{})); err != nil {
return err
}

if err := tsk.DeclOutPort(tsk.output, reflect.TypeOf("")); err != nil {
return err
}

return nil
}

func (tsk *PrintEvents) StartTask(ctx fwk.Context) error {
return nil
}

func (tsk *PrintEvents) Process(ctx fwk.Context) error {
store := ctx.Store()

v, err := store.Get(tsk.input)
if err != nil {
return err
}
event := v.(*Event)

evtString := event.String()
err = store.Put(tsk.output, evtString)
if err != nil {
return err
}

return nil
}

func (tsk *PrintEvents) StopTask(ctx fwk.Context) error {
return nil
}

func init() {
fwk.Register(
reflect.TypeOf(PrintEvents{}),
func(typ, name string, mgr fwk.App) (fwk.Component, error) {
tsk := &PrintEvents{
TaskBase: fwk.NewTask(typ, name, mgr),
input: "printevents",
output: "printedevents",
}

if err := tsk.DeclProp("Input", &tsk.input); err != nil {
return nil, err
}

if err := tsk.DeclProp("Output", &tsk.output); err != nil {
return nil, err
}

return tsk, nil
},
)
}
17 changes: 12 additions & 5 deletions go-proio/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ func Open(filename string) (*Reader, error) {
return nil, err
}

return NewReader(file), nil
rdr := NewReader(file)
rdr.deferUntilClose(file.Close)

return rdr, nil
}

// NewReader wraps an existing io.Reader for reading proio Events. Either Open
Expand All @@ -61,15 +64,19 @@ func NewReader(streamReader io.Reader) *Reader {
// Close closes any file that was opened by the library, and stops any
// unfinished scans. Close does not close io.Readers passed directly to
// NewReader.
func (rdr *Reader) Close() {
func (rdr *Reader) Close() error {
rdr.Lock()
defer rdr.Unlock()

rdr.StopScan()
closer, ok := rdr.streamReader.(io.Closer)
if ok {
closer.Close()
for _, thisFunc := range rdr.deferredUntilClose {
if err := thisFunc(); err != nil {
return err
}
}
rdr.deferredUntilClose = make([]func() error, 0)

return nil
}

// Next retrieves the next event from the stream.
Expand Down