September 27, 2023

This data warehousing use case is about scale. The user is China Unicom, one of the world's largest telecommunication service providers. Using Apache Doris, they deploy multiple petabyte-scale clusters on dozens of machines to support the 15 billion log records added daily from their over 30 business lines. Such a gigantic log analysis system is part of their cybersecurity management. For the needs of real-time monitoring, threat tracing, and alerting, they require a log analytic system that can automatically collect, store, analyze, and visualize logs and event records.

From an architectural perspective, the system should be able to undertake real-time analysis of logs in various formats and, of course, be scalable to support the huge and ever-growing data size. The rest of this article is about what their log processing architecture looks like and how they realize stable data ingestion, low-cost storage, and quick queries with it.

System Architecture

This is an overview of their data pipeline. The logs are collected into the data warehouse and go through several layers of processing.

  • ODS: Original logs and alerts from all sources are gathered into Apache Kafka. Meanwhile, a copy of them is stored in HDFS for data verification or replay.
  • DWD: This is where the fact tables are. Apache Flink cleans, standardizes, backfills, and de-identifies the data, and writes it back to Kafka. These fact tables are also put into Apache Doris, so that Doris can trace a certain item or use them for dashboarding and reporting. As logs are not averse to duplication, the fact tables are arranged in the Duplicate Key model of Apache Doris.
  • DWS: This layer aggregates data from DWD and lays the foundation for queries and analysis.
  • ADS: In this layer, Apache Doris auto-aggregates data with its Aggregate Key model and auto-updates data with its Unique Key model. (A minimal DDL sketch of these three data models follows this list.)
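For reference, the three Doris data models mentioned above differ mainly in the key clause of the table DDL. Below is a minimal, hypothetical sketch; all table and column names are made up for illustration and are not the user's actual schema:

```sql
-- Duplicate Key model (DWD): rows with identical keys are all kept,
-- which suits append-only logs that tolerate duplication.
CREATE TABLE dwd_log (
    log_time  DATETIME,
    source_id BIGINT,
    message   VARCHAR(1024)
)
DUPLICATE KEY(log_time, source_id)
DISTRIBUTED BY HASH(source_id) BUCKETS 10;

-- Aggregate Key model (ADS): rows with the same key are auto-aggregated
-- on write, e.g. by summing a counter column.
CREATE TABLE ads_event_stats (
    event_date DATE,
    source_id  BIGINT,
    event_cnt  BIGINT SUM DEFAULT "0"
)
AGGREGATE KEY(event_date, source_id)
DISTRIBUTED BY HASH(source_id) BUCKETS 10;

-- Unique Key model (ADS): a newly written row with the same key
-- replaces the old one, which realizes auto-update.
CREATE TABLE ads_alert_status (
    alert_id    BIGINT,
    status      VARCHAR(32),
    update_time DATETIME
)
UNIQUE KEY(alert_id)
DISTRIBUTED BY HASH(alert_id) BUCKETS 10;
```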

Architecture 2.0 evolves from Architecture 1.0, which was supported by ClickHouse and Apache Hive. The transition arose from the user's needs for real-time data processing and multi-table join queries. In their experience with ClickHouse, they found inadequate support for concurrency and multi-table joins, manifested by frequent timeouts in dashboarding and OOM errors in distributed joins.

Now let's take a look at their practice in data ingestion, storage, and queries with Architecture 2.0.

Real-Case Practice

Stable Ingestion of 15 Billion Logs Per Day

In the user's case, their business churns out 15 billion logs every day, and ingesting such a data volume quickly and stably is a real challenge. With Apache Doris, the recommended way is to use the Flink-Doris-Connector. It is developed by the Apache Doris community for large-scale data writing. The component requires simple configuration. It implements Stream Load and can reach a writing speed of 200,000~300,000 logs per second without interrupting the data analytic workloads.
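As a reference, below is a minimal Flink SQL sketch of such a pipeline. The table names, fields, and connection parameters are placeholders, and the connector options follow the public Flink-Doris-Connector documentation rather than the user's actual configuration:

```sql
-- Hypothetical Kafka source carrying the raw logs.
CREATE TABLE kafka_logs (
    log_time  TIMESTAMP(3),
    source_id BIGINT,
    message   STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'ods_logs',
    'properties.bootstrap.servers' = 'kafka:9092',
    'properties.group.id' = 'doris_ingest',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);

-- Doris sink via the Flink-Doris-Connector, which batches rows and
-- writes them to the backends with Stream Load under the hood.
CREATE TABLE doris_logs (
    log_time  TIMESTAMP(3),
    source_id BIGINT,
    message   STRING
) WITH (
    'connector' = 'doris',
    'fenodes' = 'doris-fe:8030',
    'table.identifier' = 'log_db.dwd_log',
    'username' = 'admin',
    'password' = '',
    'sink.label-prefix' = 'log_ingest'
);

INSERT INTO doris_logs
SELECT log_time, source_id, message FROM kafka_logs;
```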

A lesson learned is that when using Flink for high-frequency writing, you need to find the right parameter configuration for your case to avoid data version accumulation. In this case, the user made the following optimizations:

  • Flink Checkpoint: They increased the checkpoint interval from 15s to 60s to reduce the writing frequency and the number of transactions processed by Doris per unit of time. This relieves data writing pressure and avoids generating too many data versions. (See the configuration sketch after this list.)
  • Data Pre-Aggregation: For data of the same ID but coming from various tables, Flink pre-aggregates it based on the primary key ID and creates a flat table, in order to avoid excessive resource consumption caused by multi-source data writing.
  • Doris Compaction: The trick here includes finding the right Doris backend (BE) parameters to allocate the right amount of CPU resources for data compaction, setting an appropriate number of data partitions, buckets, and replicas (too many data tablets bring huge overheads), and dialing up max_tablet_version_num to avoid version accumulation.
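To make the first and third points concrete: the checkpoint interval can be set in the Flink SQL client (or in flink-conf.yaml), while max_tablet_version_num is a Doris BE configuration. The values below are the ones named above or placeholders, not universal recommendations:

```sql
-- Flink SQL client: lengthen the checkpoint interval so Doris receives
-- fewer, larger transactions per unit of time (the user moved from 15s
-- to 60s).
SET 'execution.checkpointing.interval' = '60s';

-- The BE-side knob is not SQL; it goes into be.conf on each backend:
--   max_tablet_version_num = 2000
-- (2000 is only a placeholder; the right value is workload-dependent.)
```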

These measures together ensure daily ingestion stability. The user has witnessed stable performance and a low compaction score in the Doris backends. In addition, the combination of data pre-processing in Flink and the Unique Key model in Doris can ensure quicker data updates.

Storage Strategies to Reduce Costs by 50%

The size and generation rate of logs also impose pressure on storage. Among the immense log data, only a part is of high informational value, so storage should be differentiated. The user has three storage strategies to reduce costs.

  • ZSTD (ZStandard) compression algorithm: For tables larger than 1TB, specify the compression method as "ZSTD" upon table creation; it can realize a compression ratio of 10:1. (A combined DDL sketch follows this list.)
  • Tiered storage of hot and cold data: This is supported by a new feature of Doris. The user sets a data "cooldown" period of 7 days. That means data from the past 7 days (namely, hot data) is stored on SSD. As hot data "cools down" (gets older than 7 days), it is automatically moved to HDD, which is less expensive. As data gets even "colder," it is moved to object storage for much lower storage costs. Plus, in object storage, data can be stored with only one copy instead of three. This further cuts down costs and the overheads brought by redundant storage.
  • Differentiated replica numbers for different data partitions: The user has partitioned their data by time range. The principle is to have more replicas for newer data partitions and fewer for the older ones. In their case, data from the past 3 months is frequently accessed, so they have 3 replicas for this partition. Data that is 3~6 months old has 2 replicas, and data from more than 6 months ago has one single copy.
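The first and third strategies can be expressed directly in table DDL. Below is a hypothetical sketch (table, column, and partition names are made up); the SSD-to-HDD tiering in the second strategy is shown here via the hot-partition setting, while the move to object storage is configured separately through Doris storage policies:

```sql
CREATE TABLE dwd_log (
    log_time  DATETIME NOT NULL,
    source_id BIGINT,
    message   VARCHAR(1024)
)
DUPLICATE KEY(log_time, source_id)
PARTITION BY RANGE(log_time) ()
DISTRIBUTED BY HASH(source_id) BUCKETS 10
PROPERTIES (
    -- strategy 1: ZSTD compression for the whole table
    "compression" = "zstd",
    "dynamic_partition.enable" = "true",
    "dynamic_partition.time_unit" = "DAY",
    "dynamic_partition.prefix" = "p",
    "dynamic_partition.end" = "3",
    -- strategy 2: keep the newest 7 daily partitions on SSD;
    -- older partitions fall back to HDD automatically
    "dynamic_partition.hot_partition_num" = "7",
    "replication_num" = "3"
);

-- strategy 3: dial down the replica number for an old partition
ALTER TABLE dwd_log MODIFY PARTITION p20230301
SET ("replication_num" = "2");
```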

With these three strategies, the user has reduced their storage costs by 50%.

Differentiated Query Strategies Based on Data Size

Some logs must be immediately traced and located, such as those of abnormal events or failures. To ensure real-time response to these queries, the user applies different query strategies for different data sizes:

  • Less than 100G: The user uses the dynamic partitioning feature of Doris. Small tables are partitioned by date and large tables by hour, which avoids data skew. To further ensure a balanced distribution of data within a partition, they use the snowflake ID as the bucketing field. They also set a starting offset of 20 days, meaning only data from the most recent 20 days is kept. In this way, they find the balance point between data backlog and analytic needs. (See the DDL sketch after this list.)
  • 100G~1T: These tables have their materialized views, which are pre-computed result sets stored in Doris. Thus, queries on these tables are much faster and less resource-consuming. The DDL syntax of materialized views in Doris is the same as that in PostgreSQL and Oracle.
  • More than 100T: These tables are put into the Aggregate Key model of Apache Doris and pre-aggregated. In this way, queries over 2 billion log records can be completed in 1~2s.
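As an illustration of the first two strategies, here is a hedged sketch in Doris SQL (names and numbers are placeholders; the Aggregate Key model used for the largest tables is sketched in the System Architecture section above):

```sql
-- Small table: dynamic partitioning by date, bucketed by the snowflake
-- ID, keeping only the most recent 20 days (start offset of -20).
CREATE TABLE small_log (
    snowflake_id BIGINT,
    log_date     DATE NOT NULL,
    detail       VARCHAR(1024)
)
DUPLICATE KEY(snowflake_id, log_date)
PARTITION BY RANGE(log_date) ()
DISTRIBUTED BY HASH(snowflake_id) BUCKETS 10
PROPERTIES (
    "dynamic_partition.enable" = "true",
    "dynamic_partition.time_unit" = "DAY",
    "dynamic_partition.prefix" = "p",
    "dynamic_partition.start" = "-20",
    "dynamic_partition.end" = "3"
);

-- For the 100G~1T tier: a materialized view pre-computes a common
-- aggregation, so queries hit the smaller pre-computed result set.
CREATE MATERIALIZED VIEW log_cnt_by_day AS
SELECT log_date, COUNT(snowflake_id)
FROM small_log
GROUP BY log_date;
```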

These strategies have shortened the response time of queries. For example, a query for a specific data item used to take minutes, but now it can be finished in milliseconds. In addition, for big tables that contain 10 billion data records, queries on different dimensions can all be completed in a few seconds.

Ongoing Plans

The user is now testing the newly added inverted index in Apache Doris. It is designed to speed up full-text search of strings as well as equivalence and range queries of numerics and datetime.

They have also provided valuable feedback about the auto-bucketing logic in Doris. Currently, Doris decides the number of buckets for a partition based on the data size of the previous partition. The problem for the user is that most of their new data comes in during the daytime, but little at night. In their case, Doris thus creates too many buckets for nighttime data but too few for daytime data, which is the opposite of what they need. They hope for a new auto-bucketing logic where the reference for deciding the number of buckets is the data size and distribution of the previous day. They have brought this to the Apache Doris community, and we are now working on this optimization.
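As a side note on the inverted index mentioned above, this is roughly what it looks like in Doris SQL (2.0-style syntax; the table, column, and query are hypothetical, not the user's actual workload):

```sql
CREATE TABLE search_log (
    log_time DATETIME,
    message  VARCHAR(2048),
    -- inverted index with an English parser for full-text search
    INDEX idx_msg (message) USING INVERTED PROPERTIES ("parser" = "english")
)
DUPLICATE KEY(log_time)
DISTRIBUTED BY HASH(log_time) BUCKETS 10;

-- A full-text search the index accelerates: match rows whose message
-- contains any of the given tokens.
SELECT * FROM search_log
WHERE message MATCH_ANY 'timeout error';
```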