По состоянию на август 2017 г., Pandas DataFame.apply (), к сожалению, по-прежнему ограничен работой с одним ядром, а это означает, что многоядерный компьютер будет тратить большую часть своего вычислительного времени при запускеdf.apply (myfunc, axis = 1).

Как вы можете использовать все свои ядра для параллельного запуска приложения на фреймворке данных?

Roko Mijic

Ответов: 10

Ответы (10)

Вы можете использовать swifter пакет:

pip install swifter

(Обратите внимание, что вы можете использовать это в virtualenv, чтобы избежать конфликтов версий с установленными зависимостями.)

Swifter работает как плагин для панд, позволяя повторно использовать функцию apply:

import swifter

def some_function(data):
    return data * 10

data['out'] = data['in'].swifter.apply(some_function)

Он автоматически определит наиболее эффективный способ распараллеливания функции, независимо от того, векторизован он (как в приведенном выше примере) или нет.

Дополнительные примеры и сравнение производительности доступны на GitHub. Обратите внимание, что пакет находится в активной разработке, поэтому API может измениться.

Также обратите внимание, что этот не будет работать автоматически для строковых столбцов. При использовании строк Swifter вернется к «простому» Pandas apply, который не будет параллельным. В этом случае даже принудительное использование dask не приведет к повышению производительности, и вам будет лучше просто разделить набор данных вручную и распараллелить с использованием multiprocessing.

Поскольку вопрос был «Как вы можете использовать все свои ядра для параллельного запуска apply на фрейме данных?», ответ также может быть с modin. Можно запустить все ядра параллельно, хотя в реальном времени хуже.

См. https://github.com/modin-project/modin. Он запускается поверх dask или ray. Они говорят: «Modin - это DataFrame, предназначенный для наборов данных от 1 МБ до 1 ТБ +». Я пробовал: pip3 install "modin" [ray] ". Modin vs pandas было - 12 секунд на шести ядрах против 6 секунд

Вот еще один, использующий Joblib и некоторый вспомогательный код из scikit-learn. Легкий (если у вас уже есть scikit-learn), хорошо, если вы предпочитаете больший контроль над тем, что он делает, поскольку joblib легко взломать.

from joblib import parallel_backend, Parallel, delayed, effective_n_jobs
from sklearn.utils import gen_even_slices
from sklearn.utils.validation import _num_samples


def parallel_apply(df, func, n_jobs= -1, **kwargs):
    """ Pandas apply in parallel using joblib. 
    Uses sklearn.utils to partition input evenly.
    
    Args:
        df: Pandas DataFrame, Series, or any other object that supports slicing and apply.
        func: Callable to apply
        n_jobs: Desired number of workers. Default value -1 means use all available cores.
        **kwargs: Any additional parameters will be supplied to the apply function
        
    Returns:
        Same as for normal Pandas DataFrame.apply()
        
    """
    
    if effective_n_jobs(n_jobs) == 1:
        return df.apply(func, **kwargs)
    else:
        ret = Parallel(n_jobs=n_jobs)(
            delayed(type(df).apply)(df[s], func, **kwargs)
            for s in gen_even_slices(_num_samples(df), effective_n_jobs(n_jobs)))
        return pd.concat(ret)

Использование: результат = parallel_apply (my_dataframe, my_func)

Вместо

df["new"] = df["old"].map(fun)

делать

from joblib import Parallel, delayed
df["new"] = Parallel(n_jobs=-1, verbose=10)(delayed(fun)(i) for i in df["old"])

Для меня это небольшое улучшение по сравнению с

import multiprocessing as mp
with mp.Pool(mp.cpu_count()) as pool:
    df["new"] = pool.map(fun, df["old"])

, когда вы получаете индикацию прогресса и автоматическое пакетирование, если задания очень маленькие.

Просто хочу дать ответ на обновление Dask

import dask.dataframe as dd

def your_func(row):
  #do something
  return row

ddf = dd.from_pandas(df, npartitions=30) # find your own number of partitions
ddf_update = ddf.apply(your_func, axis=1).compute()

На моих 100000 записей, без Dask:

Время ЦП: пользовательские 6 минут 32 с, sys: 100 мс, всего: 6 минут 32 с Время на стене: 6 мин 32 с

С Dask:

Время ЦП: пользовательское 5,19 с, системное: 784 мс, всего: 5,98 с Время на стене: 1 мин 3 с

Самый простой способ - использовать Dask's map_partitions. Вам понадобится этот импорт (вам потребуется pip install dask):

import pandas as pd
import dask.dataframe as dd
from dask.multiprocessing import get

и синтаксис

data = 
ddata = dd.from_pandas(data, npartitions=30)

def myfunc(x,y,z, ...): return 

res = ddata.map_partitions(lambda df: df.apply((lambda row: myfunc(*row)), axis=1)).compute(get=get)  

(я считаю, что 30 - подходящее количество разделов, если у вас 16 ядер). Для полноты картины я рассчитал разницу на своей машине (16 ядер):

