Skip to content

Commit

Permalink
fix: fix the example module
Browse files Browse the repository at this point in the history
  • Loading branch information
yuyang733 committed Mar 20, 2022
1 parent 955fd12 commit 11263af
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 6 deletions.
14 changes: 10 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,16 @@ fs.cosn.bucket.endpoint_suffix: cos.ap-guangzhou.myqcloud.com

```java
...
StreamingFileSink<String> fileSink = StreamingFileSink.forRowFormat(
new Path("cosn://flink-test-1250000000/sink-test"),
new SimpleStringEncoder<String>("UTF-8"))
.build();
StreamingFileSink<String> streamingFileSink =
StreamingFileSink.forRowFormat(
new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.SECONDS.toMillis(5))
.withInactivityInterval(TimeUnit.SECONDS.toMillis(5))
.withMaxPartSize(1024)
.build())
.build();
...
```

Expand Down
1 change: 1 addition & 0 deletions flink-cos-fs-examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

/** Flink StreamingFileSink Test. */
public class StreamingFileSinkTest {
private static final Logger LOG = LoggerFactory.getLogger(StreamingFileSinkTest.class);
Expand All @@ -29,13 +31,16 @@ public static void main(String[] args) throws Exception {
return;
}
String outputPath = parameterTool.getRequired("output");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamingFileSink<String> streamingFileSink =
StreamingFileSink.forRowFormat(
new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.create().withRolloverInterval(1000).build())
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.SECONDS.toMillis(5))
.withInactivityInterval(TimeUnit.SECONDS.toMillis(5))
.withMaxPartSize(1024)
.build())
.build();
if (parameterTool.get("input") == null) {
// Use the mockSource to generate the test data
Expand Down

0 comments on commit 11263af

Please sign in to comment.