Библиотека Akka Streams уже содержит обширную документацию. Однако главная проблема для меня в том, что в нем содержится слишком много материала - я чувствую себя подавленным количеством концепций, которые мне нужно выучить. Множество показанных там примеров кажутся очень тяжелыми и не могут быть легко переведены на реальные варианты использования и поэтому являются довольно эзотерическими. Я думаю, что он дает слишком много деталей, не объясняя, как собрать все строительные блоки вместе и как именно это помогает решать конкретные проблемы.

Есть источники, приемники, потоки, этапы графа, частичные графы, материализация, DSL графа и многое другое, и я просто не знаю, с чего начать. Краткое руководство предназначено для начала, но я его не понимаю. Он просто добавляет концепции, упомянутые выше, без их объяснения. Кроме того, примеры кода не могут быть выполнены - есть недостающие части, что делает более или менее невозможным для меня следить за текстом.

Может ли кто-нибудь объяснить источники концепций, приемники, потоки, этапы графа, частичные графы, материализацию и, возможно, некоторые другие вещи, которые я пропустил, простыми словами и с легкими примерами, которые не объясняют каждую деталь (и которые, вероятно, не являются все равно нужно в начале)?

kiritsuku

Ответов: 1

Ответы (1)

Этот ответ основан на akka-stream версии 2.4.2. В других версиях API может немного отличаться. Зависимость может потреблять sbt:

libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.4.2"

Хорошо, приступим. API Akka Streams состоит из трех основных типов. В отличие от Reactive Streams, эти типы намного более мощные и, следовательно, более сложные. Предполагается, что для всех примеров кода уже существуют следующие определения:

import scala.concurrent._
import akka._
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.util._

implicit val system = ActorSystem("TestSystem")
implicit val materializer = ActorMaterializer()
import system.dispatcher

Операторы import необходимы для объявления типов.система представляет систему актеров Akka, а материализатор представляет контекст оценки потока. В нашем случае мы используем ActorMaterializer, что означает, что потоки оцениваются поверх актеров. Оба значения помечены как implicit, что дает компилятору Scala возможность автоматически вводить эти две зависимости всякий раз, когда они необходимы. Мы также импортируем system.dispatcher, который является контекстом выполнения для Futures.

Новый API

У потоков Akka есть следующие ключевые свойства:

  • Они реализуют спецификацию Reactive Streams, три основные цели которой - противодавление, асинхронные и неблокирующие границы, а также взаимодействие между различными реализациями полностью применимы и к Akka Streams.
  • Они предоставляют абстракцию для механизма оценки потоков, который называется Materializer.
  • Программы сформулированы в виде многократно используемых строительных блоков, которые представлены тремя основными типами: Источник, Приемник и Поток. Строительные блоки образуют график, оценка которого основана на Materializer и требует явного запуска.

Далее будет дано более глубокое введение в использование трех основных типов.

Источник

A Source - создатель данных, он служит источником ввода для потока. Каждый Источник имеет один выходной канал и не имеет входного канала. Все данные проходят через выходной канал ко всему, что подключено к Source.

Source

Изображение взято с boldradius.com.

A Источник можно создать несколькими способами:

scala> val s = Source.empty
s: akka.stream.scaladsl.Source[Nothing,akka.NotUsed] = ...

scala> val s = Source.single("single element")
s: akka.stream.scaladsl.Source[String,akka.NotUsed] = ...

scala> val s = Source(1 to 3)
s: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> val s = Source(Future("single value from a Future"))
s: akka.stream.scaladsl.Source[String,akka.NotUsed] = ...

scala> s runForeach println
res0: scala.concurrent.Future[akka.Done] = ...
single value from a Future

В приведенных выше случаях мы загрузили Source с конечными данными, что означает, что они в конечном итоге прекратят работу. Не следует забывать, что реактивные потоки по умолчанию ленивы и асинхронны. Это означает, что нужно явно запросить оценку потока. В Akka Streams это можно сделать с помощью методов run *. runForeach не будет отличаться от хорошо известной функции foreach - с помощью добавления run он явно указывает, что мы запрашиваем оценку потока. Поскольку конечные данные утомительны, мы продолжаем с бесконечными:

scala> val s = Source.repeat(5)
s: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> s take 3 runForeach println
res1: scala.concurrent.Future[akka.Done] = ...
5
5
5

С помощью метода take мы можем создать искусственную точку остановки, которая не позволяет нам оценивать бесконечно долго. Поскольку поддержка акторов встроена, мы также можем легко кормить поток сообщениями, которые отправляются актеру:

