Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store shard metadata in S3, add a tailing facility #5

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
build/
src/github.com/
src/gopkg.in/
/.go/
cscope.*
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,7 @@ build:

clean:
rm -rf build

cscope:
find $$GOPATH/src -type f -iname "*.go"> cscope.files
cscope -b -k
42 changes: 15 additions & 27 deletions triton/archive.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package triton

import (
"fmt"
"regexp"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws"
Expand All @@ -17,8 +14,7 @@ type StoreArchive struct {
Key string
ClientName string

T time.Time
SortValue int
T time.Time

s3Svc S3Service
rdr Reader
Expand All @@ -43,39 +39,31 @@ func (sa *StoreArchive) ReadRecord() (rec map[string]interface{}, err error) {
}

func (sa *StoreArchive) parseKeyName(keyName string) (err error) {
re := regexp.MustCompile(`(?P<day>\d{8})\/(?P<stream>.+)\-(?P<ts>\d+)\.tri$`)
res := re.FindAllStringSubmatch(keyName, -1)

if len(res) != 1 {
return fmt.Errorf("Invalid key name")
}

sa.T, err = time.Parse("20060102", res[0][1])

n, err := fmt.Sscanf(res[0][3], "%d", &sa.SortValue)
if n != 1 {
return fmt.Errorf("Failed to parse sort value")
key, err := DecodeArchiveKey(keyName)
if err != nil {
return
}
sa.T = key.Time
sa.StreamName = key.Stream
sa.ClientName = key.Client
return
}

nameParts := strings.Split(res[0][2], "-")
if len(nameParts) != 2 {
return fmt.Errorf("Failure parsing stream name: %v", res[0][2])
}
sa.StreamName = nameParts[0]
sa.ClientName = nameParts[1]
// Read the stream metadata associated with this store archive instance
func (sa *StoreArchive) GetStreamMetadata() (result *StreamMetadata, err error) {
result, err = ReadStreamMetadata(sa.s3Svc, sa.Bucket, sa.Key)

return
}

// NewStoreArchive returns a StoreArchive instance
func NewStoreArchive(bucketName, keyName string, svc S3Service) (sa StoreArchive, err error) {
sa.Bucket = bucketName
sa.Key = keyName
sa.s3Svc = svc

err = sa.parseKeyName(keyName)
if err != nil {
return sa, err
return
}

return sa, nil
return
}
82 changes: 82 additions & 0 deletions triton/archive_key.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package triton

import (
"fmt"
"regexp"
"strconv"
"strings"
"time"
)

// ArchiveKey is a struct representing the path value for the Triton S3 keys.
type ArchiveKey struct {
	Client string    // client name component of the key; may be empty
	Stream string    // stream name component of the key
	Time   time.Time // archive timestamp; encoded at second precision
}

// Path encodes the ArchiveKey to a string path of the form
// "YYYYMMDD/<stream>[-<client>]-<unix-seconds>.tri".
func (a ArchiveKey) Path() string {
	return fmt.Sprintf("%04d%02d%02d/%s-%d.tri", a.Time.Year(), a.Time.Month(), a.Time.Day(), a.fullStreamName(), a.Time.Unix())
}

const (
	metadataSuffix = ".metadata"
)

// MetadataPath encodes the ArchiveKey to a string path with the metadata suffix applied
func (a ArchiveKey) MetadataPath() string {
	return a.Path() + metadataSuffix
}

// fullStreamName returns the full stream name (stream + "-" + client) if there is a client name or just stream
func (a ArchiveKey) fullStreamName() (stream string) {
	stream = a.Stream
	if a.Client != "" {
		stream += "-" + a.Client
	}
	return
}

// PathPrefix returns the string key prefix without the timestamp
func (a ArchiveKey) PathPrefix() string {
	return fmt.Sprintf("%04d%02d%02d/%s-", a.Time.Year(), a.Time.Month(), a.Time.Day(), a.fullStreamName())
}

// Equal reports whether two ArchiveKeys identify the same archive.
// Times are compared at second precision (the resolution Path encodes)
// using time.Time.Equal, so keys for the same instant compare equal even
// when their Locations or monotonic-clock readings differ.
func (a ArchiveKey) Equal(other ArchiveKey) (result bool) {
	if a.Stream != other.Stream {
		return false
	}
	// BUG FIX: previously compared with !=, which compares the internal
	// representation including the *Location pointer — a UTC-constructed
	// key and its decoded (local-time) round trip compared unequal even
	// though they represent the same instant. time.Time values must be
	// compared with Equal, not ==/!=.
	if !a.Time.Truncate(time.Second).Equal(other.Time.Truncate(time.Second)) {
		return false
	}
	if a.Client != other.Client {
		return false
	}
	return true
}

var archiveKeyPattern = regexp.MustCompile(`^/?(?P<day>\d{8})\/(?P<stream>.+)\-(?P<ts>\d+)\.tri$`)

// DecodeArchiveKey parses an archive S3 key into an ArchiveKey.
// The day component is ignored for the Time value; the Unix timestamp in
// the file name is authoritative (decoded into the local Location).
func DecodeArchiveKey(keyName string) (a ArchiveKey, err error) {
	res := archiveKeyPattern.FindStringSubmatch(keyName)
	if res == nil {
		err = fmt.Errorf("Invalid key name")
		return
	}
	ts, err := strconv.ParseInt(res[3], 10, 64)
	if err != nil {
		err = fmt.Errorf("Failed to parse timestamp value: %s", err.Error())
		return
	}
	a.Time = time.Unix(ts, 0)
	// The combined name is "<stream>-<client>"; neither part may itself
	// contain a hyphen or the key is rejected here.
	nameParts := strings.Split(res[2], "-")
	if len(nameParts) != 2 {
		err = fmt.Errorf("Failure parsing stream name: %v", res[2])
		return
	}
	a.Stream = nameParts[0]
	a.Client = nameParts[1]
	return
}
20 changes: 20 additions & 0 deletions triton/archive_key_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package triton

import (
"testing"
"time"
)

// TestArchiveKeyPathCodec verifies that an ArchiveKey survives a
// Path-encode / DecodeArchiveKey round trip.
func TestArchiveKeyPathCodec(t *testing.T) {
	original := ArchiveKey{Time: time.Now(), Stream: "a", Client: "b"}

	decoded, err := DecodeArchiveKey(original.Path())
	if err != nil {
		t.Fatalf("unexpected error: %s", err.Error())
	}
	if !original.Equal(decoded) {
		t.Fatalf("expecting %+v == %+v", original, decoded)
	}
}
100 changes: 100 additions & 0 deletions triton/archive_repository.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package triton

import (
"bytes"
"encoding/json"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"io"
"io/ioutil"
"sort"
"strings"
"time"
)

// ArchiveRepository manages reading and writing Archives
type ArchiveRepository struct {
	s3Service  S3Service         // S3 API used for listing archives
	s3Uploader S3UploaderService // uploader used for writing archives and metadata
	stream     string            // stream name the archives belong to
	bucket     string            // S3 bucket holding the archives
	client     string            // client name component of the keys; may be empty
}

// NewArchiveRepository constructs an ArchiveRepository for the given
// bucket, stream and client, backed by the supplied S3 service and uploader.
func NewArchiveRepository(s3Service S3Service, s3Uploader S3UploaderService, bucket string, stream string, client string) *ArchiveRepository {
	repo := ArchiveRepository{
		s3Service:  s3Service,
		s3Uploader: s3Uploader,
		stream:     stream,
		bucket:     bucket,
		client:     client,
	}
	return &repo
}

// Upload stores the archive for the repository's stream at time t, then
// writes the JSON-encoded StreamMetadata under the metadata-suffixed key.
// AWS errors are rewrapped with their code and message for readability.
func (ar *ArchiveRepository) Upload(t time.Time, contents io.ReadCloser, metadata *StreamMetadata) (err error) {
	archiveKey := ArchiveKey{Stream: ar.stream, Time: t, Client: ar.client}

	// First upload the archive body itself.
	if _, err = ar.s3Uploader.Upload(&s3manager.UploadInput{
		Bucket: aws.String(ar.bucket),
		Key:    aws.String(archiveKey.Path()),
		Body:   contents,
	}); err != nil {
		if awsErr, ok := err.(awserr.Error); ok {
			return fmt.Errorf("Failed to upload: %v (%v)", awsErr.Code(), awsErr.Message())
		}
		return
	}

	// Then encode the metadata document and upload it next to the archive.
	var buf bytes.Buffer
	if err = json.NewEncoder(&buf).Encode(metadata); err != nil {
		return
	}
	if _, err = ar.s3Uploader.Upload(&s3manager.UploadInput{
		Bucket: aws.String(ar.bucket),
		Key:    aws.String(archiveKey.MetadataPath()),
		Body:   ioutil.NopCloser(&buf),
	}); err != nil {
		if awsErr, ok := err.(awserr.Error); ok {
			return fmt.Errorf("Failed to upload metadata: %v (%v)", awsErr.Code(), awsErr.Message())
		}
		return
	}
	return
}

// ArchivesAtDate lists all the archives for a stream stored at a UTC date represented by aDate
func (ar *ArchiveRepository) ArchivesAtDate(aDate time.Time) (result []StoreArchive, err error) {
keyPrefix := ArchiveKey{Time: aDate, Stream: ar.stream, Client: ar.client}.PathPrefix()
keys := []string{}
err = ar.s3Service.ListObjectsPages(&s3.ListObjectsInput{
Bucket: aws.String(ar.bucket),
Prefix: aws.String(keyPrefix),
}, func(output *s3.ListObjectsOutput, lastPage bool) (shouldContinue bool) {
for _, object := range output.Contents {
keys = append(keys, *object.Key)
}
return true
})
if err != nil {
return
}
sort.Sort(sort.StringSlice(keys))
for _, key := range keys {
if strings.HasSuffix(key, metadataSuffix) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a thought, but maybe you should just attempt to create archives for every key it finds, and let the DecodeArchiveKey figure out if it's a valid key to use or not.

Seems like it would be safer to allow unrecognizable keys to exist for future backwards compatibility reasons too?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems reasonable

On Fri, Dec 18, 2015 at 3:05 PM, Rhett Garber [email protected]
wrote:

In triton/archive_repository.go
#5 (comment):

  • keys := []string{}
  • err = ar.s3Service.ListObjectsPages(&s3.ListObjectsInput{
  •   Bucket: aws.String(ar.bucket),
    
  •   Prefix: aws.String(keyPrefix),
    
  • }, func(output *s3.ListObjectsOutput, lastPage bool) (shouldContinue bool) {
  •   for _, object := range output.Contents {
    
  •       keys = append(keys, *object.Key)
    
  •   }
    
  •   return true
    
  • })
  • if err != nil {
  •   return
    
  • }
  • sort.Sort(sort.StringSlice(keys))
  • for _, key := range keys {
  •   if strings.HasSuffix(key, metadataSuffix) {
    

Just a thought, but maybe you should just attempt to create archives for
every key it finds, and let the DecodeArchiveKey figure out if it's a valid
key to use or not.

Seems like it would be safer to allow unrecognizable keys to exist for
future backwards compatibility reasons too?


Reply to this email directly or view it on GitHub
https://github.com/postmates/go-triton/pull/5/files#r48064339.

continue
}
var sa StoreArchive
sa, err = NewStoreArchive(ar.bucket, key, ar.s3Service)
if err != nil {
err = fmt.Errorf("failed to create store archive for %q: %s", key, err)
return
}
result = append(result, sa)
}
return
}
38 changes: 18 additions & 20 deletions triton/archive_test.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
package triton

import (
"fmt"
"io"
"testing"
"time"
)

var (
	// streamTime is the fixed archive timestamp shared by the tests below.
	streamTime = time.Date(2015, time.August, 1, 0, 0, 0, 0, time.UTC)
	// streamKeyPath is the S3 key for stream "test_stream", client "archive"
	// at streamTime, matching the layout produced by ArchiveKey.Path.
	streamKeyPath = fmt.Sprintf("%04d%02d%02d/test_stream-archive-%d.tri", streamTime.Year(), streamTime.Month(), streamTime.Day(), streamTime.Unix())
)

func TestNewArchive(t *testing.T) {
key := "20150801/test_stream-archive-123455.tri"
sa, err := NewStoreArchive("foo", key, nil)
sa, err := NewStoreArchive("foo", streamKeyPath, nil)
if err != nil {
t.Fatal("Error creating sa", err)
t.Fatal("Error creating sa:", err.Error())
}

if sa.Key != key {
if sa.Key != streamKeyPath {
t.Error("Failed to store key")
}

Expand All @@ -24,45 +29,38 @@ func TestNewArchive(t *testing.T) {
if sa.ClientName != "archive" {
t.Error("Should have a client name")
}

if sa.T != time.Date(2015, time.August, 1, 0, 0, 0, 0, time.UTC) {
t.Error("StreamName mismatch", sa.StreamName)
if !sa.T.Equal(streamTime) {
t.Errorf("Stream time mismatch: %s != %s", sa.T, streamTime)
}

if sa.Bucket != "foo" {
t.Error("bucket name mismatch")
t.Error("bucket name mismatch, %s != %s", sa.Bucket, "foo")
}

if sa.SortValue != 123455 {
t.Error("Sort value mismatch")
}
}

func TestNewArchiveShard(t *testing.T) {
sa, err := NewStoreArchive("foo", "20150801/test_stream-store_test-123455.tri", nil)
sa, err := NewStoreArchive("foo", streamKeyPath, nil)
if err != nil {
t.Fatal("Error creating sa", err)
t.Fatalf("Error creating sa", err)
}

if sa.StreamName != "test_stream" {
t.Error("StreamName mismatch", sa.StreamName)
}

if sa.ClientName != "store_test" {
if sa.ClientName != "archive" {
t.Error("Should have a client name")
}

if sa.T != time.Date(2015, time.August, 1, 0, 0, 0, 0, time.UTC) {
t.Error("StreamName mismatch", sa.StreamName)
if !sa.T.Equal(streamTime) {
t.Errorf("StreamName mismatch %s != %s", sa.T, streamTime)
}

if sa.SortValue != 123455 {
t.Error("Sort value mismatch")
}
}

func TestReadEmpty(t *testing.T) {
sa, err := NewStoreArchive("foo", "20150801/test_stream-store_test-123455.tri", &nullS3Service{})
sa, err := NewStoreArchive("foo", streamKeyPath, &nullS3Service{})
if err != nil {
t.Fatal("Error creating sa", err)
}
Expand Down
6 changes: 6 additions & 0 deletions triton/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type KinesisService interface {
// S3Service is the subset of the S3 client API used by this package;
// declaring it as an interface lets tests substitute stub implementations.
type S3Service interface {
	GetObject(input *s3.GetObjectInput) (*s3.GetObjectOutput, error)
	ListObjects(*s3.ListObjectsInput) (*s3.ListObjectsOutput, error)
	ListObjectsPages(*s3.ListObjectsInput, func(*s3.ListObjectsOutput, bool) bool) error
}

type S3UploaderService interface {
Expand All @@ -45,3 +46,8 @@ func (s *nullS3Service) ListObjects(*s3.ListObjectsInput) (*s3.ListObjectsOutput
loo := &s3.ListObjectsOutput{}
return loo, nil
}

// ListObjectsPages satisfies S3Service by delivering a single empty page.
func (s *nullS3Service) ListObjectsPages(input *s3.ListObjectsInput, f func(*s3.ListObjectsOutput, bool) bool) error {
	var empty s3.ListObjectsOutput
	f(&empty, true)
	return nil
}
Loading