Program Listing for File zmqutil.cpp¶
↰ Return to documentation for file (lib/utilities/zmqutil.cpp
)
// ---------------------------------------------------------------------
// This file is part of falcon-core.
//
// Copyright (C) 2015, 2016, 2017 Neuro-Electronics Research Flanders
//
// Falcon-server is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Falcon-server is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with falcon-core. If not, see <http://www.gnu.org/licenses/>.
// ---------------------------------------------------------------------
#include "zmqutil.hpp"
#include <string>
// Convert string to 0MQ string and send to socket
bool s_send(zmq::socket_t &socket, const std::string &string, int more) {
//zmq::message_t message(string.size());
zmq_msg_t message;
zmq_msg_init_size(&message, string.size());
memcpy(zmq_msg_data(&message), string.data(), string.size());
std::size_t rc = zmq_msg_send(&message, socket, more);
zmq_msg_close(&message);
return rc == string.size();
}
bool s_send_multi(zmq::socket_t &socket, const zmq_frames &frames) {
if (frames.empty())
return true;
// all frames but last one
for (unsigned int i = 0; i < frames.size() - 1; ++i)
if (!s_send(socket, frames[i], ZMQ_SNDMORE))
return false;
// last frame
return s_send(socket, frames.back());
}
bool sockopt_rcvmore(zmq::socket_t &socket) {
int64_t rcvmore = 0;
size_t type_size = sizeof(int64_t);
zmq_getsockopt(socket, ZMQ_RCVMORE, &rcvmore, &type_size);
return rcvmore != 0;
}
// Receive 0MQ string from socket and convert into string
bool s_recv(zmq::socket_t &socket, std::string &s_message, int more) {
zmq_msg_t message;
zmq_msg_init (&message);
int size = zmq_msg_recv(&message, socket, more);
if (size == -1) {
return false;
}
s_message.assign(static_cast<char *>(zmq_msg_data(&message)), zmq_msg_size(&message));
zmq_msg_close(&message);
return true;
}
/*bool s_nonblocking_recv(zmq::socket_t &socket, std::string &s_message) {
zmq_msg_t message;
zmq_msg_init (&message);
int size = zmq_msg_recv(&message, socket, ZMQ_DONTWAIT);
if (size == -1) {
return false;
}
s_message.assign(static_cast<char *>(zmq_msg_data(&message)), zmq_msg_size(&message));
zmq_msg_close(&message);
return true;
}*/
zmq_frames s_blocking_recv_multi(zmq::socket_t &socket) {
zmq_frames frames;
std::string message;
do {
s_recv(socket, message);
frames.push_back(message);
message.clear();
} while (sockopt_rcvmore(socket));
return frames;
}
bool s_nonblocking_recv_multi(zmq::socket_t &socket, zmq_frames &frames) {
std::string message;
do {
if (!s_recv(socket, message, ZMQ_DONTWAIT)) {
break;
}
frames.push_back(message);
message.clear();
} while (sockopt_rcvmore(socket));
return !frames.empty();
}