def run(actor: ActorRef) = {
  Future { Thread.sleep(300); actor ! 1 }
  Future { Thread.sleep(200); actor ! 2 }
  Future { Thread.sleep(100); actor ! 3 }
}
val s = Source
  .actorRef[Int](bufferSize = 0, OverflowStrategy.fail)
  .mapMaterializedValue(run)

scala> s runForeach println
res1: scala.concurrent.Future[akka.Done] = ...
3
2
1

Мы видим, что Futures выполняются асинхронно в разных потоках, что объясняет результат. В приведенном выше примере буфер для входящих элементов не требуется, и поэтому с помощью OverflowStrategy.fail мы можем настроить, что поток должен завершаться ошибкой при переполнении буфера. В частности, через этот интерфейс актора мы можем передавать поток через любой источник данных. Не имеет значения, созданы ли данные тем же потоком, другим потоком, другим процессом или они поступают из удаленной системы через Интернет.

Мойка

A Sink в основном противоположен Source. Это конечная точка потока и, следовательно, потребляет данные. A Sink имеет один входной канал и не имеет выходного канала.Приемники особенно необходимы, когда мы хотим указать поведение сборщика данных с возможностью многократного использования и без оценки потока. Уже известные методы run * не позволяют нам использовать эти свойства, поэтому вместо этого рекомендуется использовать Sink.

Sink

Изображение взято с boldradius.com.

Краткий пример работы Sink:

scala> val source = Source(1 to 3)
source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> val sink = Sink.foreach[Int](elem => println(s"sink received: $elem"))
sink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[akka.Done]] = ...

scala> val flow = source to sink
flow: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> flow.run()
res3: akka.NotUsed = NotUsed
sink received: 1
sink received: 2
sink received: 3

Подключение Source к Sink может быть выполнено с помощью метода to. Он возвращает так называемый RunnableFlow, который, как мы позже увидим, является специальной формой Flow - потока, который можно выполнить, просто вызвав его run () метод.

Runnable Flow

Изображение взято с boldradius.com.

Конечно, все значения, поступающие в приемник, можно пересылать актеру:

val actor = system.actorOf(Props(new Actor {
  override def receive = {
    case msg => println(s"actor received: $msg")
  }
}))

scala> val sink = Sink.actorRef[Int](actor, onCompleteMessage = "stream completed")
sink: akka.stream.scaladsl.Sink[Int,akka.NotUsed] = ...

scala> val runnable = Source(1 to 3) to sink
runnable: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> runnable.run()
res3: akka.NotUsed = NotUsed
actor received: 1
actor received: 2
actor received: 3
actor received: stream completed

Расход

Источники и приемники данных отлично подходят, если вам нужно соединение между потоками Akka и существующей системой, но с ними ничего нельзя сделать. Потоки - это последний недостающий элемент в базовой абстракции Akka Streams. Они действуют как связующее звено между разными потоками и могут использоваться для преобразования его элементов.

Flow

Изображение взято с boldradius.com.

Если Flow подключен к Source, результатом будет новый Source. Аналогично, Flow, подключенный к Sink, создает новый Sink. И Flow, подключенный как к Source, так и Sink, приводит к RunnableFlow. Следовательно, они находятся между входным и выходным каналом, но сами по себе не соответствуют одному из вариантов, пока они не подключены ни к Source, ни к Sink.

Full Stream

Изображение взято с boldradius.com.

Чтобы лучше понять Flows, мы рассмотрим несколько примеров:

scala> val source = Source(1 to 3)
source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> val sink = Sink.foreach[Int](println)
sink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[akka.Done]] = ...

scala> val invert = Flow[Int].map(elem => elem * -1)
invert: akka.stream.scaladsl.Flow[Int,Int,akka.NotUsed] = ...

scala> val doubler = Flow[Int].map(elem => elem * 2)
doubler: akka.stream.scaladsl.Flow[Int,Int,akka.NotUsed] = ...

scala> val runnable = source via invert via doubler to sink
runnable: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> runnable.run()
res10: akka.NotUsed = NotUsed
-2
-4
-6

Via the via method we can connect a Source with a Flow. We need to specify the input type because the compiler can't infer it for us. As we can already see in this simple example, the flows invert and double are completely independent from any data producers and consumers. They only transform the data and forward it to the output channel. This means that we can reuse a flow among multiple streams:

