Performance comparison of Parquet, Vaex, Dask, etc. in HDF5 with multiple files

The other day, I did A simple performance comparison of Pandas, Dask, and Vaex in CSV, Parquet, and HDF5 in a single file.

However, since I often handle a lot of files with time series data etc. in my usual work, this time I will roughly compare the performance including the number of lines that the memory is quite tight when handling multiple files and normally. To go.

TL;DR

――In the time series assumed data I tried this time, there was no difference between Vaex + uncompressed HDF5 and Vaex + Snappy compressed Parquet as I expected. Considering the ease of handling and file size, Parquet with Snappy compression seems to be a good choice. -* In the previous comparison with a single file, HDF5 was faster than Parquet when the number of lines in one file was much larger, so it seems that only one file contains a large amount of data. HDF5 seems to shine in the case. --Pandas and Dask don't seem to be very fast with HDF5 when there are many files in chronological data (especially Dask is significantly slower). ――As time series data, when totaling around 100,000 lines per file as verified this time, it seems that there is no extreme difference between Vaex and Dask (although Vaex is about twice as fast). If anything, the disk access aspect of the environment I tried this time may also be a bottleneck. ――However, it is very comfortable and helpful that the calculation can be completed in about one and a half minutes even if it is less than 200 million lines.

Environment to use

We will proceed with the following Docker image settings. The host is using a Windows 10 laptop.

FROM jupyter/datascience-notebook:python-3.8.6
RUN pip install vaex==2.6.1
RUN pip install jupyter-contrib-nbextensions==0.5.1
RUN jupyter contrib nbextension install --user
RUN jupyter nbextension enable hinterland/hinterland \
    && jupyter nbextension enable toc2/main

The OS and library environment is as follows.

--Ubuntu is 20.04

Target format

In the comparison of the previous article, it was like uncompressed HDF5 or Snappy compressed Parquet, so this time I will deal with two formats, HDF5 and Snappy compressed Parquet, without including CSV.

Also, regarding the library, if you want to compare the number of lines that can fit in the memory, including Pandas, if the number of lines becomes quite large, proceed with only Dask and Vaex.


Data preparation

Code from the previous performance comparison article will be used to some extent.

Assuming time-series data, we will prepare a data set with data of about 80,000 to 120,000 rows and 5 columns per day, and prepare data for 5 years from January 2016 to the end of December 2020.

The five columns are configured as follows.

--column_a: int-> Set a random value in the range of 0 to 4999999. --column_b: str-> Set the date and time character string (example: 2020-12-31 15:10:20). --column_c: int-> Set a random value in the range of 0 to 100. --column_d: int-> Set one of 100, 300, 500, 1000, 3000, 10000. --column_e: str-> char_num 20 Set a lowercase and uppercase alphabetic string of characters.

In addition, HDF5 has a different hierarchical structure depending on the library, so each library is saved separately.

from string import ascii_letters
import random
from datetime import date, datetime

import numpy as np
import pandas as pd
import dask.dataframe as dd
from pandas.tseries.offsets import Day
import vaex


def make_random_ascii_str(char_num):
    """
Generates a random lowercase and uppercase string with the specified number of characters.

    Parameters
    ----------
    char_num : int
The number of characters to generate.

    Returns
    -------
    random_str : str
The generated string.
    """
    return ''.join(random.choices(population=ascii_letters, k=char_num))


