CREATE PIPE (Preview)
Create a pipe object that automatically ingests files from a cloud storage location.
Syntax

CREATE PIPE [ IF NOT EXISTS ] <pipe_name>
  [ DEDUPE_LOOKBACK_PERIOD <number_of_days> ]
  AS COPY INTO <table_name>
  FROM '@<storage_location_name>[/<folder_name>]'
  [ FILE_FORMAT 'csv' | 'json' | 'parquet' ]
  [ ( [csv_format_options] | [json_format_options] | [parquet_format_options] ) ]

csv_format_options:
  [ DATE_FORMAT '<string>' ]
  [ EMPTY_AS_NULL [ '<boolean>' ] ]
  [ ESCAPE_CHAR '<escape_character>' ]
  [ EXTRACT_HEADER '<boolean>' ]
  [ FIELD_DELIMITER '<character>' ]
  [ NULL_IF ( '<string>' [, ...] ) ]
  [ ON_ERROR 'skip_file' ]
  [ QUOTE_CHAR '<character>' ]
  [ RECORD_DELIMITER '<character>' ]
  [ SKIP_LINES <n> ]
  [ TIME_FORMAT '<string>' ]
  [ TIMESTAMP_FORMAT '<string>' ]
  [ TRIM_SPACE [ '<boolean>' ] ]

json_format_options:
  [ DATE_FORMAT '<string>' ]
  [ EMPTY_AS_NULL [ '<boolean>' ] ]
  [ NULL_IF ( '<string>' [, ...] ) ]
  [ ON_ERROR 'skip_file' ]
  [ TIME_FORMAT '<string>' ]
  [ TIMESTAMP_FORMAT '<string>' ]
  [ TRIM_SPACE [ '<boolean>' ] ]

parquet_format_options:
  [ ON_ERROR 'skip_file' ]
Parameters
[ IF NOT EXISTS ] String Optional
If you include this clause, the command runs regardless of whether the pipe already exists, and the result indicates whether the pipe was created. If this clause is not specified, the command fails if the pipe to be created already exists.
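For example, the following sketch (the pipe, table, and source names are hypothetical) succeeds whether or not a pipe named Sales_pipe already exists:

CREATE PIPE IF NOT EXISTS Sales_pipe
AS COPY INTO Sales_table
FROM '@s3_source/sales'
FILE_FORMAT 'csv'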
<pipe_name> String
The unique name of the autoingest pipe that you are creating.
DEDUPE_LOOKBACK_PERIOD <number_of_days> Integer Optional
The number of days that Dremio should look back when checking for file deduplication. The default is 14 days, and you can set the value to between 0 and 90 days. If you set the parameter to 0, Dremio does not perform file deduplication.
If two files with the same name are written to the specified storage location within the DEDUPE_LOOKBACK_PERIOD, the second file is considered a duplicate and is not loaded, even if you explicitly delete and reupload a file of the same name.
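For example, this sketch (names are hypothetical) disables deduplication entirely by setting the lookback period to 0:

CREATE PIPE No_dedupe_pipe
DEDUPE_LOOKBACK_PERIOD 0
AS COPY INTO Table_one
FROM '@s3_source/folder'
FILE_FORMAT 'csv'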
AS COPY INTO <table_name> String
Use the COPY INTO command to specify the target table. The name of the target Iceberg table should include the necessary qualifier if the table is not in the current context. Example: catalog.salesschema.table2.
@<storage_location_name> String
The storage location that you want to load files from. The location must be a preconfigured Dremio source.
Autoingest pipes can only ingest data from Amazon S3 sources in Dremio.
/<folder_name> String Optional
The folder and subfolders that you want to load files from. Add /<folder_name> to @<storage_location_name>.
FILE_FORMAT 'csv' | 'json' | 'parquet' String
The format of the file or files to copy data from. FILE_FORMAT must be specified, and all files loaded in a COPY INTO operation must be of the same file format.
You can use uncompressed or compressed CSV and JSON files. Compressed files must be in the gzip format, using the .gz extension, or in the bzip2 format, using the .bz2 extension.
csv_format_options String
Options that describe the formats and other characteristics of the source CSV file or files.
json_format_options String
Options that describe the formats and other characteristics of the source JSON file or files.
parquet_format_options String
Options that describe the formats and other characteristics of the source Parquet file or files.
Only the ON_ERROR option is supported for Parquet source files.
CSV Format Options
DATE_FORMAT '<string>' String Optional
String that defines the format of date values in the data files to be loaded. If a value is not specified, YYYY-MM-DD is used. See Date/Time Formatting for more format elements.
EMPTY_AS_NULL [ '<boolean>' ] String Optional
Boolean that specifies whether an empty string is considered a NULL field or an empty string. If a value is not specified, TRUE is used.
ESCAPE_CHAR '<escape_character>' String Optional
A single character used as the escape character for the character specified by QUOTE_CHAR. The escape character provides a way to include the QUOTE_CHAR character inside a string literal by modifying the meaning of that character in the string. If a value is not specified, " is used.
EXTRACT_HEADER '<boolean>' String Optional
Boolean that specifies if the first line in the CSV is a header. If a value is not specified, TRUE is used. If SKIP_LINES <n> is also specified and EXTRACT_HEADER is set to TRUE, the n+1 line in the file is considered the header.
FIELD_DELIMITER '<character>' String Optional
The single character used to separate fields in the file. If a value is not specified, "," is used.
NULL_IF ( '<string>' [, ...] ) String Optional
One or more strings that, when encountered in the source data, are replaced with NULL.
ON_ERROR 'skip_file' String Optional
Specifies that the COPY INTO operation stops processing an input file at the first error it encounters during the load.
ON_ERROR 'skip_file' is enabled by default for all autoingest pipes. No other ON_ERROR options are supported.
The first error encountered is registered in the sys.project.copy_errors_history table, which you can query to get information about the job that ran the COPY INTO operation.
To get information about rejected records in particular files, query the sys.project.copy_errors_history table to obtain the ID of the job that ran the COPY INTO operation. Then, use the copy_errors() function in a SELECT command, specifying the job ID and the name of the target table.
The 'skip_file' option does not load any rows from an input file that contains an error; it registers only the first error in the file and then stops processing it. As a result, the 'skip_file' option requires extra processing on the input files, regardless of the number of errors they contain. Skipping large files due to a small number of errors can delay the COPY INTO operation.
QUOTE_CHAR '<character>' String Optional
The single character used to quote field data within a record. The default is a double-quotation mark.
RECORD_DELIMITER '<character>' String Optional
One or more characters that separate records in an input file. Accepts common escape sequences. If a value is not specified, \r\n is used.
SKIP_LINES <n> Integer Optional
Number of lines to ignore at the beginning of each input file. If no value is given, no lines are skipped. Must be a non-negative integer. If SKIP_LINES <n> is specified and EXTRACT_HEADER is also set to TRUE, the n+1 line in the file is considered to be the header.
TIME_FORMAT '<string>' String Optional
String that defines the format of time values in the data files to be loaded. If a value is not specified, HH24:MI:SS.FFF is used. See Date/Time Formatting for more format elements.
TIMESTAMP_FORMAT '<string>' String Optional
String that defines the format of timestamp values in the data files to be loaded. If a value is not specified, YYYY-MM-DD HH24:MI:SS.FFF is used. See Date/Time Formatting for more format elements.
TRIM_SPACE [ '<boolean>' ] String Optional
Boolean that specifies whether or not to remove leading and trailing white space from strings. The default is FALSE.
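To illustrate how the CSV options combine, the following sketch (all names hypothetical; the format options are assumed to be comma-separated inside the parentheses, as in COPY INTO) creates a pipe for pipe-delimited files that have no header row and use the string 'NA' for missing values:

CREATE PIPE Csv_pipe
AS COPY INTO Csv_table
FROM '@s3_source/csv_files'
FILE_FORMAT 'csv'
(EXTRACT_HEADER 'false',
 FIELD_DELIMITER '|',
 NULL_IF ('NA'),
 TRIM_SPACE 'true')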
JSON Format Options
DATE_FORMAT '<string>' String Optional
String that defines the format of date values in the data files to be loaded. If a value is not specified, YYYY-MM-DD is used. See Date/Time Formatting for more format elements.
EMPTY_AS_NULL [ '<boolean>' ] String Optional
Boolean that specifies whether an empty string is considered a NULL field or an empty string. If a value is not specified, TRUE is used.
NULL_IF ( '<string>' [, ...] ) String Optional
One or more strings that, when encountered in the source data, are replaced with NULL.
ON_ERROR 'skip_file' String Optional
Specifies that the COPY INTO operation stops processing an input file at the first error it encounters during the load.
ON_ERROR 'skip_file' is enabled by default for all autoingest pipes. No other ON_ERROR options are supported.
The first error encountered is registered in the sys.project.copy_errors_history table, which you can query to get information about the job that ran the COPY INTO operation.
To get information about rejected records in particular files, query the sys.project.copy_errors_history table to obtain the ID of the job that ran the COPY INTO operation. Then, use the copy_errors() function in a SELECT command, specifying the job ID and the name of the target table.
The 'skip_file' option does not load any rows from an input file that contains an error; it registers only the first error in the file and then stops processing it. As a result, the 'skip_file' option requires extra processing on the input files, regardless of the number of errors they contain. Skipping large files due to a small number of errors can delay the COPY INTO operation.
TIME_FORMAT '<string>' String Optional
String that defines the format of time values in the data files to be loaded. If a value is not specified, HH24:MI:SS.FFF is used. See Date/Time Formatting for more format elements.
TIMESTAMP_FORMAT '<string>' String Optional
String that defines the format of timestamp values in the data files to be loaded. If a value is not specified, YYYY-MM-DD HH24:MI:SS.FFF is used. See Date/Time Formatting for more format elements.
TRIM_SPACE [ '<boolean>' ] String Optional
Boolean that specifies whether or not to remove leading and trailing white space from strings. The default is FALSE.
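For example, a sketch (names hypothetical) of a pipe for JSON files that loads the strings 'NA' and 'none' as NULL:

CREATE PIPE Json_pipe
AS COPY INTO Json_table
FROM '@s3_source/json_files'
FILE_FORMAT 'json'
(NULL_IF ('NA', 'none'))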
Parquet Format Options
ON_ERROR 'skip_file' String Optional
Specifies that the COPY INTO operation stops processing an input file at the first error it encounters during the load.
ON_ERROR 'skip_file' is enabled by default for all autoingest pipes. No other ON_ERROR options are supported.
The first error encountered is registered in the sys.project.copy_errors_history table, which you can query to get information about the job that ran the COPY INTO operation.
To get information about rejected records in particular files, query the sys.project.copy_errors_history table to obtain the ID of the job that ran the COPY INTO operation. Then, use the copy_errors() function in a SELECT command, specifying the job ID and the name of the target table.
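The following queries sketch the two-step error inspection described above. The table name Pipe_sink and the placeholder <job_id> are illustrative, and the sketch assumes copy_errors() is invoked as a table function through TABLE():

-- Step 1: Find the ID of the job that ran the COPY INTO operation.
SELECT *
FROM sys.project.copy_errors_history

-- Step 2: Pass the target table name and the job ID from step 1
-- to copy_errors() to see the rejected records.
SELECT *
FROM TABLE(copy_errors('Pipe_sink', '<job_id>'))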
Examples
Create an autoingest pipe from an Amazon S3 source

CREATE PIPE Example_pipe
AS COPY INTO Table_one
FROM '@s3_source/folder'
FILE_FORMAT 'csv'

Create an autoingest pipe with a five-day deduplication lookback period

CREATE PIPE Example_pipe
DEDUPE_LOOKBACK_PERIOD 5
AS COPY INTO Table_one
FROM '@<storage_location_name>/files'
FILE_FORMAT 'csv'
Autoingest Pipe Operation
Follow the steps in this section to configure an autoingest pipe.
Step 1: Create an Iceberg Table
You can create the table in any catalog, although the table schema must match the expected schema for all files in the cloud storage source. See the example below:
Example of creating an Iceberg table

CREATE TABLE Pipe_sink
(Col_one int, Col_two varchar)
If a file has at least one record that does not match the schema, Dremio skips the entire file. To check the file load status, see sys.project.copy_file_history.
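For example, a minimal query to review the load status of recently ingested files:

SELECT *
FROM sys.project.copy_file_history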
Step 2: Create an Autoingest Pipe
When you use the CREATE PIPE SQL command to create an autoingest pipe, you must reference a preconfigured Dremio data source that allows file ingestion; this source serves as the @<storage_location_name> in the COPY INTO statement. Within this source, you can specify a folder or its subfolders by adding /<folder_name>. Any new files added to the specified folder or its subdirectories are loaded into the target table.
The following example shows how to create an autoingest pipe for an Amazon S3 source:
Example of creating a pipe

CREATE PIPE Example_pipe AS
COPY INTO Pipe_sink
FROM '@<storage_location_name>/folder'
FILE_FORMAT 'csv'
Step 3: Retrieve Cloud Command Settings
To link the pipe to your cloud storage location and retrieve cloud command settings, run a SELECT statement for cloud_cli_command_settings:
SELECT cloud_cli_command_settings
FROM sys.project."pipes"
WHERE pipe_name = 'Example_pipe'
The result is a cloud-specific CLI command that you can run to connect your cloud storage to Dremio.
For example, the command returns this output for an Amazon S3 source:
aws s3api put-bucket-notification-configuration --bucket tlelek-test-bucket --notification-configuration "$(notification=$(aws s3api get-bucket-notification-configuration --bucket pipetest); if [ -z "$notification" ]; then echo '{"TopicConfigurations": []}'; else echo "$notification"; fi | jq --argjson new_config '{"Id":"pipe_5b4b8cdb-a308-4919-ac42-486d4bbcd879_525ffe04-97bb-473e-8e76-fa559faa7d0d","TopicArn":"arn:aws:sns:us-west-2:1234abc:dremio-69505c78-17f8-456e-ba37-aa72a151fec8","Events":["s3:ObjectCreated:Put","s3:ObjectCreated:Post"],"Filter":{"Key":{"FilterRules":[{"Name":"prefix","Value":"ingestionE2E/232"}]}}}' '.TopicConfigurations += [$new_config]')"
Step 4: Execute Cloud Command Settings
cloud_cli_command_settings is a self-contained AWS CLI command that sets the appropriate event notifications on the Amazon S3 bucket identified in the pipe definition to alert Dremio when new files are added to the bucket.
To execute the CLI command, you must have s3:PutBucketNotification permissions. Contact your cloud administrator to ensure that you have the required permissions.
After you execute the cloud_cli_command_settings CLI command, the notification configuration may take a few minutes to take effect.
Amazon S3 does not support multiple notifications for the same prefix path, so you must have unique paths for the S3 notification settings.
Step 5: Add New Files to Your Amazon S3 Source
Now that you have established connectivity between your cloud storage and Dremio, the autoingest pipe will run every time you add new files to the source.
If a pipe is operating frequently, you may need to run OPTIMIZE TABLE to compact small data files in your Iceberg table. The frequency of maintenance depends on the incoming file size and load frequency.
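For example, to compact the small files that frequent ingestion produces in the table created in Step 1:

OPTIMIZE TABLE Pipe_sink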
For other autoingest pipe commands, see ALTER PIPE, DESCRIBE PIPE, and DROP PIPE.