scala> val s1 = Source(1 to 3) via invert to sink
s1: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> val s2 = Source(-3 to -1) via invert to sink
s2: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> s1.run()
res10: akka.NotUsed = NotUsed
-1
-2
-3

scala> s2.run()
res11: akka.NotUsed = NotUsed
3
2
1

s1 и s2 представляют собой совершенно новые потоки - они не передают никакие данные через свои строительные блоки.

Неограниченные потоки данных

Прежде чем мы продолжим, мы должны сначала вернуться к некоторым ключевым аспектам Reactive Streams. Неограниченное количество элементов может прибыть в любую точку и может переводить поток в разные состояния. Помимо работающего потока, что является обычным состоянием, поток может быть остановлен либо из-за ошибки, либо из-за сигнала, который означает, что никакие дальнейшие данные не поступят. Поток можно смоделировать графическим способом, отмечая события на временной шкале, как здесь:

Shows that a stream is a sequence of ongoing events ordered in time

Изображение взято из Введение в реактивное программирование, которое вы пропустили.

Мы уже видели запускаемые потоки в примерах предыдущего раздела. Мы получаем RunnableGraph всякий раз, когда поток действительно может быть материализован, что означает, что Sink подключен к Source. До сих пор мы всегда материализовались в значение Unit, которое можно увидеть в типах:

val source: Source[Int, NotUsed] = Source(1 to 3)
val sink: Sink[Int, Future[Done]] = Sink.foreach[Int](println)
val flow: Flow[Int, Int, NotUsed] = Flow[Int].map(x => x)

Для Source и Sink параметр второго типа, а для Flow параметр третьего типа обозначает материализованное значение. В этом ответе не объясняется полное значение материализации. Однако более подробную информацию о материализации можно найти в официальной документации . На данный момент нам нужно знать только то, что материализованное значение - это то, что мы получаем, когда запускаем поток. Поскольку до сих пор нас интересовали только побочные эффекты, в качестве материализованного значения мы получили Unit. Исключением была материализация раковины, которая привела к Future. Он вернул нам Future, поскольку это значение может обозначать, когда поток, подключенный к приемнику, был завершен. До сих пор предыдущие примеры кода хорошо объясняли концепцию, но они также были скучными, потому что мы имели дело только с конечными потоками или с очень простыми бесконечными потоками. Чтобы сделать его более интересным, ниже будет объяснен полный асинхронный и неограниченный поток.

Пример ClickStream

В качестве примера мы хотим иметь поток, который фиксирует события щелчка. Чтобы усложнить задачу, допустим, мы также хотим сгруппировать события щелчка, которые происходят через короткое время друг за другом. Таким образом, мы могли легко обнаружить двойные, тройные или десятикратные щелчки. Кроме того, мы хотим отфильтровать все отдельные клики. Сделайте глубокий вдох и представьте, как вы решите эту проблему императивным образом. Бьюсь об заклад, никто не сможет реализовать решение, которое работает правильно с первой попытки. Реактивно эту проблему решить тривиально. Фактически, решение настолько простое и понятное для реализации, что мы можем даже выразить его в диаграмме, которая непосредственно описывает поведение кода:

The logic of the click stream example

Изображение взято из Введение в реактивное программирование, которое вы пропустили.

Серые прямоугольники - это функции, которые описывают, как один поток преобразуется в другой. С функцией throttle мы накапливаем клики в течение 250 миллисекунд, функции map и filter не требуют пояснений. Цветные шары представляют собой событие, а стрелки показывают, как они проходят через наши функции. Позже на этапах обработки мы получаем все меньше и меньше элементов, которые проходят через наш поток, поскольку мы группируем их вместе и отфильтровываем. Код этого изображения будет выглядеть примерно так:

val multiClickStream = clickStream
    .throttle(250.millis)
    .map(clickEvents => clickEvents.length)
    .filter(numberOfClicks => numberOfClicks >= 2)

Вся логика может быть представлена ​​всего в четырех строчках кода! В Scala мы могли бы написать его еще короче:

val multiClickStream = clickStream.throttle(250.millis).map(_.length).filter(_ >= 2)

