Publish / Subscribe

The publish / subscribe pattern is implemented with classes zenoh::Publisher and zenoh::Subscriber.

Publisher example:

#include "zenoh.hxx"

using namespace zenoh;

int main(int argc, char **argv) {
   Config config = Config::create_default();
   auto session = Session::open(std::move(config));
   // Publish without creating a Publisher object
   session.put(KeyExpr("demo/example/simple"), Bytes("Simple from session.put!"));

   // Publish from a Publisher object
   auto publisher = session.declare_publisher(KeyExpr("demo/example/simple"));
   publisher.put("Simple from publisher.put!");
}

Subscriber example:

#include "zenoh.hxx"
#include <iostream>

using namespace zenoh;

int main(int argc, char **argv) {
   Config config = Config::create_default();
   auto session = Session::open(std::move(config));
   auto subscriber = session.declare_subscriber(
      KeyExpr("demo/example/simple"),
      [](const Sample& sample) {
         std::cout << "Received: " << sample.get_payload().as_string() << std::endl;
      },
      closures::none
   );
   // Wait for a key press to exit
   char c = getchar();
}

Subscriber example with non-blocking stream interface:

#include "zenoh.hxx"
#include <chrono>
#include <iostream>
#include <thread>

using namespace zenoh;
using namespace std::chrono_literals;

int main(int argc, char **argv) {
   Config config = Config::create_default();
   auto session = Session::open(std::move(config));
   auto subscriber = session.declare_subscriber(
      KeyExpr("demo/example/simple"),
      channels::FifoChannel(16)  // use FIFO buffer to store unprocessed messages
   );
   while (true) {
      auto res = subscriber.handler().try_recv();
      if (std::holds_alternative<Sample>(res)) {
         std::cout << "Received: " << std::get<Sample>(res).get_payload().as_string() << std::endl;
      } else if (std::get<channels::RecvError>(res) == channels::RecvError::Z_NODATA) {
         std::this_thread::sleep_for(1s); // do some other work
      } else {
         break; // channel is closed
      }
   }
}