Publish / Subscribe
The publish / subscribe pattern is implemented with the classes zenoh::Publisher
and zenoh::Subscriber.
Publisher example:
#include "zenoh.hxx"
using namespace zenoh;
int main(int argc, char **argv) {
Config config = Config::create_default();
auto session = Session::open(std::move(config));
// Publish without creating a Publisher object
session.put(KeyExpr("demo/example/simple"), Bytes("Simple from session.put!"));
// Publish from a Publisher object
auto publisher = session.declare_publisher(KeyExpr("demo/example/simple"));
publisher.put("Simple from publisher.put!");
}
Subscriber example:
#include "zenoh.hxx"
#include <iostream>
using namespace zenoh;
int main(int argc, char **argv) {
    // Open a session from the default configuration.
    Config config = Config::create_default();
    auto session = Session::open(std::move(config));

    // Declare a subscriber on the key expression; the lambda prints the
    // payload of each sample it is given. closures::none is passed as the
    // third argument.
    auto subscriber = session.declare_subscriber(
        KeyExpr("demo/example/simple"),
        [](const Sample& sample) {
            std::cout << "Received: " << sample.get_payload().as_string() << std::endl;
        },
        closures::none
    );

    // Wait for a key press to exit.
    // std::cin.get() is provided by <iostream>, which this example already
    // includes; the previous getchar() call needed <cstdio> and stored its
    // int result in an unused char variable.
    std::cin.get();
}
Subscriber example with a non-blocking stream interface:
#include "zenoh.hxx"
#include <chrono>
#include <iostream>
#include <thread>
using namespace zenoh;
using namespace std::chrono_literals;
int main(int argc, char **argv) {
Config config = Config::create_default();
auto session = Session::open(std::move(config));
auto subscriber = session.declare_subscriber(
KeyExpr("demo/example/simple"),
channels::FifoChannel(16) // use FIFO buffer to store unprocessed messages
);
while (true) {
auto res = subscriber.handler().try_recv();
if (std::holds_alternative<Sample>(res)) {
std::cout << "Received: " << std::get<Sample>(res).get_payload().as_string() << std::endl;
} else if (std::get<channels::RecvError>(res) == channels::RecvError::Z_NODATA) {
std::this_thread::sleep_for(1s); // do some other work
} else {
break; // channel is closed
}
}
}