[SUPPORT] How to improve the speed of Flink writing to Hudi? #8071
Thanks for the feedback. I see that you use the hashing index with the MOR table type; does increasing the parallelism improve the write throughput here? How about we increase the |
Yes, we tried to increase the |
Which step is the bottleneck, bucket stream write? |
We are currently looking for the root cause; our current guess is the Hudi bucket write. When we use Flink's filesystem sink to write parquet files directly to COS storage, the writing efficiency is very high, and the source is the same as for the Hudi job. The source parallelism is the same as the sink parallelism, 24. |
Does that mean you have the filesystem source? |
Yes, my colleague wrote the code for the filesystem Flink job. |
Before we wrote about But we can't determine which step in Hudi takes a long time and causes this. |
The field SE/DE (serialization/deserialization) really takes time, especially for string-type fields with long strings. |
I understand. The 3,600 fields in our Hudi table are basically double types. For such a wide table, is there any optimization plan to improve the write throughput? |
Do you want to update existing records? If not, you can just set the operation type as |
No. We have tried insert mode, combined with both MOR and COW table formats, but the write throughput still cannot be improved. |
@DavidZ1 If you are running it under append-only mode, Snappy should be supported out of the box, but you will be trading storage space for write throughput. I would recommend LZ4 if you have the appropriate bundles included in your environment. |
Thx, we use the Flink JAR method to write to Hudi, but I can't find the relevant data-compression-format parameters in |
You are right that there are no Flink configs for it. What I did in the past was to configure it using: Reference: |
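For anyone finding this thread later: Hudi exposes the Parquet codec as a writer-side property rather than a Flink config, so it can be passed through the same options map given to the sink. A sketch under that assumption (the key name is Hudi's standard one; the codec value is just an example):

```properties
# Hudi writer-side Parquet codec; typical values: gzip, snappy, lz4, zstd
hoodie.parquet.compression.codec=snappy
```

Whether a given codec actually works still depends on the Hadoop/native bundles available in the runtime, as the LZ4 failure later in this thread shows.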
Thx. We switched the encoding format and started using LZ4 compression, but found that the platform does not support it; the exception is as follows: Then we switched to the Snappy format, and the writing performance did improve to a certain extent. However, because we use Tencent Cloud COS for storage, there was a LIST-frequency throttling problem during COW writes, so the overall performance could not be greatly improved; the exception is as follows:
|
Sorry for the late reply. Did you already use append mode, and is it still slow? |
Yeap, judging from the stack trace, he is running his job under append only mode.
This feels like a COS issue. @DavidZ1 you mentioned Perhaps you could try increasing this so that fewer parquet files are written? Do note that your parquet files will get larger, and I am not sure how this will affect your read performance either. Given that this is an append-only job, there should be no penalty on write performance. |
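Assuming the option being referred to is the Flink writer's target Parquet file size, a sketch of the relevant keys (names as in Hudi's Flink write options; the values are illustrative, not recommendations):

```properties
# Target max size of each Parquet file, in MB; larger files mean fewer files
# and fewer LIST calls against the object store
write.parquet.max.file.size=240
# In-memory buffer per write task, in MB, before flushing to storage
write.task.max.size=1024
```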
Yes, Tencent COS currently throttles file-list requests. Our business is streaming signal data transmitted from vehicles, so some data arrives late, which causes delayed writes and delayed reporting. We therefore added handling logic for late-arriving data: late records belonging to the current day are written to the corresponding partition, and records for earlier days are written to a separate late-data partition for the current day. I don't know whether this late-data handling increases the pressure on file processing and causes the COW write process to take more time? In addition, I use the bucket index with the bucket number set to 40, and I only keep the latest version, so there should be at most 80 files in one hour's partition, and the total number of files in one day's partitions is 1,920. In this case I shouldn't be creating too many small files. One more question: I see that by default the Hudi source code (https://github.com/apache/hudi/blob/release-0.13.0/hudi-common/src/main/java/org/apache/hudi/common/fs/StorageSchemes.java) says COS does not support append, but the COW format does support append mode, so can I simply understand that Hudi's MOR and COW append implementations are different? |
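The file-count arithmetic above can be sanity-checked as follows (this assumes, as stated, 40 buckets, one retained version, and at most one base file plus one log file per bucket in each hourly partition):

```python
num_buckets = 40
files_per_bucket = 2              # at most one base file + one log file (MOR)
hourly_partitions_per_day = 24

files_per_hour_partition = num_buckets * files_per_bucket
files_per_day = files_per_hour_partition * hourly_partitions_per_day

print(files_per_hour_partition)   # 80
print(files_per_day)              # 1920
```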
What version of Hudi did you use? Previously we resolved a bug for append-mode partial failover: https://github.com/apache/hudi/pull/7208/files. The symptom is that when a partial failover happens, the task hangs until the timeout fires. The bug was not fixed until release 0.12.2. |
We are using Hudi version 0.13.0. |
Is there any preceding error thrown before the |
I checked the task logs, but did not see any other abnormal information. |
What about your avg checkpoint end-to-end time? |
|
Your timeout threshold is 10 min, so that should be fine, but why does it take 5 min to checkpoint? Append mode only flushes new parquet files. |
Isn't it normal for a streaming job to take 5 minutes to checkpoint? Do you have any optimization suggestions for checkpointing? The bucket index I use theoretically only creates 50 files each time. I don't quite understand what you said about |
Thanks. For a COW table with the insert operation, Flink does not use any index, so the bucket index does not take effect there and the write throughput should be high. For UPSERTs with the bucket index on a COW table, yes, the performance is bad, because almost the whole table/partition is rewritten at each checkpoint. |
Thanks, we use |
Yes, we have adjusted the write mode of the table; for both MOR and COW tables we have tried insert/upsert modes, but the overall performance cannot be improved. |
Maybe you can refer to this doc: https://www.yuque.com/yuzhao-my9fz/kb/flqll8?#cJu7y |
Thx, I will read it later. |
We're having a similar issue with write performance. The Hudi stream_write task takes between 8 and 10 minutes for a MoR table, and between 9 and 11 minutes for a CoW table, to write 600K records. This is slow even when our S3 destination is completely empty and the corresponding Glue Catalog table doesn't exist. I'm including the details of our testing below. Test parameters:
CoW Testing
CoW results (sample):
CoW options being set on HoodiePipeline.Builder:
CoW DAG
MoR Testing
MoR results (sample):
MoR options being set on HoodiePipeline.Builder:
MoR DAG |
The throughput would be better if you used the bucket index instead. BTW, what kind of state backend did you use, and did you enable incremental checkpointing? |
Thanks for your response. Here are our properties for the state backend (from flink-conf on EMR); as you can see, incremental checkpointing is enabled:
Do you see any potential problems with these configs? In relation to using the bucket index, are these the correct properties to try? I will do some testing with the defaults, but if you have any suggestions for improving these, please let me know.
Thanks in advance. |
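For the record, switching the Flink writer to the bucket index boils down to two options (the key names are Hudi's Flink write options; the bucket count below is only an example and should be sized to the per-partition data volume):

```properties
# Switch from the default Flink state-backed index to the bucket index
index.type=BUCKET
# Number of buckets (file groups) per partition; fixed once the table is created
hoodie.bucket.index.num.buckets=16
```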
@danny0405 : to update, I did try the bucket index, and the performance is still in the 9-10 minute range. Here are the timings for the tasks:
Here are the Hudi properties submitted to the sink builder:
Also, the metadata on the parquet files written by Hudi still has hoodie_bloom_filter_type_code=DYNAMIC_V0 and org.apache.hudi.bloomfilter in it, like it's still using BLOOM index. Is this expected? |
Guess you are Chinese; here is a Chinese doc for the bucket index: https://www.yuque.com/yuzhao-my9fz/kb/flqll8?#g3rZq |
Thanks for the link, Danny. I don't speak Chinese, so I'm not sure what most of it says. I will try to apply auto-translate to that page. In the meantime, do you have anything similar that's in English? |
Sorry, let me illustrate the options here. Basically there are about 2 options that you need to take care of for the bucket index:
We also have a page for Flink in english on the hudi website: https://hudi.apache.org/docs/hoodie_deltastreamer#bucket-index |
Thank you, Danny. Yes, I got the best performance by using BUCKET index with non-nested, numeric, non-hive partitions. We were actually able to get it to perform in under 3 minutes for the stream_write task when we picked a partitioning field that resulted in fewer partitions (14 partitions) compared to our previous tests (> 90 partitions). It seems the number of partitions Hudi has to manage has a very large impact on performance. Would you be able to share any documents or blog posts that explain partitioning for performance further? I read through most of the documents under Concepts on the Hudi website (e.g. https://hudi.apache.org/docs/next/indexing), but didn't find a lot dealing in depth with partitioning strategies. |
For partitioning strategies, are you referring to the data buckets (FileGroups in Hudi's notion) or just regular directories like Hive's? If it is the former, here is the doc about Hudi file layouts: https://hudi.apache.org/docs/file_layouts. For a COW table, each update of an existing file group triggers a rewrite of the whole parquet file, so there is a somewhat large write amplification. For a MOR table, Hudi just appends the new records first, and the data compaction task is deferred to an async execution. |
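To make the COW write-amplification point concrete, here is a back-of-envelope sketch (all numbers are illustrative assumptions, not measurements from this thread):

```python
# If a checkpoint touches one record in each of N file groups of a COW table,
# the whole base file of each touched group is rewritten.
base_file_mb = 120            # assumed target Parquet base-file size
touched_file_groups = 40      # e.g. one record updated in every bucket
new_data_kb = 40              # the actual new data, ~1 KB per record

rewritten_mb = base_file_mb * touched_file_groups
# bytes rewritten per byte of genuinely new data
amplification = rewritten_mb * 1024 // new_data_kb

print(rewritten_mb)      # 4800
print(amplification)     # 122880
```

This is why MOR, which appends the new records to log files and defers the rewrite to async compaction, tends to give better upsert write throughput.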
@danny0405 |
|
Tips before filing an issue
Have you gone through our FAQs?
Join the mailing list to engage in conversations and get faster support at [email protected].
If you have triaged this as a bug, then file an issue directly.
Describe the problem you faced
We currently have a vehicle-cloud business that consumes data in real time through Flink tasks and writes it into Hudi. The source is Kafka, and the JSON messages in Kafka are parsed. The Hudi table has about 3,600 fields, 90% of which are of double type. However, our tests found that Flink writes to Hudi relatively slowly and cannot keep up with the speed of Kafka message production. We can't find the reason at present.
A clear and concise description of the problem.
To Reproduce
Steps to reproduce the behavior:
Expected behavior
A clear and concise description of what you expected to happen.
Environment Description
Hudi version: 0.12.2 and 0.13.0
Spark version: 3.2.2
Hive version: 3.2.1
Hadoop version: 3.2.2
Storage (HDFS/S3/GCS..): COS (Tencent Cloud)
Running on Docker? (yes/no): yes
Additional context
1.Hudi config
checkpoint.interval=300
checkpoint.timeout=600
compaction.max_memory=1024
payload.class.name=org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload
compaction.delta_commits=20
compaction.trigger.strategy=num_or_time
compaction.delta_seconds=3600
clean.policy=KEEP_LATEST_COMMITS
clean.retain_commits=2
hoodie.bucket.index.num.buckets=40
archive.max_commits=50
archive.min_commits=40
table.type=MERGE_ON_READ
hoodie.datasource.write.hive_style_partitioning=true
index.type=BUCKET
write.operation=upsert
compaction.schedule.enabled=true
compaction.async.enabled=true
2.hoodie.properties
hoodie.table.precombine.field=acquire_timestamp
hoodie.datasource.write.drop.partition.columns=false
hoodie.table.partition.fields=pt,ht
hoodie.table.type=MERGE_ON_READ
hoodie.archivelog.folder=archived
hoodie.table.cdc.enabled=false
hoodie.compaction.payload.class=org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload
hoodie.table.version=5
hoodie.timeline.layout.version=1
hoodie.table.recordkey.fields=vin,acquire_timestamp
hoodie.datasource.write.partitionpath.urlencode=false
hoodie.table.name=ods_icv_can_hudi_temp
hoodie.table.keygenerator.class=org.apache.hudi.keygen.ComplexAvroKeyGenerator
hoodie.compaction.record.merger.strategy=eeb8d96f-b1e4-49fd-bbf8-28ac514178e5
hoodie.datasource.write.hive_style_partitioning=true
3.Kafka
24 partitions, 200 GB of messages per hour; each message is in JSON format, from which Flink extracts about 3,600 signal fields (double type).
4.Flink
We used either 20 tasks (2 cores and 8 GB memory each) or 48 tasks (1 core and 4 GB memory each) for the Flink job. After running for an hour, we found that consumption could not keep up with message production.
We use Tencent Cloud's streaming computing platform Oceanus: 1 compute CU includes 1 CPU core and 4 GB memory. Depending on the upstream/downstream systems and the processing logic, the processing capacity of 1 CU is about 5,000 to 50,000 records/second. Simple jobs process about 30,000 to 50,000 records/second/core, and complex jobs about 5,000 to 10,000 records/second/core.
When writing to COS, many small files are produced at the same time; the count can reach 4,000+.
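A back-of-envelope check using the Oceanus numbers quoted above (assumptions: the 48 one-core-task configuration and the "complex service" range of 5,000 to 10,000 records/second/core):

```python
cores = 48
complex_low, complex_high = 5_000, 10_000   # records/second/core, quoted range

floor = cores * complex_low      # lower bound on expected job throughput
ceiling = cores * complex_high   # upper bound

print(floor, ceiling)   # 240000 480000
```

So if the Kafka production rate exceeds roughly 240K records/second, the job can fall behind even before any Hudi-specific overhead (wide-row SE/DE, COS LIST throttling) is accounted for.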
Add any other context about the problem here.
Stacktrace
Add the stacktrace of the error.