def make_pandas_df(row_num, char_num, date_str):
    """
Generate a Pandas data frame for validation.

    Parameters
    ----------
    row_num : int
The number of rows in the data frame to generate.
    char_num : str
The number of characters in the value to set in the string column.
    date_str : str
The string of the target date.

    Returns
    -------
    pandas_df : pd.DataFrame
The generated data frame. Each value is set in the following 5 columns.
        - column_a : int ->A random value in the range 0-4999999.
        - column_b : str ->Date and time strings (eg: 2020-12-31 15:10:20)。
        - column_c : int ->Random value in the range 0-100.
        - column_d : int -> 100, 300, 500, 1000, 3000,Any value of 10000.
        - column_e : str -> char_num A string of lowercase and uppercase alphabets with a number of 20 characters.
    """
    pandas_df = pd.DataFrame(
        index=np.arange(0, row_num), columns=['column_a'])
    row_num = len(pandas_df)
    pandas_df['column_a'] = np.random.randint(0, 5000000, size=row_num)
    
    random_hours = np.random.randint(low=0, high=24, size=row_num)
    random_minutes = np.random.randint(low=0, high=60, size=row_num)
    random_seconds = np.random.randint(low=0, high=60, size=row_num)
    times = []
    for i in range(row_num):
        hour_str = str(random_hours[i]).zfill(2)
        minute_str = str(random_minutes[i]).zfill(2)
        second_str = str(random_seconds[i]).zfill(2)
        time_str = f'{date_str} {hour_str}:{minute_str}:{second_str}'
        times.append(time_str)
    pandas_df['column_b'] = times

    pandas_df['column_c'] = np.random.randint(0, 100, size=row_num)
    pandas_df['column_d'] = np.random.choice(a=[100, 300, 500, 1000, 3000, 10000], size=row_num)
    pandas_df['column_e'] = [make_random_ascii_str(char_num=20) for _ in range(row_num)]
    pandas_df.sort_values(by='column_b', inplace=True)
    return pandas_df


def get_pandas_hdf5_file_path(date_str):
    """
Get the path of the Pandas HDF5 file for the target date.

    Parameters
    ----------
    date_str : str
The character string of the target date.
    
    Returns
    -------
    file_path : str
The generated file path.
    """
    return f'pandas_{date_str}.hdf5'


def get_dask_hdf5_file_path(date_str):
    """
Get the path of the Dask HDF5 file for the target date.

    Parameters
    ----------
    date_str : str
The character string of the target date.
    
    Returns
    -------
    file_path : str
The generated file path.
    """
    return f'dask_{date_str}.hdf5'


def get_vaex_hdf5_file_path(date_str):
    """
Get the path of the Vaex HDF5 file for the target date.

    Parameters
    ----------
    date_str : str
The character string of the target date.
    
    Returns
    -------
    file_path : str
The generated file path.
    """
    return f'vaex_{date_str}.hdf5'


def get_parquet_file_path(date_str):
    """
Get the path of the Parquet file of the target date.

    Parameters
    ----------
    date_str : str
The character string of the target date.
    
    Returns
    -------
    file_path : str
The generated file path.
    """
    return f'{date_str}.parquet'


def save_data():
    """
Save each file of time series data for verification.
    """
    current_date = date(2016, 1, 1)
    last_date = date(2020, 12, 31)
    while current_date <= last_date:
        date_str = current_date.strftime('%Y-%m-%d')
        row_num = random.randint(80_000, 120_000)
        print(
            datetime.now(), date_str, 'Start saving process. Number of lines:', row_num)
        
        pandas_df = make_pandas_df(
            row_num=row_num, char_num=20, date_str=date_str)
        vaex_df = vaex.from_pandas(df=pandas_df, copy_index=False)
        dask_df = dd.from_pandas(data=pandas_df, npartitions=1)
        
        pandas_df.to_hdf(
            path_or_buf=get_pandas_hdf5_file_path(date_str=date_str),
            key='data', mode='w')
        
        dask_df.to_hdf(
            path_or_buf=get_dask_hdf5_file_path(date_str=date_str),
            key='data', mode='w')
        
        vaex_df.export_hdf5(
            path=get_vaex_hdf5_file_path(date_str=date_str))
        
        vaex_df.export_parquet(
            path=get_parquet_file_path(date_str=date_str))
        
        current_date += Day()


save_data()
2021-01-17 07:58:24.950967 2016-01-01 save processing started. Number of lines: 83831
2021-01-17 07:58:26.994136 2016-01-02 save process started. Number of lines: 117457
2021-01-17 07:58:29.859080 2016-01-Start saving 03. Number of lines: 101470
2021-01-17 07:58:32.381448 2016-01-04 save process started. Number of lines: 88966
...

Add reading and calculation processing

Due to the lazy evaluation of Dask and Vaex, it is not possible to make a comparison only by reading, so we will proceed by assuming a comparison of how long it will take to complete the calculation in the form of reading → processing such as some calculation.

This time, we will set up the following two patterns.

