Мы разрабатываем программу, которая принимает и пересылает «сообщения», сохраняя при этом временную историю этих сообщений, чтобы она могла рассказать вам историю сообщений, если потребуется. Сообщения идентифицируются численно, обычно имеют размер около 1 килобайта, и нам необходимо хранить сотни тысяч таких сообщений.

Мы хотим оптимизировать эту программу для задержки: время между отправкой и получением сообщения должно быть меньше 10 миллисекунд.

Программа написана на Haskell и скомпилирована с помощью GHC. Однако мы обнаружили, что паузы при сборке мусора слишком велики для наших требований к задержке: более 100 миллисекунд в нашей реальной программе.

Следующая программа является упрощенной версией нашего приложения. Он использует Data.Map.Strict для хранения сообщений. Сообщения имеют вид ByteStrings, идентифицируемые Int. 1000000 сообщений вставляются в возрастающем числовом порядке, а самые старые сообщения постоянно удаляются, чтобы сохранить в истории не более 200000 сообщений.

module Main (main) where

import qualified Control.Exception as Exception
import qualified Control.Monad as Monad
import qualified Data.ByteString as ByteString
import qualified Data.Map.Strict as Map

data Msg = Msg !Int !ByteString.ByteString

type Chan = Map.Map Int ByteString.ByteString

message :: Int -> Msg
message n = Msg n (ByteString.replicate 1024 (fromIntegral n))

pushMsg :: Chan -> Msg -> IO Chan
pushMsg chan (Msg msgId msgContent) =
  Exception.evaluate $
    let
      inserted = Map.insert msgId msgContent chan
    in
      if 200000 < Map.size inserted
      then Map.deleteMin inserted
      else inserted

main :: IO ()
main = Monad.foldM_ pushMsg Map.empty (map message [1..1000000])

Мы скомпилировали и запустили эту программу, используя:

$ ghc --version
The Glorious Glasgow Haskell Compilation System, version 7.10.3
$ ghc -O2 -optc-O3 Main.hs
$ ./Main +RTS -s
   3,116,460,096 bytes allocated in the heap
     385,101,600 bytes copied during GC
     235,234,800 bytes maximum residency (14 sample(s))
     124,137,808 bytes maximum slop
             600 MB total memory in use (0 MB lost due to fragmentation)

                                     Tot time (elapsed)  Avg pause  Max pause
  Gen  0      6558 colls,     0 par    0.238s   0.280s     0.0000s    0.0012s
  Gen  1        14 colls,     0 par    0.179s   0.250s     0.0179s    0.0515s

  INIT    time    0.000s  (  0.000s elapsed)
  MUT     time    0.652s  (  0.745s elapsed)
  GC      time    0.417s  (  0.530s elapsed)
  EXIT    time    0.010s  (  0.052s elapsed)
  Total   time    1.079s  (  1.326s elapsed)

  %GC     time      38.6%  (40.0% elapsed)

  Alloc rate    4,780,213,353 bytes per MUT second

  Productivity  61.4% of total user, 49.9% of total elapsed

Важным показателем здесь является «максимальная пауза» 0,0515 с или 51 миллисекунда. Мы хотим уменьшить это как минимум на порядок.

Эксперименты показывают, что длина паузы GC определяется количеством сообщений в истории. Отношения примерно линейны или, возможно, суперлинейны. В следующей таблице показана эта взаимосвязь. (Вы можете увидеть наши сравнительные тесты здесь, а некоторые графики здесь.)

msgs history length  max GC pause (ms)
===================  =================
12500                                3
25000                                6
50000                               13
100000                              30
200000                              56
400000                             104
800000                             199
1600000                            487
3200000                           1957
6400000                           5378

Мы экспериментировали с несколькими другими переменными, чтобы выяснить, могут ли они уменьшить эту задержку, ни одна из которых не имеет большого значения. Среди этих неважных переменных: оптимизация (-O, -O2); Параметры RTS GC (-G, -H, -A, -c), количество ядер (-N), различные структуры данных (Data.Sequence), размер сообщений и количество сгенерированного кратковременного мусора. Неоспоримым определяющим фактором является количество сообщений в истории.

Наша рабочая теория состоит в том, что паузы линейны по количеству сообщений, потому что каждый цикл GC должен проходить через всю доступную рабочую память и копировать ее, что явно является линейной операцией.

