Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds support for reading the schema from the Avro file. #100

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using Microsoft.Analytics.UnitTest;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;
Expand Down Expand Up @@ -73,6 +74,15 @@ public IRow SingleColumnRowGenerator<T>()
return new USqlRow(schema, null);
}

/// <summary>
/// Builds a row whose schema has two columns: "Value" of type
/// <typeparamref name="T"/> and "Value2" of type <typeparamref name="T2"/>.
/// </summary>
/// <typeparam name="T">Type of the "Value" column.</typeparam>
/// <typeparam name="T2">Type of the "Value2" column.</typeparam>
/// <returns>A new <c>USqlRow</c> over the two-column schema, constructed with a null second argument.</returns>
public IRow DualColumnRowGenerator<T, T2>()
{
    var twoColumns = new List<IColumn>
    {
        new USqlColumn<T>("Value"),
        new USqlColumn<T2>("Value2"),
    };

    return new USqlRow(new USqlSchema(twoColumns), null);
}

[TestMethod]
public void AvroExtractor_DatatypeInt_Extracted()
{
Expand All @@ -89,6 +99,25 @@ public void AvroExtractor_DatatypeInt_Extracted()
Assert.IsTrue(result[1].Get<int>("Value") == 0);
}


/// <summary>
/// Serializes two int records under a schema that declares both "Value2" and
/// "Value", extracts them into a two-column row with the schema-from-file flag
/// turned on, and checks both columns of both extracted rows.
/// </summary>
[TestMethod]
public void AvroExtractor_DatatypeInt_Extracted_Using_Internal_Schema_Flag()
{
    // Arrange: the schema declares Value2 ahead of Value.
    var avroSchemaJson = @"{""type"":""record"",""name"":""SingleColumnPoco"",""fields"":[{""name"":""Value2"",""type"":""int""},{""name"":""Value"",""type"": ""int"",""default"":""0"" }]}";
    var pocos = new List<SingleColumnPoco<int>>
    {
        new SingleColumnPoco<int> { Value = 1 },
        new SingleColumnPoco<int> { Value = 0 },
    };

    // Act: the third argument enables reading the schema embedded in the Avro data.
    var rows = ExecuteExtract<int, int>(pocos, avroSchemaJson, true);

    // Assert: "Value" round-trips; "Value2" is expected to be 0 on every row.
    Assert.IsTrue(rows[0].Get<int>("Value") == 1);
    Assert.IsTrue(rows[0].Get<int>("Value2") == 0);
    Assert.IsTrue(rows[1].Get<int>("Value") == 0);
    Assert.IsTrue(rows[1].Get<int>("Value2") == 0);
}

[TestMethod]
public void AvroExtractor_DatatypeNullableInt_Extracted()
{
Expand Down Expand Up @@ -316,7 +345,7 @@ public void AvroExtractor_EmptyFile_ReturnNoRow()
Assert.IsTrue(result.Count == 0);
}

private IList<IRow> ExecuteExtract<T>(List<SingleColumnPoco<T>> data, string schema)
private IList<IRow> ExecuteExtract<T>(List<SingleColumnPoco<T>> data, string schema, bool autoSchemaExtract = false)
{
var output = SingleColumnRowGenerator<T>().AsUpdatable();

Expand All @@ -325,24 +354,48 @@ private IList<IRow> ExecuteExtract<T>(List<SingleColumnPoco<T>> data, string sch
serializeAvro(dataStream, data, schema);

var reader = new USqlStreamReader(dataStream);
var extractor = new AvroExtractor(schema);
var extractor = new AvroExtractor(schema, autoSchemaExtract);
return extractor.Extract(reader, output).ToList();
}
}

/// <summary>
/// Serializes <paramref name="data"/> to an in-memory Avro stream and runs
/// <c>AvroExtractor</c> over it, writing into a two-column output row.
/// </summary>
/// <typeparam name="T">Type of the "Value" column and of the source records.</typeparam>
/// <typeparam name="T2">Type of the "Value2" column.</typeparam>
/// <param name="data">Records to serialize before extraction.</param>
/// <param name="schema">Avro schema JSON used for serialization and passed to the extractor.</param>
/// <param name="autoSchemaExtract">Forwarded as the extractor's second constructor argument.</param>
/// <returns>The extracted rows, materialized into a list.</returns>
private IList<IRow> ExecuteExtract<T,T2>(List<SingleColumnPoco<T>> data, string schema, bool autoSchemaExtract = false)
{
    using (var avroStream = new MemoryStream())
    {
        serializeAvro(avroStream, data, schema);

        var targetRow = DualColumnRowGenerator<T, T2>().AsUpdatable();
        var avroExtractor = new AvroExtractor(schema, autoSchemaExtract);
        var extractedRows = avroExtractor.Extract(new USqlStreamReader(avroStream), targetRow);

        // Materialize before the stream is disposed; Extract is a lazy iterator.
        return extractedRows.ToList();
    }
}

private void serializeAvro<T>(MemoryStream dataStream, List<SingleColumnPoco<T>> data, string schema)
{
var avroSchema = Schema.Parse(schema);
var recordSchema = avroSchema as RecordSchema;

Debug.Assert(recordSchema != null, "recordSchema != null");

var writer = new GenericWriter<GenericRecord>(avroSchema);
var fileWriter = DataFileWriter<GenericRecord>.OpenWriter(writer, dataStream);
var encoder = new BinaryEncoder(dataStream);

foreach (SingleColumnPoco<T> record in data)
{
var genericRecord = new GenericRecord(avroSchema as RecordSchema);
var genericRecord = new GenericRecord(recordSchema);

genericRecord.Add("Value", record.Value);

// some tests use value2 field
if (recordSchema.Fields.Exists(x => x.Name == "Value2"))
{
genericRecord.Add("Value2", 0);
}

fileWriter.Append(genericRecord);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,47 @@ namespace Microsoft.Analytics.Samples.Formats.ApacheAvro
public class AvroExtractor : IExtractor
{
private string avroSchema;
private bool mapToInternalSchema;

public AvroExtractor(string avroSchema)
public AvroExtractor(string avroSchema, bool mapToInternalSchema = false)
{
this.avroSchema = avroSchema;
this.mapToInternalSchema = mapToInternalSchema;
}

public override IEnumerable<IRow> Extract(IUnstructuredReader input, IUpdatableRow output)
{
var avschema = Avro.Schema.Parse(avroSchema);
var reader = new GenericDatumReader<GenericRecord>(avschema, avschema);
Avro.Schema avschema = null;

if (!string.IsNullOrWhiteSpace(avroSchema))
{
avschema = Avro.Schema.Parse(avroSchema);
}

IFileReader<GenericRecord> fileReader = null;

using (var ms = new MemoryStream())
{
CreateSeekableStream(input, ms);
ms.Position = 0;

var foundSchema = false;

var fileReader = DataFileReader<GenericRecord>.OpenReader(ms, avschema);
if (mapToInternalSchema)
{
fileReader = DataFileReader<GenericRecord>.OpenReader(ms);
var schema = fileReader.GetSchema();

foundSchema = schema != null;
}

while (fileReader.HasNext())
if (!foundSchema)
{
ms.Position = 0;
fileReader = DataFileReader<GenericRecord>.OpenReader(ms, avschema);
}

while (fileReader?.HasNext() == true)
{
var avroRecord = fileReader.Next();

Expand All @@ -57,9 +79,9 @@ public override IEnumerable<IRow> Extract(IUnstructuredReader input, IUpdatableR
{
output.Set<object>(column.Name, null);
}

yield return output.AsReadOnly();
}

yield return output.AsReadOnly();
}
}
}
Expand Down