Skip to content

Commit

Permalink
Merge pull request #2 from discord/androo/fixes
Browse files Browse the repository at this point in the history
some more fixes
  • Loading branch information
AndrooTheChen authored Sep 30, 2024
2 parents 05e939c + 44948bd commit e967d63
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 33 deletions.
1 change: 1 addition & 0 deletions build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ fn main() {
"proto/vector/dd_trace.proto",
"proto/third-party/google/cloud/bigquery/storage/v1/storage.proto",
"proto/third-party/google/pubsub/v1/pubsub.proto",
"proto/third-party/google/rpc/code.proto",
"proto/third-party/google/rpc/status.proto",
"proto/vector/vector.proto",
],
Expand Down
8 changes: 4 additions & 4 deletions src/sinks/gcp/bigquery/config.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use codecs::encoding::ProtobufSerializerConfig;
use vector_lib::codecs::encoding::ProtobufSerializerConfig;
use futures::FutureExt;
use http::Uri;
use indoc::indoc;
use tonic::transport::Channel;
use vector_config::configurable_component;
use vector_lib::configurable::configurable_component;

use super::proto::google::cloud::bigquery::storage::v1 as proto;
use super::proto::third_party::google::cloud::bigquery::storage::v1 as proto;
use super::request_builder::{BigqueryRequestBuilder, MAX_BATCH_PAYLOAD_SIZE};
use super::service::{AuthInterceptor, BigqueryService};
use super::sink::BigquerySink;
Expand Down Expand Up @@ -75,7 +75,7 @@ pub struct BigqueryConfig {
#[serde(
default,
deserialize_with = "crate::serde::bool_or_struct",
skip_serializing_if = "crate::serde::skip_serializing_if_default"
skip_serializing_if = "crate::serde::is_default"
)]
acknowledgements: AcknowledgementsConfig,
}
Expand Down
20 changes: 11 additions & 9 deletions src/sinks/gcp/bigquery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,20 @@ mod sink;

#[allow(warnings, clippy::pedantic, clippy::nursery)]
pub(crate) mod proto {
pub(crate) mod google {
pub(crate) mod cloud {
pub(crate) mod bigquery {
pub(crate) mod storage {
pub(crate) mod v1 {
tonic::include_proto!("google.cloud.bigquery.storage.v1");
pub(crate) mod third_party {
pub(crate) mod google {
pub(crate) mod cloud {
pub(crate) mod bigquery {
pub(crate) mod storage {
pub(crate) mod v1 {
tonic::include_proto!("google.cloud.bigquery.storage.v1");
}
}
}
}
}
pub(crate) mod rpc {
tonic::include_proto!("google.rpc");
pub(crate) mod rpc {
tonic::include_proto!("google.rpc");
}
}
}
}
Expand Down
14 changes: 7 additions & 7 deletions src/sinks/gcp/bigquery/request_builder.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use bytes::BytesMut;
use codecs::encoding::ProtobufSerializer;
use vector_lib::codecs::encoding::ProtobufSerializer;
use prost::Message;
use std::num::NonZeroUsize;
use tokio_util::codec::Encoder;
use vector_common::request_metadata::RequestMetadata;
use vector_core::event::Finalizable;
use vector_lib::request_metadata::RequestMetadata;
use vector_lib::event::Finalizable;

use super::proto::google::cloud::bigquery::storage::v1 as proto;
use super::proto::third_party::google::cloud::bigquery::storage::v1 as proto;
use super::service::BigqueryRequest;
use crate::event::{Event, EventFinalizers};
use crate::sinks::util::metadata::RequestMetadataBuilder;
Expand All @@ -22,8 +22,8 @@ pub enum BigqueryRequestBuilderError {
ProtobufEncoding { message: String }, // `error` needs to be some concrete type
}

impl From<vector_common::Error> for BigqueryRequestBuilderError {
fn from(error: vector_common::Error) -> Self {
impl From<vector_lib::Error> for BigqueryRequestBuilderError {
fn from(error: vector_lib::Error) -> Self {
BigqueryRequestBuilderError::ProtobufEncoding {
message: format!("{:?}", error),
}
Expand Down Expand Up @@ -142,7 +142,7 @@ mod test {
use codecs::encoding::{ProtobufSerializerConfig, ProtobufSerializerOptions};

Check failure on line 142 in src/sinks/gcp/bigquery/request_builder.rs

View workflow job for this annotation

GitHub Actions / check-component-features

failed to resolve: use of undeclared crate or module `codecs`
use std::collections::BTreeMap;
use std::path::PathBuf;
use vector_core::event::{Event, EventMetadata, LogEvent, Value};
use vector_lib::event::{Event, EventMetadata, LogEvent, Value};

use super::BigqueryRequestBuilder;
use crate::sinks::util::IncrementalRequestBuilder;
Expand Down
18 changes: 9 additions & 9 deletions src/sinks/gcp/bigquery/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ use tonic::service::Interceptor;
use tonic::transport::Channel;
use tonic::{Request, Status};
use tower::Service;
use vector_common::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata};
use vector_core::event::EventStatus;
use vector_core::stream::DriverResponse;
use vector_lib::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata};
use vector_lib::event::EventStatus;
use vector_lib::stream::DriverResponse;

use super::proto::google::cloud::bigquery::storage::v1 as proto;
use super::proto::third_party::google::cloud::bigquery::storage::v1 as proto;
use crate::event::{EventFinalizers, Finalizable};
use crate::gcp::GcpAuthenticator;

Expand Down Expand Up @@ -75,13 +75,13 @@ impl DriverResponse for BigqueryResponse {
None => EventStatus::Dropped,
Some(proto::append_rows_response::Response::AppendResult(_)) => EventStatus::Delivered,
Some(proto::append_rows_response::Response::Error(status)) => {
match super::proto::google::rpc::Code::try_from(status.code) {
match super::proto::third_party::google::rpc::Code::try_from(status.code) {
// we really shouldn't be able to get here, but just in case
Ok(super::proto::google::rpc::Code::Ok) => EventStatus::Delivered,
Ok(super::proto::third_party::google::rpc::Code::Ok) => EventStatus::Delivered,
// these errors can't be retried because the event payload is almost definitely bad
Ok(super::proto::google::rpc::Code::InvalidArgument)
| Ok(super::proto::google::rpc::Code::NotFound)
| Ok(super::proto::google::rpc::Code::AlreadyExists) => EventStatus::Rejected,
Ok(super::proto::third_party::google::rpc::Code::InvalidArgument)
| Ok(super::proto::third_party::google::rpc::Code::NotFound)
| Ok(super::proto::third_party::google::rpc::Code::AlreadyExists) => EventStatus::Rejected,
// everything else can probably be retried
_ => EventStatus::Errored,
}
Expand Down
6 changes: 3 additions & 3 deletions src/sinks/gcp/bigquery/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ use futures_util::{
stream::{self, BoxStream},
StreamExt,
};
use vector_core::event::Event;
use vector_core::sink::StreamSink;
use vector_core::stream::BatcherSettings;
use vector_lib::event::Event;
use vector_lib::sink::StreamSink;
use vector_lib::stream::BatcherSettings;

use super::request_builder::BigqueryRequestBuilder;
use super::service::BigqueryService;
Expand Down
1 change: 0 additions & 1 deletion src/sinks/gcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use serde::{Deserialize, Serialize};
use vector_lib::configurable::configurable_component;

pub mod bigquery;
pub mod chronicle_unstructured;
pub mod cloud_storage;
pub mod pubsub;
pub mod stackdriver;
Expand Down

0 comments on commit e967d63

Please sign in to comment.