-[Pattern 1]: Slice only the lines containing the character string ab with column_e-> Calculate the number of lines after slicing. -[Pattern 2]: Slice the value of column_a to a value of 3 million or less-> Slice only the line where the value of the string of column_e starts with a-> GROUP BY with the value of column_d-> column_a for each group Calculate the maximum value of.

Addition of reading process

We will add it in Pandas, Dask, and Vaex respectively.

def read_pandas_df_from_hdf5(start_date, last_date):
    """
Reads Pandas dataframes in the specified date range from HDF5 files.

    Parameters
    ----------
    start_date : date
The start date of the date range.
    last_date : date
The last day of the date range.
    
    Returns
    -------
    df : pd.DataFrame
The loaded Pandas data frame.
    """
    df_list = []
    current_date = start_date
    while current_date <= last_date:
        date_str = current_date.strftime('%Y-%m-%d')
        file_path = get_pandas_hdf5_file_path(date_str=date_str)
        df = pd.read_hdf(path_or_buf=file_path, key='data')
        df_list.append(df)
        current_date += Day()
    df = pd.concat(df_list, ignore_index=True, copy=False)
    return df


def read_dask_df_from_hdf5(start_date, last_date):
    """
Read Dask dataframes in the specified date range from HDF5 files

    Parameters
    ----------
    start_date : date
The start date of the date range.
    last_date : date
The last day of the date range.
    
    Returns
    -------
    df : dd.DataFrame
The loaded Dask data frame.
    """
    df_list = []
    current_date = start_date
    while current_date <= last_date:
        date_str = current_date.strftime('%Y-%m-%d')
        file_path = get_dask_hdf5_file_path(date_str=date_str)
        df = dd.read_hdf(pattern=file_path, key='data')
        df_list.append(df)
        current_date += Day()
    df = dd.concat(dfs=df_list)
    return df


def read_vaex_df_from_hdf5(start_date, last_date):
    """
Read Vaex dataframes in a specified date range from an HDF5 file

    Parameters
    ----------
    start_date : date
The start date of the date range.
    last_date : date
The last day of the date range.
    
    Returns
    -------
    df : vaex.dataframe.DataFrame
The loaded Vaex data frame.
    """
    file_paths = []
    current_date = start_date
    while current_date <= last_date:
        date_str = current_date.strftime('%Y-%m-%d')
        file_path = get_vaex_hdf5_file_path(date_str=date_str)
        file_paths.append(file_path)
        current_date += Day()
    vaex_df = vaex.open_many(filenames=file_paths)
    return vaex_df


def read_pandas_df_from_parquet(start_date, last_date):
    """
Read Pandas dataframes in the specified date range from Parquet.

    Parameters
    ----------
    start_date : date
The start date of the date range.
    last_date : date
The last day of the date range.
    
    Returns
    -------
    df : pd.DataFrame
The loaded Pandas data frame.
    """
    df_list = []
    current_date = start_date
    while current_date <= last_date:
        date_str = current_date.strftime('%Y-%m-%d')
        file_path = get_parquet_file_path(date_str=date_str)
        df = pd.read_parquet(path=file_path)
        df_list.append(df)
        current_date += Day()
    df = pd.concat(df_list, ignore_index=True, copy=False)
    return df


def read_dask_df_from_parquet(start_date, last_date):
    """
Read Dask dataframes in a specified date range from a Parquet file

    Parameters
    ----------
    start_date : date
The start date of the date range.
    last_date : date
The last day of the date range.
    
    Returns
    -------
    df : dd.DataFrame
The loaded Dask data frame.
    """
    df_list = []
    current_date = start_date
    while current_date <= last_date:
        date_str = current_date.strftime('%Y-%m-%d')
        file_path = get_parquet_file_path(date_str=date_str)
        df = dd.read_parquet(path=file_path)
        df_list.append(df)
        current_date += Day()
    df = dd.concat(dfs=df_list)
    return df


