CREATE PIPE (Preview)

Create a pipe object that automatically ingests files from a cloud storage location.

Syntax
CREATE PIPE [ IF NOT EXISTS ] <pipe_name>
[ DEDUPE_LOOKBACK_PERIOD <number_of_days> ]
AS COPY INTO <table_name>
FROM '@<storage_location_name>[ /<folder_name> ]'
FILE_FORMAT 'csv' | 'json' | 'parquet'
[ ( [csv_format_options] | [json_format_options] | [parquet_format_options] ) ]
csv_format_options
[ DATE_FORMAT '<string>' ]
[ EMPTY_AS_NULL [ '<boolean>' ] ]
[ ESCAPE_CHAR '<escape_character>' ]
[ EXTRACT_HEADER '<boolean>' ]
[ FIELD_DELIMITER '<character>' ]
[ NULL_IF ( '<string>' [, ...] ) ]
[ ON_ERROR 'skip_file' ]
[ QUOTE_CHAR '<character>' ]
[ RECORD_DELIMITER '<character>' ]
[ SKIP_LINES <n> ]
[ TIME_FORMAT '<string>' ]
[ TIMESTAMP_FORMAT '<string>' ]
[ TRIM_SPACE [ '<boolean>' ] ]
json_format_options
[ DATE_FORMAT '<string>' ]
[ EMPTY_AS_NULL [ '<boolean>' ] ]
[ NULL_IF ( '<string>' [, ...] ) ]
[ ON_ERROR 'skip_file' ]
[ TIME_FORMAT '<string>' ]
[ TIMESTAMP_FORMAT '<string>' ]
[ TRIM_SPACE [ '<boolean>' ] ]
parquet_format_options
[ ON_ERROR 'skip_file' ]

Parameters

[ IF NOT EXISTS ] String   Optional

If you include this clause, the command succeeds whether or not the pipe already exists, and you receive a summary indicating whether the pipe was created. If you omit this clause, the command fails if the pipe to be created already exists.
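For example, the following sketch (reusing the hypothetical object names from the Examples section below) can be run repeatedly without failing:

Example of using IF NOT EXISTS
CREATE PIPE IF NOT EXISTS Example_pipe
AS COPY INTO Table_one
FROM '@s3_source/folder'
FILE_FORMAT 'csv'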


<pipe_name> String

The unique name of the autoingest pipe that you are creating.


DEDUPE_LOOKBACK_PERIOD <number_of_days> Integer   Optional

The number of days that Dremio looks back when checking for duplicate files. The default is 14 days; you can set a value between 0 and 90 days. If you set the parameter to 0, Dremio does not perform file deduplication.

If two files with the same name are written to the specified storage location within the DEDUPE_LOOKBACK_PERIOD, the second file is considered a duplicate and is not loaded, even if you explicitly delete the first file and upload a new file with the same name.
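For example, the following sketch (using the same hypothetical names as the Examples section below) disables deduplication entirely:

Example of disabling file deduplication
CREATE PIPE Example_pipe
DEDUPE_LOOKBACK_PERIOD 0
AS COPY INTO Table_one
FROM '@s3_source/folder'
FILE_FORMAT 'csv'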


AS COPY INTO <table_name> String

Use the COPY INTO command to specify the target table. The name of the target Iceberg table should include the necessary qualifier if the table is not in the current context. Example: catalog.salesschema.table2.


@<storage_location_name> String

The storage location that you want to load files from. The location must be a preconfigured Dremio source.

note

Autoingest pipes can only ingest data from Amazon S3 sources in Dremio.


/<folder_name> String   Optional

The folder and subfolders that you want to load files from. Add /<folder_name> to @<storage_location_name>.


FILE_FORMAT 'csv' | 'json' | 'parquet' String

The format of the file or files to copy data from. FILE_FORMAT must be specified, and all files loaded in a COPY INTO operation must be of the same file format.

You can use uncompressed or compressed CSV and JSON files. Compressed files must be in the gzip format, using the .gz extension, or in the bzip2 format, using the .bz2 extension.


csv_format_options String

Options that describe the formats and other characteristics of the source CSV file or files.


json_format_options String

Options that describe the formats and other characteristics of the source JSON file or files.


parquet_format_options String

Options that describe the formats and other characteristics of the source PARQUET file or files.

note

Only the ON_ERROR option is supported for Parquet source files.

CSV Format Options

DATE_FORMAT '<string>' String   Optional

String that defines the format of date values in the data files to be loaded. If a value is not specified, YYYY-MM-DD is used. See Date/Time Formatting for more format elements.


EMPTY_AS_NULL [ '<boolean>' ] String   Optional

Boolean that specifies whether an empty string is considered a NULL field or an empty string. If a value is not specified, TRUE is used.


ESCAPE_CHAR '<escape_character>' String   Optional

A single character used as the escape character for the character specified by QUOTE_CHAR. The escape character provides a way to include the QUOTE_CHAR character inside a string literal by modifying the meaning of that character in the string. If a value is not specified, " is used.


EXTRACT_HEADER '<boolean>' String   Optional

Boolean that specifies if the first line in the CSV is a header. If a value is not specified, TRUE is used. If SKIP_LINES <n> is also specified and EXTRACT_HEADER is set to TRUE, the n+1 line in the file is considered the header.


FIELD_DELIMITER '<character>' String   Optional

The single character used to separate fields in the file. If a value is not specified, "," is used.


NULL_IF ( '<string>' [, ...] ) String   Optional

Replace strings in the data load source with NULL.


ON_ERROR 'skip_file' String   Optional

Specifies that the COPY INTO operation should stop processing the input file at the first error it encounters during the loading process.

note

ON_ERROR 'skip_file' is on by default for all autoingest pipes. No other ON_ERROR options are supported.

The first error encountered is registered in the sys.project.copy_errors_history table, which you can query to get information about the job that ran the COPY INTO operation.

To get information about rejected records in particular files, query the sys.project.copy_errors_history table to obtain the ID of the job that ran the COPY INTO operation. Then, use the copy_errors() function in a SELECT command, specifying the job ID and the name of the target table.

The 'skip_file' option does not insert any rows from an input file that contains an error; it registers only the first error in the file and then stops processing that file. The 'skip_file' option still requires extra processing of the input files, regardless of the number of errors they contain, so skipping large files due to a small number of errors can delay the COPY INTO operation.
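For example, the following sketch of that workflow assumes the job_id and table_name columns in sys.project.copy_errors_history, uses the copy_errors() table function, and leaves the job ID as a placeholder:

Example of inspecting rejected records
-- Find the ID of the job that ran the COPY INTO operation
SELECT job_id
FROM sys.project.copy_errors_history
WHERE table_name = 'Table_one'

-- List the rejected records for that job and target table
SELECT *
FROM TABLE(copy_errors('Table_one', '<job_id>'))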


QUOTE_CHAR '<character>' String   Optional

The single character used to quote field data within a record. The default is a double-quotation mark.


RECORD_DELIMITER '<character>' String   Optional

One or more characters that separate records in an input file. Accepts common escape sequences. If a value is not specified, \r\n is used.


SKIP_LINES <n> Integer   Optional

Number of lines to ignore at the beginning of each input file. If no value is given, no lines are skipped. Must be a non-negative integer. If SKIP_LINES <n> is specified and EXTRACT_HEADER is also set to TRUE, the n+1 line in the file is considered to be the header.


TIME_FORMAT '<string>' String   Optional

String that defines the format of time values in the data files to be loaded. If a value is not specified, HH24:MI:SS.FFF is used. See Date/Time Formatting for more format elements.


TIMESTAMP_FORMAT '<string>' String   Optional

String that defines the format of timestamp values in the data files to be loaded. If a value is not specified, YYYY-MM-DD HH24:MI:SS.FFF is used. See Date/Time Formatting for more format elements.


TRIM_SPACE [ '<boolean>' ] String   Optional

Boolean that specifies whether or not to remove leading and trailing white space from strings. The default is FALSE.
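The following sketch combines several of the CSV options above; the object names are the hypothetical ones used elsewhere on this page, and the option values are illustrative:

Example of a pipe with CSV format options
CREATE PIPE Example_pipe
AS COPY INTO Table_one
FROM '@s3_source/folder'
FILE_FORMAT 'csv'
(FIELD_DELIMITER ';',
SKIP_LINES 2,
EXTRACT_HEADER 'true',
NULL_IF ('NA', 'none'),
TRIM_SPACE 'true')

Because SKIP_LINES is 2 and EXTRACT_HEADER is 'true', the third line of each input file is treated as the header.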

JSON Format Options

DATE_FORMAT '<string>' String   Optional

String that defines the format of date values in the data files to be loaded. If a value is not specified, YYYY-MM-DD is used. See Date/Time Formatting for more format elements.


EMPTY_AS_NULL [ '<boolean>' ] String   Optional

Boolean that specifies whether an empty string is considered a NULL field or an empty string. If a value is not specified, TRUE is used.


NULL_IF ( '<string>' [, ...] ) String   Optional

Replace strings in the data load source with NULL.


ON_ERROR 'skip_file' String   Optional

Specifies that the COPY INTO operation should stop processing the input file at the first error it encounters during the loading process.

note

ON_ERROR 'skip_file' is on by default for all autoingest pipes. No other ON_ERROR options are supported.

The first error encountered is registered in the sys.project.copy_errors_history table, which you can query to get information about the job that ran the COPY INTO operation.

To get information about rejected records in particular files, query the sys.project.copy_errors_history table to obtain the ID of the job that ran the COPY INTO operation. Then, use the copy_errors() function in a SELECT command, specifying the job ID and the name of the target table.

The 'skip_file' option does not insert any rows from an input file that contains an error; it registers only the first error in the file and then stops processing that file. The 'skip_file' option still requires extra processing of the input files, regardless of the number of errors they contain, so skipping large files due to a small number of errors can delay the COPY INTO operation.


TIME_FORMAT '<string>' String   Optional

String that defines the format of time values in the data files to be loaded. If a value is not specified, HH24:MI:SS.FFF is used. See Date/Time Formatting for more format elements.


TIMESTAMP_FORMAT '<string>' String   Optional

String that defines the format of timestamp values in the data files to be loaded. If a value is not specified, YYYY-MM-DD HH24:MI:SS.FFF is used. See Date/Time Formatting for more format elements.


TRIM_SPACE [ '<boolean>' ] String   Optional

Boolean that specifies whether or not to remove leading and trailing white space from strings. The default is FALSE.
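The following sketch combines several of the JSON options above, again with hypothetical names and illustrative values:

Example of a pipe with JSON format options
CREATE PIPE Example_pipe
AS COPY INTO Table_one
FROM '@s3_source/folder'
FILE_FORMAT 'json'
(DATE_FORMAT 'DD-MM-YYYY',
NULL_IF ('NA'),
TRIM_SPACE 'true')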

Parquet Format Options

ON_ERROR 'skip_file' String   Optional

Specifies that the COPY INTO operation should stop processing the input file at the first error it encounters during the loading process.

note

ON_ERROR 'skip_file' is on by default for all autoingest pipes. No other ON_ERROR options are supported.

The first error encountered is registered in the sys.project.copy_errors_history table, which you can query to get information about the job that ran the COPY INTO operation.

To get information about rejected records in particular files, query the sys.project.copy_errors_history table to obtain the ID of the job that ran the COPY INTO operation. Then, use the copy_errors() function in a SELECT command, specifying the job ID and the name of the target table.
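A minimal sketch of a Parquet pipe, using the same hypothetical names as the Examples section below:

Example of a pipe for Parquet files
CREATE PIPE Example_pipe
AS COPY INTO Table_one
FROM '@s3_source/folder'
FILE_FORMAT 'parquet'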

Examples

Create an autoingest pipe from an Amazon S3 source
CREATE PIPE Example_pipe
AS COPY INTO Table_one
FROM '@s3_source/folder'
FILE_FORMAT 'csv'
Create an autoingest pipe with a custom lookback period
CREATE PIPE Example_pipe
DEDUPE_LOOKBACK_PERIOD 5
AS COPY INTO Table_one
FROM '@<storage_location_name>/files'
FILE_FORMAT 'csv'

Autoingest Pipe Operation

Follow the steps in this section to configure an autoingest pipe.

Step 1: Create an Iceberg Table

You can create the table in any catalog, although the table schema must match the expected schema for all files in the cloud storage source. See the example below:

Example of creating an Iceberg table
CREATE TABLE Pipe_sink
(Col_one int, Col_two varchar)

If a file has at least one record that does not match the schema, Dremio skips the entire file. To check the file load status, see sys.project.copy_file_history.
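For example, the following sketch (assuming the pipe_name column in sys.project.copy_file_history, and using the hypothetical pipe name created in Step 2) lists the load status of each file:

Example of checking file load status
SELECT *
FROM sys.project.copy_file_history
WHERE pipe_name = 'Example_pipe'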

Step 2: Create an Autoingest Pipe

When you use the CREATE PIPE SQL command to create an autoingest pipe, you must reference a preconfigured Dremio data source that allows file ingestion; this source is used as @<storage_location_name> in the COPY INTO statement. Within this source, you can specify a folder or its subfolders by adding /<folder_name>. Any new files added to the specified folder or its subdirectories are loaded into the target table.

The following example shows how to create an autoingest pipe for an Amazon S3 source:

Example of creating a pipe
CREATE PIPE Example_pipe AS
COPY INTO Pipe_sink
FROM '@<storage_location_name>/folder'
FILE_FORMAT 'csv'

Step 3: Retrieve Cloud Command Settings

To link the pipe to your cloud storage location and retrieve cloud command settings, run a SELECT statement for cloud_cli_command_settings:

Example of retrieving cloud command settings
SELECT cloud_cli_command_settings
FROM sys.project."pipes"
WHERE pipe_name = 'Example_pipe'

The result is a cloud-specific CLI command that you can run to connect your cloud storage to Dremio.

For example, the command returns this output for an Amazon S3 source:

cloud_cli_command_settings
aws s3api put-bucket-notification-configuration --bucket tlelek-test-bucket --notification-configuration "$(notification=$(aws s3api get-bucket-notification-configuration --bucket pipetest); if [ -z "$notification" ]; then echo '{"TopicConfigurations": []}'; else echo "$notification"; fi | jq --argjson new_config '{"Id":"pipe_5b4b8cdb-a308-4919-ac42-486d4bbcd879_525ffe04-97bb-473e-8e76-fa559faa7d0d","TopicArn":"arn:aws:sns:us-west-2:1234abc:dremio-69505c78-17f8-456e-ba37-aa72a151fec8","Events":["s3:ObjectCreated:Put","s3:ObjectCreated:Post"],"Filter":{"Key":{"FilterRules":[{"Name":"prefix","Value":"ingestionE2E/232"}]}}}' '.TopicConfigurations += [$new_config]')"

Step 4: Execute Cloud Command Settings

cloud_cli_command_settings is a self-contained AWS CLI command that sets the appropriate event notifications on the Amazon S3 bucket identified in the pipe definition to alert Dremio when new files are added to the bucket.

To execute the CLI command, you must have s3:PutBucketNotification permissions. Contact your cloud administrator to ensure that you have the required permissions.

After you execute the cloud_cli_command_settings CLI command, it may take a few minutes to take effect.

note

Amazon S3 does not support multiple notifications for the same prefix path, so you must have unique paths for the S3 notification settings.

Step 5: Add New Files to Your Amazon S3 Source

Now that you have established connectivity between your cloud storage and Dremio, the autoingest pipe will run every time you add new files to the source.

If a pipe is operating frequently, you may need to run OPTIMIZE TABLE to compact small data files in your Iceberg table. The frequency of maintenance depends on the incoming file size and load frequency.
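For example, a minimal compaction of the table created in Step 1:

Example of compacting the target table
OPTIMIZE TABLE Pipe_sink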

tip

For other autoingest pipe commands, see ALTER PIPE, DESCRIBE PIPE, and DROP PIPE.