Определение clickStream немного сложнее, но это только так, потому что пример программы работает на JVM, где захват событий щелчка не так легко возможен. Еще одна сложность заключается в том, что Akka по умолчанию не предоставляет функцию throttle. Вместо этого мы должны были написать это сами. Поскольку эта функция (как и в случае функций map или filter) может повторно использоваться в разных случаях использования, я не считаю эти строки количеством строк, которые нам нужно было реализовать. логика. Однако в императивных языках нормально, что логику нельзя повторно использовать так легко, и что различные логические шаги выполняются в одном месте, а не применяются последовательно, что означает, что мы, вероятно, исказили бы наш код с помощью логики дросселирования. Полный пример кода доступен как gist и не будет здесь больше обсуждаться.

Пример SimpleWebServer

Вместо этого следует обсудить другой пример. Хотя поток кликов является хорошим примером, позволяющим Akka Streams обрабатывать реальный пример, ему не хватает мощности для демонстрации параллельного выполнения в действии. В следующем примере будет представлен небольшой веб-сервер, который может обрабатывать несколько запросов параллельно. Веб-сервер должен иметь возможность принимать входящие соединения и получать от них байтовые последовательности, представляющие печатаемые знаки ASCII. Эти байтовые последовательности или строки должны быть разделены на все символы новой строки на более мелкие части. После этого сервер должен ответить клиенту каждой из разделенных строк. В качестве альтернативы он мог бы сделать что-нибудь еще со строками и выдать специальный токен ответа, но мы хотим, чтобы в этом примере он был простым и поэтому не вводил никаких необычных функций. Помните, что сервер должен иметь возможность обрабатывать несколько запросов одновременно, что в основном означает, что ни одному запросу не разрешено блокировать любой другой запрос от дальнейшего выполнения. Решить все эти требования может быть непросто - с Akka Streams, однако, нам не нужно больше, чем несколько строк, чтобы решить любую из них. Во-первых, давайте рассмотрим сам сервер:

server

По сути, есть только три основных строительных блока. Первый должен принимать входящие соединения. Второй нужен для обработки входящих запросов, а третий - для отправки ответа. Реализация всех этих трех строительных блоков лишь немного сложнее, чем реализация потока кликов:

def mkServer(address: String, port: Int)(implicit system: ActorSystem, materializer: Materializer): Unit = {
  import system.dispatcher

  val connectionHandler: Sink[Tcp.IncomingConnection, Future[Unit]] =
    Sink.foreach[Tcp.IncomingConnection] { conn =>
      println(s"Incoming connection from: ${conn.remoteAddress}")
      conn.handleWith(serverLogic)
    }

  val incomingCnnections: Source[Tcp.IncomingConnection, Future[Tcp.ServerBinding]] =
    Tcp().bind(address, port)

  val binding: Future[Tcp.ServerBinding] =
    incomingCnnections.to(connectionHandler).run()

  binding onComplete {
    case Success(b) =>
      println(s"Server started, listening on: ${b.localAddress}")
    case Failure(e) =>
      println(s"Server could not be bound to $address:$port: ${e.getMessage}")
  }
}

Функция mkServer принимает (помимо адреса и порта сервера) также систему акторов и материализатор в качестве неявных параметров. Поток управления сервера представлен привязкой, которая принимает источник входящих подключений и перенаправляет их в приемник входящих подключений. Внутри connectionHandler, который является нашим приемником, мы обрабатываем каждое соединение потоком serverLogic, который будет описан позже.привязка возвращает Future, которое завершается, когда сервер был запущен или запуск не удался, что может иметь место, когда порт уже занят другим процессом. Однако код не полностью отражает графику, поскольку мы не видим строительный блок, который обрабатывает ответы. Причина в том, что соединение уже само по себе обеспечивает эту логику. Это двунаправленный поток, а не просто однонаправленный, как потоки, которые мы видели в предыдущих примерах. Как и в случае с материализацией, такие сложные потоки здесь не объясняются. Официальная документация содержит много материала для покрытия более сложных потоковых диаграмм. На данный момент достаточно знать, что Tcp.IncomingConnection представляет собой соединение, которое знает, как получать запросы и как отправлять ответы. Часть, которая все еще отсутствует, - это строительный блок serverLogic. Это может выглядеть так:

server logic

И снова мы можем разделить логику на несколько простых строительных блоков, которые вместе образуют поток нашей программы. Сначала мы хотим разбить нашу последовательность байтов на строки, что мы должны делать всякий раз, когда мы находим символ новой строки. После этого байты каждой строки необходимо преобразовать в строку, потому что работать с необработанными байтами сложно. В целом мы могли получить двоичный поток сложного протокола, что сделало бы работу с входящими необработанными данными чрезвычайно сложной. Получив читаемую строку, мы можем создать ответ. Для простоты в нашем случае ответ может быть любым. В конце концов, мы должны преобразовать наш ответ в последовательность байтов, которую можно отправить по сети. Код всей логики может выглядеть так:

val serverLogic: Flow[ByteString, ByteString, Unit] = {
  val delimiter = Framing.delimiter(
    ByteString("\n"),
    maximumFrameLength = 256,
    allowTruncation = true)

  val receiver = Flow[ByteString].map { bytes =>
    val message = bytes.utf8String
    println(s"Server received: $message")
    message
  }

  val responder = Flow[String].map { message =>
    val answer = s"Server hereby responds to message: $message\n"
    ByteString(answer)
  }

  Flow[ByteString]
    .via(delimiter)
    .via(receiver)
    .via(responder)
}

Мы уже знаем, что serverLogic - это поток, который принимает ByteString и должен создавать ByteString. С помощью разделителя мы можем разделить ByteString на более мелкие части - в нашем случае это должно происходить всякий раз, когда встречается символ новой строки.получатель - это поток, который принимает все разбитые последовательности байтов и преобразует их в строку. Это, конечно, опасное преобразование, поскольку в строку следует преобразовывать только печатаемые символы ASCII, но для наших нужд этого достаточно.респондент - последний компонент, который отвечает за создание ответа и преобразование ответа обратно в последовательность байтов. В отличие от графики, мы не разделили этот последний компонент на две части, поскольку логика тривиальна. В конце мы подключаем все потоки через через функцию. Здесь можно спросить, позаботились ли мы о многопользовательском свойстве, о котором говорилось в начале. И действительно, мы это сделали, хотя это может быть не сразу очевидно. Глядя на этот рисунок, он должен стать более ясным:

server and server logic combined

Компонент serverLogic - это не что иное, как поток, содержащий более мелкие потоки. Этот компонент принимает ввод, который является запросом, и производит вывод, который является ответом. Поскольку потоки могут быть построены несколько раз, и все они работают независимо друг от друга, мы достигаем посредством этого вложения нашего многопользовательского свойства. Каждый запрос обрабатывается в рамках своего собственного запроса, и поэтому короткий текущий запрос может перекрыть ранее запущенный длительный запрос. Если вам интересно, определение serverLogic, которое было показано ранее, конечно, может быть написано намного короче путем встраивания большинства его внутренних определений:

val serverLogic = Flow[ByteString]
  .via(Framing.delimiter(
      ByteString("\n"),
      maximumFrameLength = 256,
      allowTruncation = true))
  .map(_.utf8String)
  .map(msg => s"Server hereby responds to message: $msg\n")
  .map(ByteString(_))

Тест веб-сервера может выглядеть так:

$ # Client
$ echo "Hello World\nHow are you?" | netcat 127.0.0.1 6666
Server hereby responds to message: Hello World
Server hereby responds to message: How are you?

Чтобы приведенный выше пример кода работал правильно, нам сначала нужно запустить сервер, который изображен скриптом startServer:

$ # Server
$ ./startServer 127.0.0.1 6666
[DEBUG] Server started, listening on: /127.0.0.1:6666
[DEBUG] Incoming connection from: /127.0.0.1:37972
[DEBUG] Server received: Hello World
[DEBUG] Server received: How are you?

Полный пример кода этого простого TCP-сервера можно найти здесь. Мы умеем писать не только сервер с Akka Streams, но и клиента. Это может выглядеть так:

val connection = Tcp().outgoingConnection(address, port)
val flow = Flow[ByteString]
  .via(Framing.delimiter(
      ByteString("\n"),
      maximumFrameLength = 256,
      allowTruncation = true))
  .map(_.utf8String)
  .map(println)
  .map(_ ⇒ StdIn.readLine("> "))
  .map(_+"\n")
  .map(ByteString(_))

connection.join(flow).run()

Полный код TCP-клиента можно найти здесь. Код выглядит очень похоже, но, в отличие от сервера, нам больше не нужно управлять входящими подключениями.

Сложные графики

В предыдущих разделах мы видели, как мы можем создавать простые программы из потоков. Однако в действительности часто недостаточно просто полагаться на уже встроенные функции для создания более сложных потоков. Если мы хотим иметь возможность использовать Akka Streams для произвольных программ, нам нужно знать, как создавать собственные настраиваемые структуры управления и комбинируемые потоки, которые позволяют нам решать сложность наших приложений. Хорошей новостью является то, что Akka Streams был разработан для масштабирования с учетом потребностей пользователей, и чтобы дать вам краткое введение в более сложные части Akka Streams, мы добавляем еще несколько функций в наш пример клиент / сервер.

