У меня есть следующий код, который хочет записать данные, сгенерированные с помощью datagen, в файл, но когда я запускаю приложение, целевой каталог не создается, и данные не записываются.

Когда я добавляю env.execute() в конец кода, он сообщает, что Нет операторов, определенных в потоковой топологии. Невозможно выполнить.

Я хотел бы спросить, как заставить приложение работать, спасибо.

test("insert into table") {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tenv = StreamTableEnvironment.create(env)
    val ddl =
      """
      create temporary table abc(
       name STRING,
       age INT
      ) with (
        'connector' = 'datagen'
      )
      """.stripMargin(' ')

    tenv.executeSql(ddl)

    val sql =
      """
        select * from abc
      """.stripMargin(' ')

    val sinkDDL =
      s"""
      create temporary table xyz(
       name STRING,
       age INT
      ) with (
       'connector' = 'filesystem',
       'path' = 'D:\\${System.currentTimeMillis()}-csv' ,
       'format' = 'csv'

      )
      """.stripMargin(' ')

    tenv.executeSql(sinkDDL)

    val insertInSQL =
      """
      insert into xyz
      select name, age from abc
      """.stripMargin(' ')

    tenv.executeSql(insertInSQL)


//    env.execute()


  }

Tom

Ответов: 2

Ответы (2)

Я думаю, что вы должны иметь UDF в выполнении таблицы, см.

https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/functions/udfs.html#table-functions

Вы можете посмотреть пример, написать функцию и вставить ее в свой sql-конвейер, это работает как "оператор" в вашем сообщении об ошибке.

Я думаю, что это действительно работает, просто не когда мы думаем, что работает :)

Я проверил это с помощью планировщика ссылок во Flink 1.12:

.
"org.apache.flink" %% "flink-table-planner-blink" % flinkVersion % "provided"

Вызов env.execute() на StreamingEnvironment в конце фактически не требуется, поскольку каждый .executeSql() ранее в программе уже отправляет асинхронные задания. Тогда сток в вашем коде привязывается к одному из этих заданий, а не к заданию, которое запускает env.execute() (которое в данном случае является пустым заданием, что вызывает ошибку, о которой вы упоминаете). Я нашел подсказку об этом в этом ответе в списке рассылки.

Когда я запускаю код в вопросе (с планировщиком Blink и адаптируя вывод к 'path' = '/tmp/hello-flink-${System.currentTimeMillis()}-csv' на моем хосте), я вижу, что постепенно создается несколько скрытых файлов. Я предполагаю, что они также скрываются на вашем хосте Windows (файлы, начинающиеся с . ниже, означают скрытые в Linux):

ls -ltra /tmp/hello-flink-1609574239647-csv
total 165876
drwxrwxrwt 40 root  root      12288 Jan  2 08:57 ..
-rw-rw-r--  1 svend svend 134217771 Jan  2 08:59 .part-393f5557-894a-4396-bdf9-c7813fdd1d75-0-0.inprogress.48863a2b-f022-401b-95e3-659ec4920162
drwxrwxr-x  2 svend svend      4096 Jan  2 08:59 .
-rw-rw-r--  1 svend svend  35616014 Jan  2 08:59 .part-393f5557-894a-4396-bdf9-c7813fdd1d75-0-1.inprogress.3412bcb0-d30d-43be-819b-1acf26a0a8bb

Происходит просто то, что политика скользящей обработки FileSystem SQL connector по умолчанию ждет гораздо дольше перед фиксацией файлов.

Если вы запускаете код из IDE, вы можете адаптировать создание среды следующим образом (обычно это делается в conf/flink-conf.yaml):

  val props = new Properties
  props.setProperty("execution.checkpointing.interval", "10000")  // 10000 ms
  val conf = ConfigurationUtils.createConfiguration(props)
  val fsEnv = StreamExecutionEnvironment.createLocalEnvironment(1, conf)

и использовать небольшой размер файла в выходном коннекторе:

      create temporary table xyz(
       name STRING,
       age INT
      ) with (
       'connector' = 'filesystem',
       'path' = '/tmp/hello-flink-${System.currentTimeMillis()}-csv' ,
       'format' = 'csv',
       'sink.rolling-policy.file-size' = '1Mb'

И теперь CSV-файлы должны фиксироваться гораздо быстрее:

ls -ltra hello-flink-1609575075617-csv
total 17896
-rw-rw-r--  1 svend svend 1048669 Jan  2 09:11 part-a6158ce5-25ea-4361-be11-596a67989e4a-0-0
-rw-rw-r--  1 svend svend 1048644 Jan  2 09:11 part-a6158ce5-25ea-4361-be11-596a67989e4a-0-1
-rw-rw-r--  1 svend svend 1048639 Jan  2 09:11 part-a6158ce5-25ea-4361-be11-596a67989e4a-0-2
-rw-rw-r--  1 svend svend 1048676 Jan  2 09:11 part-a6158ce5-25ea-4361-be11-596a67989e4a-0-3
-rw-rw-r--  1 svend svend 1048680 Jan  2 09:11 part-a6158ce5-25ea-4361-be11-596a67989e4a-0-4
-rw-rw-r--  1 svend svend 1048642 Jan  2 09:11 part-a6158ce5-25ea-4361-be11-596a67989e4a-0-5

2022 WebDevInsider