У меня есть концентраторы событий, выводящие данные в Blob Storage с помощью функции Capture - она выводит все, что мы бросаем в очередь, в виде файла .avro.

Если я загружу этот файл и попытаюсь разобрать его с помощью библиотеки типа avro-js, у меня не возникнет никаких проблем - я смогу прекрасно прочитать файл и обработать его содержимое по своему усмотрению.

Но при работе с Azure Blob Storage с помощью Node я хотел бы обрабатывать файлы в том виде, в котором они были загружены. Формат, который я получаю при чтении файла, - это Buffer, но я не могу найти способ успешного разбора этого файла с помощью библиотеки (не могу найти правильный метод, если он есть).

Код, используемый для загрузки блоба из Azure, с несколькими опущенными битами:

const { BlobServiceClient } = require('@azure/storage-blob');
const blobServiceClient = BlobServiceClient.fromConnectionString(AZURE_STORAGE_CONNECTION_STRING);
const containerClient = blobServiceClient.getContainerClient("data");

const blockBlobClient = containerClient.getBlockBlobClient(blob.name);
    
const downloadBlockBlobResponse = await blockBlobClient.download(0);

Вывод фрагмента буфера в консоль:


Содержание при преобразовании в строку (вставка изображения, так как искаженные символы не выводятся должным образом):

введите описание изображения здесь

Попробовал прочитать файлы .avro как обычный текст, и хотя они в основном в порядке, есть несколько символов, которые искажены, и поэтому они не читаются как JSON (а я не хочу делать предположения о содержимом, чтобы попытаться вытащить тела сообщений).

Кто-нибудь успешно поднимал содержимое .avro из Azure на основе Buffers? Я вижу в сети много рекомендаций по загрузке этих файлов в Spark или Kafka, но не по простому чтению файлов в потоке.

<Спасибо!

Ответы (1)

Что касается этого вопроса, мы можем использовать пакет avsc для разбора файла avro с буфером. Для получения более подробной информации, пожалуйста, обратитесь к здесь.

Например

const avro = require("avsc");
const {
  BlobServiceClient,
  StorageSharedKeyCredential,
} = require("@azure/storage-blob");

const accountName = "";
const accountKey =
  "";
async function main() {
  const creds = new StorageSharedKeyCredential(accountName, accountKey);
  const blobServiceClient = new BlobServiceClient(
    `https://${accountName}.blob.core.windows.net`,
    creds
  );
  const containerClient = blobServiceClient.getContainerClient("");
  const blockBlobClient = containerClient.getBlockBlobClient(
    ""
  );

  const downloadBlockBlobResponse = await blockBlobClient.download(0);
  const buf = await streamToBuffer(
    downloadBlockBlobResponse.readableStreamBody
  );

  const decoder = new avro.streams.BlockDecoder({
    parseHook: (schema) => {
      console.log("the avro file schema:");
      console.log(schema);
      return avro.Type.forSchema(schema, { wrapUnions: true });
    },
  });

  decoder.on("data", (data) => {
    console.log(data);
  });
  decoder.end(buf);
}
async function streamToBuffer(readableStream) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    readableStream.on("data", (data) => {
      chunks.push(data instanceof Buffer ? data : Buffer.from(data));
    });
    readableStream.on("end", () => {
      resolve(Buffer.concat(chunks));
    });
    readableStream.on("error", reject);
  });
}
main();

введите описание изображения здесь

2022 WebDevInsider