HDFS
This engine provides integration with the Apache Hadoop ecosystem by letting you manage data on HDFS from ClickHouse. It is similar to the File and URL engines, but provides Hadoop-specific features.
This feature is not supported by ClickHouse engineers, and it is known to be of sketchy quality. If you run into any problems, fix them yourself and submit a pull request.
Usage
ENGINE = HDFS(URI, format)
Engine Parameters
- URI - whole file URI in HDFS. The path part of URI may contain globs. In this case the table would be readonly.
- format - specifies one of the available file formats. To perform SELECT queries, the format must be supported for input, and to perform INSERT queries, for output. The available formats are listed in the Formats section.
- [PARTITION BY expr]
PARTITION BY
PARTITION BY - Optional. In most cases you don't need a partition key, and if it is needed you generally don't need a partition key more granular than by month. Partitioning does not speed up queries (in contrast to the ORDER BY expression). You should never use too granular partitioning. Don't partition your data by client identifiers or names (instead, make client identifier or name the first column in the ORDER BY expression).
For partitioning by month, use the toYYYYMM(date_column) expression, where date_column is a column with a date of the type Date. The partition names here have the "YYYYMM" format.
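The following is a hedged sketch of a partitioned HDFS table, not a verified recipe. The table name, column names, and path are illustrative assumptions. It also assumes that the engine accepts a {_partition_id} placeholder in the path for partitioned inserts, as the ClickHouse S3 engine does; check this against your ClickHouse version before relying on it.

```sql
-- Hypothetical table: one output file per month of event_date.
-- {_partition_id} in the path is assumed to be replaced by the partition name.
CREATE TABLE hdfs_partitioned_table (event_date Date, name String, value UInt32)
ENGINE = HDFS('hdfs://hdfs1:9000/partitioned/data_{_partition_id}.tsv', 'TSV')
PARTITION BY toYYYYMM(event_date);

-- Rows from two different months should end up in two different files.
INSERT INTO hdfs_partitioned_table VALUES
    ('2024-01-15', 'one', 1),
    ('2024-02-03', 'two', 2);
```

Under these assumptions, the partition names would be 202401 and 202402, following the "YYYYMM" format described above.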
Example:
1. Set up the hdfs_engine_table table:
CREATE TABLE hdfs_engine_table (name String, value UInt32) ENGINE=HDFS('hdfs://hdfs1:9000/other_storage', 'TSV')
2. Fill the file:
INSERT INTO hdfs_engine_table VALUES ('one', 1), ('two', 2), ('three', 3)
3. Query the data:
SELECT * FROM hdfs_engine_table LIMIT 2
┌─name─┬─value─┐
│ one  │     1 │
│ two  │     2 │
└──────┴───────┘
Implementation Details
Reads and writes can be parallel.
Not supported:
- ALTER and SELECT...SAMPLE operations.
- Indexes.
- Zero-copy replication is possible, but not recommended.
Zero-copy replication is not ready for production
Zero-copy replication is disabled by default in ClickHouse version 22.8 and higher. This feature is not recommended for production use.
Globs in path
Multiple path components can have globs. To be processed, a file must exist and match the whole path pattern. The list of files is determined at SELECT time (not at CREATE time).
- * - Substitutes any number of any characters except /, including the empty string.
- ? - Substitutes any single character.
- {some_string,another_string,yet_another_one} - Substitutes any of the strings 'some_string', 'another_string', 'yet_another_one'.
- {N..M} - Substitutes any number in the range from N to M, including both borders.
Constructions with {} are similar to the remote table function.
Example
Suppose we have several files in TSV format with the following URIs on HDFS:
- 'hdfs://hdfs1:9000/some_dir/some_file_1'
- 'hdfs://hdfs1:9000/some_dir/some_file_2'
- 'hdfs://hdfs1:9000/some_dir/some_file_3'
- 'hdfs://hdfs1:9000/another_dir/some_file_1'
- 'hdfs://hdfs1:9000/another_dir/some_file_2'
- 'hdfs://hdfs1:9000/another_dir/some_file_3'
There are several ways to make a table consisting of all six files:
CREATE TABLE table_with_range (name String, value UInt32) ENGINE = HDFS('hdfs://hdfs1:9000/{some,another}_dir/some_file_{1..3}', 'TSV')
Another way:
CREATE TABLE table_with_question_mark (name String, value UInt32) ENGINE = HDFS('hdfs://hdfs1:9000/{some,another}_dir/some_file_?', 'TSV')
Table consists of all the files in both directories (all files should satisfy format and schema described in query):
CREATE TABLE table_with_asterisk (name String, value UInt32) ENGINE = HDFS('hdfs://hdfs1:9000/{some,another}_dir/*', 'TSV')
If the listing of files contains number ranges with leading zeros, use the construction with braces for each digit separately or use ?.
Example
Create a table with files named file000, file001, ... , file999:
CREATE TABLE big_table (name String, value UInt32) ENGINE = HDFS('hdfs://hdfs1:9000/big_dir/file{0..9}{0..9}{0..9}', 'CSV')
Configuration
Similar to GraphiteMergeTree, the HDFS engine supports extended configuration using the ClickHouse config file. There are two configuration keys that you can use: global (hdfs) and user-level (hdfs_*). The global configuration is applied first, and then the user-level configuration is applied (if it exists).
<!-- Global configuration options for HDFS engine type -->
<hdfs>
    <hadoop_kerberos_keytab>/tmp/keytab/clickhouse.keytab</hadoop_kerberos_keytab>
    <hadoop_kerberos_principal>clickuser@TEST.CLICKHOUSE.TECH</hadoop_kerberos_principal>
    <hadoop_security_authentication>kerberos</hadoop_security_authentication>
</hdfs>
<!-- Configuration specific for user "root" -->
<hdfs_root>
    <hadoop_kerberos_principal>root@TEST.CLICKHOUSE.TECH</hadoop_kerberos_principal>
</hdfs_root>
Configuration Options
Supported by libhdfs3
parameter | default value |
---|---|
rpc_client_connect_tcpnodelay | true |
dfs_client_read_shortcircuit | true |
output_replace-datanode-on-failure | true |
input_notretry-another-node | false |
input_localread_mappedfile | true |
dfs_client_use_legacy_blockreader_local | false |
rpc_client_ping_interval | 10 * 1000 |
rpc_client_connect_timeout | 600 * 1000 |
rpc_client_read_timeout | 3600 * 1000 |
rpc_client_write_timeout | 3600 * 1000 |
rpc_client_socket_linger_timeout | -1 |
rpc_client_connect_retry | 10 |
rpc_client_timeout | 3600 * 1000 |
dfs_default_replica | 3 |
input_connect_timeout | 600 * 1000 |
input_read_timeout | 3600 * 1000 |
input_write_timeout | 3600 * 1000 |
input_localread_default_buffersize | 1 * 1024 * 1024 |
dfs_prefetchsize | 10 |
input_read_getblockinfo_retry | 3 |
input_localread_blockinfo_cachesize | 1000 |
input_read_max_retry | 60 |
output_default_chunksize | 512 |
output_default_packetsize | 64 * 1024 |
output_default_write_retry | 10 |
output_connect_timeout | 600 * 1000 |
output_read_timeout | 3600 * 1000 |
output_write_timeout | 3600 * 1000 |
output_close_timeout | 3600 * 1000 |
output_packetpool_size | 1024 |
output_heartbeat_interval | 10 * 1000 |
dfs_client_failover_max_attempts | 15 |
dfs_client_read_shortcircuit_streams_cache_size | 256 |
dfs_client_socketcache_expiryMsec | 3000 |
dfs_client_socketcache_capacity | 16 |
dfs_default_blocksize | 64 * 1024 * 1024 |
dfs_default_uri | "hdfs://localhost:9000" |
hadoop_security_authentication | "simple" |
hadoop_security_kerberos_ticket_cache_path | "" |
dfs_client_log_severity | "INFO" |
dfs_domain_socket_path | "" |
HDFS Configuration Reference might explain some parameters.
ClickHouse extras
parameter | default value |
---|---|
hadoop_kerberos_keytab | "" |
hadoop_kerberos_principal | "" |
libhdfs3_conf | "" |
Limitations
- hadoop_security_kerberos_ticket_cache_path and libhdfs3_conf can be global only, not user-specific.
Kerberos support
If the hadoop_security_authentication parameter has the value kerberos, ClickHouse authenticates via Kerberos.
The relevant parameters are listed under ClickHouse extras, and hadoop_security_kerberos_ticket_cache_path may also be of help.
Note that due to libhdfs3 limitations only the old-fashioned approach is supported: datanode communications are not secured by SASL (HADOOP_SECURE_DN_USER is a reliable indicator of this security approach). Use tests/integration/test_storage_kerberized_hdfs/hdfs_configs/bootstrap.sh for reference.
If hadoop_kerberos_keytab, hadoop_kerberos_principal or hadoop_security_kerberos_ticket_cache_path are specified, Kerberos authentication will be used. hadoop_kerberos_keytab and hadoop_kerberos_principal are mandatory in this case.
HDFS Namenode HA support
libhdfs3 supports HDFS namenode HA.
- Copy hdfs-site.xml from an HDFS node to /etc/clickhouse-server/.
- Add the following piece to the ClickHouse config file:
<hdfs>
    <libhdfs3_conf>/etc/clickhouse-server/hdfs-site.xml</libhdfs3_conf>
</hdfs>
- Then use the dfs.nameservices tag value from hdfs-site.xml as the namenode address in the HDFS URI. For example, replace hdfs://appadmin@192.168.101.11:8020/abc/ with hdfs://appadmin@my_nameservice/abc/.
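As a minimal sketch of the last step, a table definition can refer to the nameservice instead of a single namenode host. The table name and the file name data.tsv are illustrative assumptions; my_nameservice stands for whatever value dfs.nameservices has in your hdfs-site.xml.

```sql
-- Hypothetical table that addresses HDFS through the HA nameservice
-- rather than a fixed namenode host and port.
CREATE TABLE hdfs_ha_table (name String, value UInt32)
ENGINE = HDFS('hdfs://appadmin@my_nameservice/abc/data.tsv', 'TSV');

-- A quick read to check that the nameservice resolves.
SELECT * FROM hdfs_ha_table LIMIT 2;
```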
Virtual Columns
- _path - Path to the file. Type: LowCardinality(String).
- _file - Name of the file. Type: LowCardinality(String).
- _size - Size of the file in bytes. Type: Nullable(UInt64). If the size is unknown, the value is NULL.
- _time - Last modified time of the file. Type: Nullable(DateTime). If the time is unknown, the value is NULL.
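Virtual columns are not returned by SELECT * and have to be named explicitly. The sketch below assumes the table_with_range table from the globs example exists; it shows one way to see which files a glob matched and how many rows each one contributes.

```sql
-- List every matched file with its size, modification time, and row count.
SELECT
    _path,
    _file,
    _size,
    _time,
    count() AS row_count
FROM table_with_range
GROUP BY _path, _file, _size, _time
ORDER BY _path;
```

Because the file list is resolved when the query runs, the result can change between two executions if files are added to or removed from the matched directories.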
Storage Settings
- hdfs_truncate_on_insert - allows truncating the file before inserting into it. Disabled by default.
- hdfs_create_new_file_on_insert - allows creating a new file on each insert if the format has a suffix. Disabled by default.
- hdfs_skip_empty_files - allows skipping empty files while reading. Disabled by default.
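These settings can be enabled per session or per query. The sketch below assumes the hdfs_engine_table and table_with_asterisk tables from the earlier examples exist; the inserted values are illustrative.

```sql
-- Truncate the target file before writing the new rows.
SET hdfs_truncate_on_insert = 1;
INSERT INTO hdfs_engine_table VALUES ('four', 4);

-- Alternatively, enable a setting for a single query only:
-- ignore zero-length files matched by the glob while reading.
SELECT count()
FROM table_with_asterisk
SETTINGS hdfs_skip_empty_files = 1;
```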
See Also