Мы пока не можем закрыть соединение. На этом этапе все становится немного сложнее, потому что API потока, который мы видели до сих пор, не позволяет нам остановить поток в произвольной точке. Однако существует абстракция GraphStage, которую можно использовать для создания произвольных этапов обработки графа с любым количеством портов ввода или вывода. Давайте сначала посмотрим на серверную часть, где мы представляем новый компонент, называемый closeConnection:

val closeConnection = new GraphStage[FlowShape[String, String]] {
  val in = Inlet[String]("closeConnection.in")
  val out = Outlet[String]("closeConnection.out")

  override val shape = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
    setHandler(in, new InHandler {
      override def onPush() = grab(in) match {
        case "q" ⇒
          push(out, "BYE")
          completeStage()
        case msg ⇒
          push(out, s"Server hereby responds to message: $msg\n")
      }
    })
    setHandler(out, new OutHandler {
      override def onPull() = pull(in)
    })
  }
}

Этот API выглядит намного более громоздким, чем API потока. Неудивительно, что здесь нужно сделать очень много императивных шагов. Взамен у нас есть больший контроль над поведением наших потоков. В приведенном выше примере мы указываем только один входной и один выходной порт и делаем их доступными для системы, переопределяя значение shape. Кроме того, мы определили так называемые InHandler и OutHandler, которые в этом порядке отвечают за получение и отправку элементов. Если вы внимательно посмотрели на пример полного потока кликов, вы уже должны распознать эти компоненты. В InHandler мы захватываем элемент, и если это строка с одним символом 'q', мы хотим закрыть поток. Чтобы дать клиенту возможность узнать, что поток скоро закроется, мы выдаем строку «BYE», а затем сразу же закрываем этап. Компонент closeConnection может быть объединен с потоком с помощью метода через, который был представлен в разделе о потоках.

Помимо возможности закрывать соединения, было бы неплохо, если бы мы могли показать приветственное сообщение для вновь созданного соединения. Для этого нам еще раз нужно пойти немного дальше:

def serverLogic
    (conn: Tcp.IncomingConnection)
    (implicit system: ActorSystem)
    : Flow[ByteString, ByteString, NotUsed]
    = Flow.fromGraph(GraphDSL.create() { implicit b ⇒
  import GraphDSL.Implicits._
  val welcome = Source.single(ByteString(s"Welcome port ${conn.remoteAddress}!\n"))
  val logic = b.add(internalLogic)
  val concat = b.add(Concat[ByteString]())
  welcome ~> concat.in(0)
  logic.outlet ~> concat.in(1)

  FlowShape(logic.in, concat.out)
})

Функция serverLogic теперь принимает входящее соединение в качестве параметра. Внутри его тела мы используем DSL, который позволяет нам описывать сложное поведение потока. С помощью welcome мы создаем поток, который может выдавать только один элемент - приветственное сообщение.логика - это то, что было описано как serverLogic в предыдущем разделе. Единственное заметное отличие состоит в том, что мы добавили к нему closeConnection. А теперь перейдем к самой интересной части DSL. Функция GraphDSL.create делает доступным построитель b, который используется для представления потока в виде графика. С помощью функции ~> можно соединять порты ввода и вывода друг с другом. Компонент Concat, который используется в примере, может объединять элементы и здесь используется для добавления приветственного сообщения перед другими элементами, выходящими из internalLogic. В последней строке мы делаем доступными только входной порт логики сервера и выходной порт объединенного потока, потому что все остальные порты должны оставаться деталями реализации компонента serverLogic. Для более подробного ознакомления с графическим DSL потоков Akka посетите соответствующий раздел официальной документации . Полный пример кода сложного TCP-сервера и клиента, который может с ним взаимодействовать, можно найти здесь. Всякий раз, когда вы открываете новое соединение от клиента, вы должны видеть приветственное сообщение, а набрав "q" на клиенте, вы должны увидеть сообщение о том, что соединение было отменено.

Есть еще некоторые темы, которые не были охвачены этим ответом. Особенно материализация может напугать того или иного читателя, но я уверен, что с материалом, который здесь освещен, каждый сможет сделать следующие шаги самостоятельно. Как уже было сказано, официальная документация - хорошее место для продолжения изучения Akka Streams.

2022 WebDevInsider