Вопросы:

  • Верна ли эта теория линейного времени? Можно ли так просто выразить длину пауз сборщика мусора или же реальность сложнее?
  • Если пауза GC линейна в рабочей памяти, есть ли способ уменьшить задействованные постоянные факторы?
  • Есть ли варианты инкрементного GC или чего-то подобного? Мы можем видеть только исследовательские работы. Мы очень охотно жертвуем пропускной способностью для уменьшения задержки.
  • Есть ли какие-либо способы «разбить» память на меньшие циклы сборки мусора, кроме разделения на несколько процессов?

Ответы (5)

На самом деле у вас неплохо получается иметь время паузы 51 мс с более чем 200 МБ данных в реальном времени. Система, над которой я работаю, имеет большее максимальное время паузы с половиной этого объема живых данных.

Ваше предположение верно, время основной паузы сборщика мусора прямо пропорционально количеству данных в реальном времени, и, к сожалению, с GHC в его нынешнем виде этого не избежать. В прошлом мы экспериментировали с инкрементным сборщиком мусора, но это был исследовательский проект, и он не достиг уровня зрелости, необходимого для включения его в выпущенный GHC.

Одна вещь, которая, как мы надеемся, поможет с этим в будущем, - это компактные области: https://phabricator.haskell.org/D1264. Это своего рода ручное управление памятью, при котором вы уплотняете структуру в куче, и сборщику мусора не нужно ее пересекать. Он лучше всего работает с долгоживущими данными, но, возможно, его будет достаточно для использования для отдельных сообщений в ваших настройках. Мы планируем включить его в GHC 8.2.0.

Если вы находитесь в распределенной среде и у вас есть какой-то балансировщик нагрузки, вы можете использовать уловки, чтобы избежать попадания в паузу, вы в основном убедитесь, что балансировщик нагрузки не отправляет запросы к машинам, которые собираются выполнить крупный сборщик мусора и, конечно же, убедитесь, что машина по-прежнему завершает сборщик мусора, даже если не получает запросов.

Итак, вы обнаружили ограничение языков с GC: они не подходят для жестких систем реального времени.

У вас есть 2 варианта:

1st Увеличьте размер кучи и используйте двухуровневую систему кэширования, самые старые сообщения отправляются на диск, и вы сохраняете самые новые сообщения в памяти, вы можете сделать это с помощью подкачки ОС. Однако проблема с этим решением заключается в том, что подкачка может быть дорогостоящей в зависимости от возможностей чтения используемого вторичного блока памяти.

2-е. Запрограммируйте это решение, используя «C», и соедините его с FFI для haskell. Таким образом, вы можете управлять своей памятью. Это будет лучший вариант, так как вы можете контролировать объем необходимой памяти самостоятельно.

Как упоминалось в других ответах, сборщик мусора в GHC просматривает данные в реальном времени, что означает, что чем больше долговечных данных вы храните в памяти, тем дольше будут паузы GC.

GHC 8,2

Чтобы частично решить эту проблему, в GHC-8.2 была введена функция под названием компактные области. Это и функция исполняющей системы GHC, и библиотека, которая предоставляет удобный интерфейс для работы. Функция компактных регионов позволяет помещать ваши данные в отдельное место в памяти, и сборщик мусора не будет пересекать его во время фазы сбора мусора. Поэтому, если у вас есть большая структура, которую вы хотите сохранить в памяти, подумайте об использовании компактных областей. Однако сама компактная область не имеет мини-сборщика мусора внутри, она лучше работает для только для добавления структур данных, а не для чего-то вроде HashMap где вы также хотите удалить материал. Хотя вы можете побороть эту проблему. Дополнительные сведения см. В следующем сообщении в блоге:

GHC 8.10

Более того, начиная с GHC-8.10 реализован новый инкрементный алгоритм сборщика мусора с малой задержкой. Это альтернативный алгоритм сборки мусора, который не включен по умолчанию, но вы можете выбрать его, если хотите. Таким образом, вы можете переключить сборщик мусора по умолчанию на более новый, чтобы автоматически получить функции, предоставляемые компактными регионами, без необходимости выполнять упаковку и развертывание вручную. Однако новый GC - не серебряная пуля, и он не решает все проблемы автоматически, и у него есть свои компромиссы. Для тестов нового GC обратитесь к следующему репозиторию GitHub:

Я пробовал ваш фрагмент кода с подходом кольцевого буфера, используя IOVector в качестве базовой структуры данных. В моей системе (GHC 7.10.3, те же параметры компиляции) это привело к сокращению максимального времени (метрика, которую вы указали в своем OP) на ~ 22%.

Примечание. Я сделал здесь два предположения:

  1. Изменяемая структура данных подходит для этой проблемы (я полагаю, что передача сообщений в любом случае подразумевает ввод-вывод)
  2. Ваши идентификаторы сообщений являются непрерывными