data = pd.DataFrame()
data['col1'] = np.random.normal(size = 1500000)
data['col2'] = np.random.normal(size = 1500000)

ddata = dd.from_pandas(data, npartitions=30)
def myfunc(x,y): return y*(x**2+1)
def apply_myfunc_to_DF(df): return df.apply((lambda row: myfunc(*row)), axis=1)
def pandas_apply(): return apply_myfunc_to_DF(data)
def dask_apply(): return ddata.map_partitions(apply_myfunc_to_DF).compute(get=get)  
def vectorized(): return myfunc(data['col1'], data['col2']  )

t_pds = timeit.Timer(lambda: pandas_apply())
print(t_pds.timeit(number=1))

28,16970546543598

t_dsk = timeit.Timer(lambda: dask_apply())
print(t_dsk.timeit(number=1))

2,708152851089835

t_vec = timeit.Timer(lambda: vectorized())
print(t_vec.timeit(number=1))

0,010668013244867325

Дает коэффициент ускорения 10 при переходе от pandas применяется к dask, применяемому к разделам. Конечно, если у вас есть функция, которую можно векторизовать, вы должны - в этом случае функция (y * (x ** 2 + 1)) тривиально векторизована, но есть много вещей, которые невозможно векторизовать.

вместо этого вы можете попробовать pandarallel: простой и эффективный инструмент для распараллеливания операций pandas на всех ваших процессорах (в Linux и macOS)

  • Распараллеливание требует затрат (создание экземпляров новых процессов, отправка данных через разделяемую память и т. Д.), Поэтому распараллеливание эффективно только в том случае, если объем вычислений для распараллеливания достаточно высок. При очень небольшом объеме данных использование параллелизации не всегда того стоит.
  • Применяемые функции НЕ должны быть лямбда-функциями.
from pandarallel import pandarallel
from math import sin

pandarallel.initialize()

# FORBIDDEN
df.parallel_apply(lambda x: sin(x**2), axis=1)

# ALLOWED
def func(x):
    return sin(x**2)

df.parallel_apply(func, axis=1)

см. https://github.com/nalepae/pandarallel

Вот пример базового трансформатора sklearn, в котором pandas apply распараллеливается

импортировать многопроцессорность как mp
из sklearn.base импортировать TransformerMixin, BaseEstimator

класс ParllelTransformer (BaseEstimator, TransformerMixin):
    def __init __ (себя,
                 n_jobs = 1):
        "" "
        n_jobs - параллельные задания для запуска
        "" "
        self.variety = разнообразие
        self.user_abbrevs = user_abbrevs
        self.n_jobs = n_jobs
    def fit (self, X, y = None):
        вернуть себя
    def transform (self, X, * _):
        X_copy = X.copy ()
        ядра = mp.cpu_count ()
        перегородки = 1

        если self.n_jobs <= -1:
            разделы = ядра
        elif self.n_jobs <= 0:
            перегородки = 1
        еще:
            разделы = мин (self.n_jobs, ядра)

        если разделы == 1:
            # преобразовать последовательно
            вернуть X_copy.apply (self._transform_one)

        # разбиение данных на партии
        data_split = np.array_split (X_copy, разделы)

        pool = mp.Pool (ядра)

        # Здесь функция reduce - объединение преобразованных пакетов
        data = pd.concat (
            pool.map (self._preprocess_part, data_split)
        )

        pool.close ()
        pool.join ()
        вернуть данные
    def _transform_part (self, df_part):
        вернуть df_part.apply (self._transform_one)
    def _transform_one (сам, строка):
        # здесь какие-то трансформации
        обратная линия

подробнее см. https://towardsdatascience.com/4-easy-steps-to-improve-your-machine-learning-code-performance-88a0b0eeffa8

Если вы хотите остаться в нативном Python:

import multiprocessing as mp

with mp.Pool(mp.cpu_count()) as pool:
    df['newcol'] = pool.map(f, df['col'])

применит функцию f параллельно к столбцу col фрейма данных df

Чтобы использовать все (физические или логические) ядра, вы можете попробовать mapply в качестве альтернативы swifter и pandarallel.

Вы можете установить количество ядер (и поведение фрагментов) при инициализации:

импортировать панды как pd
импортное сопоставление

mapply.init (n_workers = -1)

...

df.mapply (myfunc, ось = 1)

По умолчанию (n_workers = -1) пакет использует все физические процессоры, доступные в системе. Если ваша система использует гиперпоточность (обычно отображается вдвое больше физических процессоров), mapply порождает одного дополнительного рабочего, который будет отдавать приоритет многопроцессорному пулу над другими процессами в системе.

В зависимости от вашего определения всех ваших ядер, вы также можете использовать вместо этого все логические ядра (имейте в виду, что в этом случае процессы, связанные с процессором, будут сражаться за физические процессоры, что может замедлить вашу работу) :

импорт многопроцессорности
n_workers = multiprocessing.cpu_count ()

# или более явный
import psutil
n_workers = psutil.cpu_count (логический = Истина)

2022 WebDevInsider