def read_vaex_df_from_parquet(start_date, last_date):
    """
Read Vaex dataframes in the specified date range from a Parquet file

    Parameters
    ----------
    start_date : date
The start date of the date range.
    last_date : date
The last day of the date range.
    
    Returns
    -------
    df : vaex.dataframe.DataFrame
The loaded Vaex data frame.
    """
    file_paths = []
    current_date = start_date
    while current_date <= last_date:
        date_str = current_date.strftime('%Y-%m-%d')
        file_path = get_parquet_file_path(date_str=date_str)
        file_paths.append(file_path)
        current_date += Day()
    vaex_df = vaex.open_many(filenames=file_paths)
    return vaex_df

Addition of pattern 1 processing

--Slice only lines containing the string ab with column_e --Calculate the number of rows after slicing

We will add the process in each library.

def calculate_pattern_1_with_pandas_df(pandas_df):
    """
Calculate the first pattern in a Pandas data frame.

    Parameters
    ----------
    pandas_df : pd.DataFrame
The target Pandas data frame.
    
    Returns
    -------
    row_count : int
The number of lines in the calculation result.
    """
    pandas_df = pandas_df[pandas_df['column_e'].str.contains('ab')]
    row_count = len(pandas_df)
    return row_count


def calculate_pattern_1_with_dask_df(dask_df):
    """
The calculation of the first pattern is done in the Dask data frame.

    Parameters
    ----------
    dask_df : dd.DataFrame
The data frame of the target Dask.

    Returns
    -------
    row_count : int
The number of lines in the calculation result.
    """
    dask_df = dask_df[dask_df['column_e'].str.contains('ab')]
    row_count = len(dask_df)
    return row_count


def calculate_pattern_1_with_vaex_df(vaex_df):
    """
The calculation of the third pattern is performed in the Vaex data frame.

    Parameters
    ----------
    vaex_df : vaex.dataframe.DataFrame
The target Vaex data frame.

    Returns
    -------
    row_count : int
The number of lines in the calculation result.
    """
    vaex_df = vaex_df[vaex_df['column_e'].str.contains('ab')]
    row_count = len(vaex_df)
    return row_count

Addition of pattern 2 processing

--Slice to a value of 3 million or less for column_a --Slice only to the line where the value of the string of column_e starts with a --GROUP BY with the value of column_d --Calculate the maximum value of column_a for each group

We will add the process in each library.

def calculate_pattern_2_with_pandas_df(pandas_df):
    """
Calculate the second pattern in a Pandas data frame.

    Parameters
    ----------
    pandas_df : pd.DataFrame
The target Pandas data frame.

    Returns
    -------
    max_sr : pd.Series
A series that stores each calculated total value.
    """
    pandas_df = pandas_df[pandas_df['column_a'] <= 3_000_000]
    pandas_df = pandas_df[pandas_df['column_e'].str.startswith('a')]
    grouped = pandas_df.groupby(by='column_d')
    max_df = grouped.max()
    max_sr = max_df['column_a']
    return max_sr


def calculate_pattern_2_with_dask_df(dask_df):
    """
The calculation of the second pattern is done in the Dask data frame.

    Parameters
    ----------
    dask_df : dd.DataFrame
The data frame of the target Dask.

    Returns
    -------
    max_sr : pd.Series
A series that stores each calculated total value.
    """
    dask_df = dask_df[dask_df['column_a'] <= 3_000_000]
    dask_df = dask_df[dask_df['column_e'].str.startswith('a')]
    grouped = dask_df.groupby(by='column_d')
    max_df = grouped.max()
    max_sr = max_df['column_a']
    max_sr = max_sr.compute()
    return max_sr


def calculate_pattern_2_with_vaex_df(vaex_df):
    """
The calculation of the second pattern is performed in the Vaex data frame.

    Parameters
    ----------
    vaex_df : vaex.dataframe.DataFrame
The target Vaex data frame.

    Returns
    -------
    max_sr : pd.Series
A series that stores each calculated total value.
    """
    vaex_df = vaex_df[vaex_df['column_a'] <= 3_000_000]
    vaex_df = vaex_df[vaex_df['column_e'].str.startswith('a')]
    max_df = vaex_df.groupby(
        by='column_d',
        agg={
            'column_a': vaex.agg.max,
        })
    max_df = max_df.to_pandas_df(column_names=['column_a', 'column_d'])
    max_df.index = max_df['column_d']
    max_sr = max_df['column_a']
    return max_sr

Addition of read and aggregate processing