С некоторым дополнительным параметром Int и арифметикой (например, когда messageId сбрасывается обратно на 0 или minBound), тогда должно быть просто определить, есть ли определенное сообщение в истории и получить его из соответствующего индекса в кольцевом буфере.

Для удобства тестирования:

import qualified Control.Exception as Exception
import qualified Control.Monad as Monad
import qualified Data.ByteString as ByteString
import qualified Data.Map.Strict as Map

import qualified Data.Vector.Mutable as Vector

data Msg = Msg !Int !ByteString.ByteString

type Chan = Map.Map Int ByteString.ByteString

data Chan2 = Chan2
    { next          :: !Int
    , maxId         :: !Int
    , ringBuffer    :: !(Vector.IOVector ByteString.ByteString)
    }

chanSize :: Int
chanSize = 200000

message :: Int -> Msg
message n = Msg n (ByteString.replicate 1024 (fromIntegral n))


newChan2 :: IO Chan2
newChan2 = Chan2 0 0 <$> Vector.unsafeNew chanSize

pushMsg2 :: Chan2 -> Msg -> IO Chan2
pushMsg2 (Chan2 ix _ store) (Msg msgId msgContent) =
    let ix' = if ix == chanSize then 0 else ix + 1
    in Vector.unsafeWrite store ix' msgContent >> return (Chan2 ix' msgId store)

pushMsg :: Chan -> Msg -> IO Chan
pushMsg chan (Msg msgId msgContent) =
  Exception.evaluate $
    let
      inserted = Map.insert msgId msgContent chan
    in
      if chanSize < Map.size inserted
      then Map.deleteMin inserted
      else inserted

main, main1, main2 :: IO ()

main = main2

main1 = Monad.foldM_ pushMsg Map.empty (map message [1..1000000])

main2 = newChan2 >>= \c -> Monad.foldM_ pushMsg2 c (map message [1..1000000])

Я должен согласиться с остальными - если у вас есть жесткие ограничения в реальном времени, то использование языка GC не идеально.

Однако вы можете попробовать поэкспериментировать с другими доступными структурами данных, а не только с Data.Map.

Я переписал его с помощью Data.Sequence и получил многообещающие улучшения:

msgs history length  max GC pause (ms)
===================  =================
12500                              0.7
25000                              1.4
50000                              2.8
100000                             5.4
200000                            10.9
400000                            21.8
800000                            46
1600000                           87
3200000                          175
6400000                          350

Несмотря на то, что вы оптимизируете задержку, я заметил, что улучшаются и другие показатели. В случае 200000 время выполнения сокращается с 1,5 до 0,2 с, а общее использование памяти снижается с 600 МБ до 27 МБ.

Замечу, что я обманул, подправив дизайн:

  • Я удалил Int из Msg, поэтому его нет в двух местах.
  • Вместо использования карты от Ints до ByteStrings, я использовал Sequence of ByteStrings, а вместо один Int на сообщение, я думаю, это можно сделать с одним Int для всего Sequence. Предполагая, что сообщения не могут быть переупорядочены, вы можете использовать одно смещение, чтобы перевести какое сообщение вы хотите туда, где оно находится в очереди.

(я включил дополнительную функцию getMsg, чтобы продемонстрировать это.)

{-# LANGUAGE BangPatterns #-}

import qualified Control.Exception as Exception
import qualified Control.Monad as Monad
import qualified Data.ByteString as ByteString
import Data.Sequence as S

newtype Msg = Msg ByteString.ByteString

data Chan = Chan Int (Seq ByteString.ByteString)

message :: Int -> Msg
message n = Msg (ByteString.replicate 1024 (fromIntegral n))

maxSize :: Int
maxSize = 200000

pushMsg :: Chan -> Msg -> IO Chan
pushMsg (Chan !offset sq) (Msg msgContent) =
    Exception.evaluate $
        let newSize = 1 + S.length sq
            newSq = sq |> msgContent
        in
        if newSize <= maxSize
            then Chan offset newSq
            else
                case S.viewl newSq of
                    (_ :< newSq') -> Chan (offset+1) newSq'
                    S.EmptyL -> error "Can't happen"

getMsg :: Chan -> Int -> Maybe Msg
getMsg (Chan offset sq) i_ = getMsg' (i_ - offset)
    where
    getMsg' i
        | i < 0            = Nothing
        | i >= S.length sq = Nothing
        | otherwise        = Just (Msg (S.index sq i))

main :: IO ()
main = Monad.foldM_ pushMsg (Chan 0 S.empty) (map message [1..5 * maxSize])

2022 WebDevInsider