В данный момент я пишу чистую библиотеку Rust MQTT5 (я знаю, что существуют уже существующие, но я больше пытаюсь изучить Rust) и наткнулся на эту проблему.

Я использую последнюю стабильную версию rust с tokio 1.0.1.

Когда я отправляю пакет по проводу, я часто ожидаю ответа от сервера (пример ниже PingReq/PingAck, Ping/Pong).

Отбросив много логики, касающейся таймаутов и столкновений пакетов, я написал упрощенную версию логики на JavaScript (поскольку я знаю его достаточно хорошо).

Как эта логика будет переведена на язык Rust и его фьючерсы? Или, чтобы быть более ясным: могу ли я как-то воссоздать поведение функции обратного вызова resolve() в awaitPackage + onIncomingPacket?

class Client {
  awaitedPacketTypes = {};

  /**
   * a ping consist of a send ping and a receive pong
   */
  async ping(){
    await this.sendPacket("Ping");
    return await this.awaitPackage("Pong");
  }

  async sendPacket(packetType) { /*...*/ }
  
  /**
   * This expects a specific packet type to be received in the future
   * @param {*} packetType 
   */
  awaitPackage(packetType) {
    return new Promise((resolve, reject) => {
      this.awaitedPacketTypes[packetType] = {
        resolve,
        reject
      };
    });
  }

  /**
   * This gets called for every packet from the network side and calls the correct resolver if something waits for this packet type
   * @param {*} packet 
   */
  onIncomingPacket(packet) {
    if(this.awaitedPacketTypes[packet.type]) {
      this.awaitedPacketTypes[packet.type].resolve(packet);
      this.awaitedPacketTypes[packet.type] = undefined;
    } else {
      /*...*/
    }
  }
}

Snapstromegon

Ответов: 1

Ответы (1)

Или, чтобы быть более ясным: могу ли я как-то воссоздать поведение функции обратного вызова resolve() в awaitPackage + onIncomingPacket?

Кинда? Rust Future - это только "что-то, что можно опросить на предмет готовности", это концепция гораздо более низкого уровня, чем JS promise.

Есть библиотеки, которые утверждают, что предоставляют обещания в стиле JS, но большинство асинхронных библиотек, вероятно, предоставляют аналогичный объект с другим названием. Например, в Tokio вам, вероятно, нужен oneshot channel, то есть канал, по которому может быть отправлено одно значение, что приводит к чему-то вроде:

struct Packet { r#type: &'static str }
struct Client {
  ожидание: Mutex>>
}

impl Client {
    async fn ping(&self) -> Packet {
        self.send_packet("Pong").await;
        self.await_package("Pong").await.unwrap()
    }
    async fn send_packet(&self, _: &'static str) {}
    fn await_package(&self, packet_type: &'static str) -> Receiver {
        let (tx, rx) = channel();
        self.awaited.lock().unwrap().insert(packet_type, tx);
        rx
    }
    fn on_incoming_packet(&self, packet: Packet) {
        if let Some(tx) = self.awaited.lock().unwrap().remove(packet.r#type) {
            tx.send(packet);
        }
    }
}

2022 WebDevInsider