Since each reading process and aggregation process have been added, we will write a process that combines them.

from timeit import timeit
from copy import deepcopy
import sys


class ReadAndCalcRunner:

    def __init__(self, label, pattern, read_func, calc_func):
        """
A class that handles the setting and execution processing of the combination of reading and calculation processing.

        Parameters
        ----------
        label : str
A label for identifying the target combination.
Example: csv_no_compression_pandas
        pattern : int
Target pattern (1 to 3).
        read_func : Callable
A function that handles reading processing. Arguments are optional and change the data frame
You need a format.
        calc_func : Callable
A function that handles computational processing. A format that accepts a data frame as the first argument
You need to specify something.
        """
        self.label = label
        self.pattern = pattern
        self._read_func = read_func
        self._calc_func = calc_func

    def run(self, n, start_date, last_date, debug=False):
        """
Perform reading and calculation processing. After execution, mean_in seconds attribute
The average number of seconds of execution (float) is set.

        Parameters
        ----------
        n : int
Number of executions. The larger the number, the higher the accuracy of the processing time, but it is completed.
Please note that it will take a long time to do so.
        start_date : date
The start date of the date range.
        last_date : date
The last day of the date range.
        debug : bool, default False
Debug settings. If True is specified, the calculation result will be output.
        """
        statement = 'df = self._read_func(start_date=start_date, last_date=last_date);'
        if not debug:
            statement += 'self._calc_func(df);'
        else:
            statement += 'result = self._calc_func(df); print(result)'
        result_seconds = timeit(
            stmt=statement,
            number=n, globals=locals())
        self.mean_seconds = result_seconds / n


this_module = sys.modules[__name__]
FORMATS = (
    'parquet',
    'hdf5',
)
LIBS = (
    'pandas',
    'dask',
    'vaex',
)
PATTERNS = (1, 2)

runners = []

for format_str in FORMATS:
    for lib_str in LIBS:
        for pattern in PATTERNS:
            label = f'{format_str}_{lib_str}'
            read_func_name = f'read_{lib_str}_df_from_{format_str}'
            read_func = getattr(this_module, read_func_name)

            calc_func_name = f'calculate_pattern_{pattern}_with_{lib_str}_df'
            calc_func = getattr(this_module, calc_func_name)

            runners.append(
                ReadAndCalcRunner(
                    label=label,
                    pattern=pattern,
                    read_func=read_func,
                    calc_func=calc_func)
            )

Added processing for plotting

We will add functions for comparing processing times.

import matplotlib
from matplotlib.ticker import ScalarFormatter, FormatStrFormatter
import matplotlib.pyplot as plt

matplotlib.style.use('ggplot')


def plot_time_info(time_sr, patten, print_sr = True):
    """
Plot the processing time.

    Parameters
    ----------
    time_sr : pd.Series
A series that stores the processing time.
    patten : int
Target aggregation processing pattern (1 to 3).
    print_sr : bool, default True
Whether to output and display a series of plot contents.
    """
    sr = time_sr.copy()
    sr.sort_values(inplace=True, ascending=False)
    if print_sr:
        print(sr.sort_values(ascending=True))
    title = f'Read and calculation seconds (pattern: {patten})'
    ax = sr.plot(kind='barh', figsize=(10, len(sr) // 2), title=title)
    ax.xaxis.set_major_formatter(FormatStrFormatter('%.3f'))
    plt.show()

Addition of processing to flow each processing

In order to simplify the description, add a function to process each process (reading, aggregation, visualization) at once.

def run_and_get_result_time_sr(
        runners, pattern, n, start_date, last_date, skip_pandas, skip_dask_hdf5,
        skip_dask_parquet):
    """
Executed each measurement process of the specified pattern and stored the value of each number of seconds of the result.
Get the series.

    Parameters
    ----------
    runners : list of ReadAndCalcRunner
A list of instances that hold definitions of execution processes.
    pattern : int
Pattern to execute(1~3)。
    n : int
Number of executions. The larger the number, the higher the accuracy of the processing time, but it is completed.
Please note that it will take a long time to do so.
    start_date : date
The start date of the date range.
    last_date : date
The last day of the date range.
    skip_pandas : bool
Whether to skip Pandas processing.
    skip_dask_hdf5 : bool
Whether to skip processing of HDF5 format Dask.
    skip_dask_parquet : bool
Whether to skip the processing of Dask in Parquet format.

    Returns
    -------
    sr : pd.Series
A series that stores measurement results. Each format and index
A concatenated string of labels and patterns for library identification is set,
The value is set to the number of seconds.
    """
    runners = deepcopy(runners)
    data_dict = {}
    for runner in runners:
        if runner.pattern != pattern:
            continue
        if skip_pandas and 'pandas' in runner.label:
            continue
        if skip_dask_hdf5 and 'dask' in runner.label and 'hdf5' in runner.label:
            continue
        if skip_dask_parquet and 'dask' in runner.label and 'parquet' in runner.label:
            continue
        label = f'{runner.label}_{pattern}'
        print(datetime.now(), label, 'Start processing...')
        runner.run(n=n, start_date=start_date, last_date=last_date)
        data_dict[label] = runner.mean_seconds
    sr = pd.Series(data=data_dict)
    return sr


def run_overall(
        n, pattern, start_date, last_date, skip_pandas, skip_dask_hdf5,
        skip_dask_parquet):
    """
Each process (reading, aggregation, visualization, etc.) for the target pattern
Run overall.

    Parameters
    ----------
    n : int
The number of reads and calculations performed. The larger the number, the more accurate the processing time,
Please note that it will take a long time to complete.
    pattern : int
The target calculation pattern (1 to 3).
    start_date : date
The start date of the date range.
    last_date : date
The last day of the date range.
    skip_pandas : bool
Whether to skip Pandas processing.
    skip_dask_hdf5 : bool
Whether to skip processing of HDF5 format Dask.
    skip_dask_parquet : bool
Whether to skip the processing of Dask in Parquet format.
    """
    time_sr = run_and_get_result_time_sr(
        runners=runners, pattern=pattern, n=n, start_date=start_date,
        last_date=last_date, skip_pandas=skip_pandas,
        skip_dask_hdf5=skip_dask_hdf5,
        skip_dask_parquet=skip_dask_parquet)
    plot_time_info(time_sr=time_sr, patten=pattern)

Actually execute

Now that we are ready, we will add up each processing time. Try each of the following conditions. In addition, Pandas targets up to 3 months, and after that, it will be executed only in Dask and Vaex.

――One month (about 3 million lines) ――For 3 months (about 9 million lines) ――For 6 months (about 18 million lines) --One year's worth (about 36 million lines) ――For 3 years (about 108 million lines) ――For 5 years (about 180 million lines)

Aggregation for one month

pattern 1:

run_overall(
    n=5, pattern=1,
    start_date=date(2016, 1, 1),
    last_date=date(2016, 1, 31),
    skip_pandas=False)
hdf5_vaex_1         1.015516
parquet_vaex_1      1.028920
parquet_pandas_1    2.685143
parquet_dask_1      2.828006
hdf5_pandas_1       3.311069
hdf5_dask_1         7.616159

image.png

--Although it is Snappy compressed, the processing time of Parquet + Vaex is quite close to that of uncompressed HDF5. --Pandas and Dask seem to be slower in HDF5 (despite uncompressed). I'm particularly concerned about Dask's processing time.

Pattern 2:

run_overall(
    n=5, pattern=2,
    start_date=date(2016, 1, 1),
    last_date=date(2016, 1, 31),
    skip_pandas=False)
hdf5_vaex_2         0.766808
parquet_vaex_2      0.848183
parquet_pandas_2    2.436566
parquet_dask_2      2.961728
hdf5_pandas_2       4.134251
hdf5_dask_2         8.657277

image.png

Aggregation for 3 months

pattern 1:

run_overall(
    n=5, pattern=1,
    start_date=date(2016, 1, 1),
    last_date=date(2016, 3, 31),
    skip_pandas=False)
hdf5_vaex_1          2.260799
parquet_vaex_1       2.649166
parquet_dask_1       8.578201
parquet_pandas_1     8.656629
hdf5_pandas_1        9.994132
hdf5_dask_1         22.766739

image.png

Pattern 2:

run_overall(
    n=5, pattern=2,
    start_date=date(2016, 1, 1),
    last_date=date(2016, 3, 31),
    skip_pandas=False)
hdf5_vaex_2          1.894970
parquet_vaex_2       2.529330
parquet_pandas_2     7.110901
hdf5_pandas_2        9.252198
parquet_dask_2      10.688318
hdf5_dask_2         23.362928

image.png

Aggregation for 6 months

From here, skip Pandas and proceed only with Dask and Vaex. Also, due to processing time, we will reduce the number of executions for each pattern from 5 to 3 times.

In addition, Dask + HDF5 is quite slow, so I'll adjust it so that it can be skipped due to processing time.

pattern 1:

run_overall(
    n=3, pattern=1,
    start_date=date(2016, 1, 1),
    last_date=date(2016, 6, 30),
    skip_pandas=True,
    skip_dask_hdf5=True)
hdf5_vaex_1        4.621812
parquet_vaex_1     5.633019
parquet_dask_1    17.827765

image.png

Pattern 2:

run_overall(
    n=3, pattern=2,
    start_date=date(2016, 1, 1),
    last_date=date(2016, 6, 30),
    skip_pandas=True,
    skip_dask_hdf5=True)
hdf5_vaex_2        4.231214
parquet_vaex_2     5.312496
parquet_dask_2    17.153308

image.png

Aggregation for one year

We will reduce the number of executions to 2.

pattern 1:

run_overall(
    n=2, pattern=1,
    start_date=date(2016, 1, 1),
    last_date=date(2016, 12, 31),
    skip_pandas=True,
    skip_dask_hdf5=True)
hdf5_vaex_1        9.618381
parquet_vaex_1    11.091080
parquet_dask_1    36.335810

image.png

Pattern 2:

run_overall(
    n=2, pattern=2,
    start_date=date(2016, 1, 1),
    last_date=date(2016, 12, 31),
    skip_pandas=True,
    skip_dask_hdf5=True)
hdf5_vaex_2        9.132881
parquet_vaex_2    11.136143
parquet_dask_2    34.377085

image.png

Aggregation for 3 years

The number of trials will be greatly reduced, but after that, due to processing time, it will be executed only once.

pattern 1:

run_overall(
    n=1, pattern=1,
    start_date=date(2016, 1, 1),
    last_date=date(2018, 12, 31),
    skip_pandas=True,
    skip_dask_hdf5=True)
hdf5_vaex_1        40.676083
parquet_vaex_1     43.035784
parquet_dask_1    100.698389

image.png

Pattern 2:

run_overall(
    n=1, pattern=2,
    start_date=date(2016, 1, 1),
    last_date=date(2018, 12, 31),
    skip_pandas=True,
    skip_dask_hdf5=True)
hdf5_vaex_2        40.061167
parquet_vaex_2     42.218093
parquet_dask_2    102.830116

image.png

Aggregation for 5 years

Is it affected by running on Docker? For some reason, the Jupyter kernel crashes while running Dask's, so I'll try to proceed with Vaex only for 5 years.

pattern 1:

run_overall(
    n=1, pattern=1,
    start_date=date(2016, 1, 1),
    last_date=date(2020, 12, 31),
    skip_pandas=True,
    skip_dask_hdf5=True,
    skip_dask_parquet=True)
parquet_vaex_1    95.578259
hdf5_vaex_1       99.258315

image.png

Since the number of trials is one, there is a possibility that it may have shaken, but the result is faster with Parquet.

Pattern 2:

run_overall(
    n=1, pattern=2,
    start_date=date(2016, 1, 1),
    last_date=date(2020, 12, 31),
    skip_pandas=True,
    skip_dask_hdf5=True,
    skip_dask_parquet=True)
hdf5_vaex_2       78.231278
parquet_vaex_2    93.696743

Here, HDF5 is usually faster.

image.png

Recommended Posts

Performance comparison of Parquet, Vaex, Dask, etc. in HDF5 with multiple files
I compared the performance of Vaex, Dask, and Pandas in CSV, Parquet, and HDF5 formats (for single files).
Configure a module with multiple files in Django
Upload multiple files in Flask
Convert files written in python etc. to pdf with syntax highlighting
Get a list of files in a folder with python without a path