From b25dc9bcc2baddbdd9fc17981062486029eaa6e5 Mon Sep 17 00:00:00 2001
From: Grzegorz Mazur <teoretyk@gmail.com>
Date: Sat, 13 Nov 2021 21:24:53 +0100
Subject: [PATCH] get rid of zmpp closes #338

---
 cyacas/yacas-kernel/CMakeLists.txt           |   6 +-
 cyacas/yacas-kernel/include/yacas_engine.hpp |   6 +-
 cyacas/yacas-kernel/include/yacas_kernel.hpp |  24 ++--
 cyacas/yacas-kernel/src/yacas_engine.cpp     |  29 ++---
 cyacas/yacas-kernel/src/yacas_kernel.cpp     | 113 +++++++++----------
 5 files changed, 85 insertions(+), 93 deletions(-)

diff --git a/cyacas/yacas-kernel/CMakeLists.txt b/cyacas/yacas-kernel/CMakeLists.txt
index fe1600aa7..da9b0988a 100644
--- a/cyacas/yacas-kernel/CMakeLists.txt
+++ b/cyacas/yacas-kernel/CMakeLists.txt
@@ -16,12 +16,10 @@
 #
 #
 
+
 find_path (ZEROMQ_INCLUDE_DIR zmq.hpp)
 find_library (ZEROMQ_LIBRARY NAMES zmq)
 
-find_path (ZMQPP_INCLUDE_DIR zmqpp.hpp)
-find_library (ZMQPP_LIBRARY NAMES zmqpp)
-
 find_path (JSONCPP_INCLUDE_DIR json.h)
 find_library (JSONCPP_LIBRARY NAMES jsoncpp)
 
@@ -31,6 +29,6 @@ find_package (Boost REQUIRED date_time filesystem)
 include_directories (include)
 
 add_executable (yacas-kernel src/main.cpp src/yacas_kernel.cpp src/yacas_engine.cpp src/hmac_sha256.cpp src/base64.cpp)
-target_link_libraries (yacas-kernel libyacas ${ZMQPP_LIBRARY} ${ZEROMQ_LIBRARY} ${JSONCPP_LIBRARY} ${OPENSSL_CRYPTO_LIBRARY} ${Boost_LIBRARIES} pthread ${CMAKE_DL_LIBS})
+target_link_libraries (yacas-kernel libyacas ${ZEROMQ_LIBRARY} ${JSONCPP_LIBRARY} ${OPENSSL_CRYPTO_LIBRARY} ${Boost_LIBRARIES} pthread ${CMAKE_DL_LIBS})
 
 install (TARGETS yacas-kernel DESTINATION ${CMAKE_INSTALL_BINDIR})
diff --git a/cyacas/yacas-kernel/include/yacas_engine.hpp b/cyacas/yacas-kernel/include/yacas_engine.hpp
index 16a1ac800..b0929ece8 100644
--- a/cyacas/yacas-kernel/include/yacas_engine.hpp
+++ b/cyacas/yacas-kernel/include/yacas_engine.hpp
@@ -27,7 +27,7 @@
 
 #include "yacas/yacas.h"
 
-#include <zmqpp/zmqpp.hpp>
+#include <zmq.hpp>
 
 #include <atomic>
 #include <condition_variable>
@@ -40,7 +40,7 @@
 class YacasEngine {
 public:
     YacasEngine(const std::string& scripts_path,
-                const zmqpp::context& ctx,
+                zmq::context_t& ctx,
                 const std::string& endpoint = "inproc://engine");
 
     ~YacasEngine();
@@ -65,7 +65,7 @@ class YacasEngine {
 
     std::thread* _worker_thread;
 
-    zmqpp::socket _socket;
+    zmq::socket_t _socket;
 
     std::atomic<bool> _shutdown;
 };
diff --git a/cyacas/yacas-kernel/include/yacas_kernel.hpp b/cyacas/yacas-kernel/include/yacas_kernel.hpp
index 91d36ac01..1cc4eabf9 100644
--- a/cyacas/yacas-kernel/include/yacas_kernel.hpp
+++ b/cyacas/yacas-kernel/include/yacas_kernel.hpp
@@ -30,8 +30,8 @@
 
 #include <boost/uuid/random_generator.hpp>
 #include <jsoncpp/json/json.h>
-#include <zmqpp/zmqpp.hpp>
-
+#include <zmq.hpp>
+#include <zmq_addon.hpp>
 #include <map>
 #include <sstream>
 
@@ -60,12 +60,12 @@ class YacasKernel : NonCopyable {
 
     class Request : NonCopyable {
     public:
-        Request(const Session& session, const zmqpp::message& msg);
+        Request(const Session& session, const zmq::multipart_t& msg);
 
         const Json::Value& header() const { return _header; }
         const Json::Value& content() const { return _content; }
 
-        void reply(zmqpp::socket&,
+        void reply(zmq::socket_t&,
                    const std::string& type,
                    const Json::Value& content) const;
 
@@ -78,19 +78,19 @@ class YacasKernel : NonCopyable {
     };
 
     void _handle_shell(const std::shared_ptr<Request>& request);
-    void _handle_engine(const zmqpp::message& msg);
+    void _handle_engine(const zmq::multipart_t& msg);
 
     Session _session;
 
-    zmqpp::context _ctx;
+    zmq::context_t _ctx;
 
-    zmqpp::socket _hb_socket;
-    zmqpp::socket _iopub_socket;
-    zmqpp::socket _control_socket;
-    zmqpp::socket _stdin_socket;
-    zmqpp::socket _shell_socket;
+    zmq::socket_t _hb_socket;
+    zmq::socket_t _iopub_socket;
+    zmq::socket_t _control_socket;
+    zmq::socket_t _stdin_socket;
+    zmq::socket_t _shell_socket;
 
-    zmqpp::socket _engine_socket;
+    zmq::socket_t _engine_socket;
 
     unsigned long _execution_count;
 
diff --git a/cyacas/yacas-kernel/src/yacas_engine.cpp b/cyacas/yacas-kernel/src/yacas_engine.cpp
index 6ed60ca3b..3226b113c 100644
--- a/cyacas/yacas-kernel/src/yacas_engine.cpp
+++ b/cyacas/yacas-kernel/src/yacas_engine.cpp
@@ -22,15 +22,16 @@
  * Created on November 7, 2015, 12:52 PM
  */
 
-#include <jsoncpp/json/writer.h>
-
 #include "yacas_engine.hpp"
 
+#include <jsoncpp/json/writer.h>
+#include <zmq_addon.hpp>
+
 YacasEngine::YacasEngine(const std::string& scripts_path,
-                         const zmqpp::context& ctx,
+                         zmq::context_t& ctx,
                          const std::string& endpoint) :
     _yacas(_side_effects),
-    _socket(ctx, zmqpp::socket_type::pair),
+    _socket(ctx, zmq::socket_type::pair),
     _shutdown(false)
 {
     _yacas.Evaluate(std::string("DefaultDirectory(\"") + scripts_path +
@@ -90,11 +91,11 @@ void YacasEngine::_worker()
         Json::Value calculate_content;
         calculate_content["id"] = Json::Value::UInt64(ti.id);
         calculate_content["expr"] = ti.expr;
-        zmqpp::message status_msg;
-        status_msg << "calculate"
-                   << Json::writeString(Json::StreamWriterBuilder(),
-                                        calculate_content);
-        _socket.send(status_msg);
+        zmq::multipart_t status_msg;
+        status_msg.addstr("calculate");
+        status_msg.addstr(Json::writeString(Json::StreamWriterBuilder(),
+                                        calculate_content));
+        status_msg.send(_socket);
 
         _side_effects.clear();
         _side_effects.str("");
@@ -111,10 +112,10 @@ void YacasEngine::_worker()
 
         result_content["side_effects"] = _side_effects.str();
 
-        zmqpp::message result_msg;
-        result_msg << "result"
-                   << Json::writeString(Json::StreamWriterBuilder(),
-                                        result_content);
-        _socket.send(result_msg);
+        zmq::multipart_t result_msg;
+        result_msg.addstr("result");
+        result_msg.addstr(Json::writeString(Json::StreamWriterBuilder(),
+                                        result_content));
+        result_msg.send(_socket);
     }
 }
diff --git a/cyacas/yacas-kernel/src/yacas_kernel.cpp b/cyacas/yacas-kernel/src/yacas_kernel.cpp
index d8f35914f..f2667d02c 100644
--- a/cyacas/yacas-kernel/src/yacas_kernel.cpp
+++ b/cyacas/yacas-kernel/src/yacas_kernel.cpp
@@ -30,8 +30,6 @@
 #include <boost/date_time/posix_time/posix_time.hpp>
 #include <boost/uuid/uuid_io.hpp>
 
-#include <fstream>
-#include <iostream>
 #include <regex>
 #include <set>
 #include <string>
@@ -48,12 +46,12 @@ namespace {
 YacasKernel::YacasKernel(const std::string& scripts_path,
                          const Json::Value& config) :
     _session(config["key"].asString()),
-    _hb_socket(_ctx, zmqpp::socket_type::reply),
-    _iopub_socket(_ctx, zmqpp::socket_type::publish),
-    _control_socket(_ctx, zmqpp::socket_type::router),
-    _stdin_socket(_ctx, zmqpp::socket_type::router),
-    _shell_socket(_ctx, zmqpp::socket_type::router),
-    _engine_socket(_ctx, zmqpp::socket_type::pair),
+    _hb_socket(_ctx, zmq::socket_type::rep),
+    _iopub_socket(_ctx, zmq::socket_type::pub),
+    _control_socket(_ctx, zmq::socket_type::router),
+    _stdin_socket(_ctx, zmq::socket_type::router),
+    _shell_socket(_ctx, zmq::socket_type::router),
+    _engine_socket(_ctx, zmq::socket_type::pair),
     _execution_count(1),
     _engine(scripts_path, _ctx, "inproc://engine"),
     _tex_output(true),
@@ -82,33 +80,35 @@ YacasKernel::YacasKernel(const std::string& scripts_path,
 
 void YacasKernel::run()
 {
-    zmqpp::poller poller;
-
-    poller.add(_hb_socket);
-    poller.add(_control_socket);
-    poller.add(_stdin_socket);
-    poller.add(_shell_socket);
-    poller.add(_iopub_socket);
-    poller.add(_engine_socket);
+    std::vector<zmq::pollitem_t> items{
+        {_hb_socket, 0, ZMQ_POLLIN, 0},
+        {_control_socket, 0, ZMQ_POLLIN, 0},
+        {_stdin_socket, 0, ZMQ_POLLIN, 0},
+        {_shell_socket, 0, ZMQ_POLLIN, 0},
+        {_iopub_socket, 0, ZMQ_POLLIN, 0},
+        {_engine_socket, 0, ZMQ_POLLIN, 0}
+    };
 
     for (;;) {
-        poller.poll();
+        zmq::poll(items);
+
+        std::cerr << "toratoratora" << std::endl;
 
-        if (poller.has_input(_hb_socket)) {
-            zmqpp::message msg;
-            _hb_socket.receive(msg);
+        if (items[0].revents & ZMQ_POLLIN) { // heartbeat
+            zmq::message_t msg;
+            _hb_socket.recv(msg);
             _hb_socket.send(msg);
         }
 
-        if (poller.has_input(_shell_socket)) {
-            zmqpp::message msg;
-            _shell_socket.receive(msg);
+        if (items[3].revents & ZMQ_POLLIN) { // shell
+            zmq::multipart_t msg;
+            msg.recv(_shell_socket);
             _handle_shell(std::make_shared<Request>(_session, msg));
         }
 
-        if (poller.has_input(_control_socket)) {
-            zmqpp::message msg;
-            _control_socket.receive(msg);
+        if (items[1].revents & ZMQ_POLLIN) { // control
+            zmq::multipart_t msg;
+            msg.recv(_control_socket);
             Request request(_session, msg);
 
             if (request.header()["msg_type"].asString() == "shutdown_request")
@@ -118,14 +118,14 @@ void YacasKernel::run()
         if (_shutdown)
             return;
 
-        if (poller.has_input(_stdin_socket)) {
-            zmqpp::message msg;
-            _stdin_socket.receive(msg);
+        if (items[2].revents & ZMQ_POLLIN) { // stdin
+            zmq::message_t msg;
+            _stdin_socket.recv(msg);
         }
 
-        if (poller.has_input(_engine_socket)) {
-            zmqpp::message msg;
-            _engine_socket.receive(msg);
+        if (items[5].revents & ZMQ_POLLIN) { // engine
+            zmq::multipart_t msg;
+            msg.recv(_engine_socket);
             _handle_engine(msg);
         }
     }
@@ -138,17 +138,13 @@ YacasKernel::Session::Session(const std::string& key) :
 }
 
 YacasKernel::Request::Request(const Session& session,
-                              const zmqpp::message& msg) :
+                              const zmq::multipart_t& msg) :
     _session(session)
 {
-    std::string header_buf;
-    msg.get(header_buf, 3);
-    std::string parent_header_buf;
-    msg.get(parent_header_buf, 4);
-    std::string metadata_buf;
-    msg.get(metadata_buf, 5);
-    std::string content_buf;
-    msg.get(content_buf, 6);
+    const std::string header_buf{msg.peekstr(3)};
+    const std::string parent_header_buf{msg.peekstr(4)};
+    const std::string metadata_buf{msg.peekstr(5)};
+    const std::string content_buf{msg.peekstr(6)};
 
     HMAC_SHA256 auth(_session.auth());
 
@@ -157,13 +153,12 @@ YacasKernel::Request::Request(const Session& session,
     auth.update(metadata_buf);
     auth.update(content_buf);
 
-    std::string signature_buf;
-    msg.get(signature_buf, 2);
+    std::string signature_buf{msg.peekstr(2)};
 
     if (auth.hexdigest() != signature_buf)
         throw std::runtime_error("invalid signature");
 
-    msg.get(_identities_buf, 0);
+    _identities_buf = msg.peekstr(0);
 
     Json::Reader reader;
 
@@ -172,7 +167,7 @@ YacasKernel::Request::Request(const Session& session,
     reader.parse(metadata_buf, _metadata);
 }
 
-void YacasKernel::Request::reply(zmqpp::socket& socket,
+void YacasKernel::Request::reply(zmq::socket_t& socket,
                                  const std::string& msg_type,
                                  const Json::Value& content) const
 {
@@ -199,16 +194,17 @@ void YacasKernel::Request::reply(zmqpp::socket& socket,
     auth.update(metadata_buf);
     auth.update(content_buf);
 
-    zmqpp::message msg;
-    msg.add(_identities_buf);
-    msg.add("<IDS|MSG>");
-    msg.add(auth.hexdigest());
-    msg.add(header_buf);
-    msg.add(parent_header_buf);
-    msg.add(metadata_buf);
-    msg.add(content_buf);
+    zmq::multipart_t msg;
 
-    socket.send(msg);
+    msg.addstr(_identities_buf);
+    msg.addstr("<IDS|MSG>");
+    msg.addstr(auth.hexdigest());
+    msg.addstr(header_buf);
+    msg.addstr(parent_header_buf);
+    msg.addstr(metadata_buf);
+    msg.addstr(content_buf);
+
+    msg.send(socket);
 }
 
 void YacasKernel::_handle_shell(const std::shared_ptr<Request>& request)
@@ -316,13 +312,10 @@ void YacasKernel::_handle_shell(const std::shared_ptr<Request>& request)
     }
 }
 
-void YacasKernel::_handle_engine(const zmqpp::message& msg)
+void YacasKernel::_handle_engine(const zmq::multipart_t& msg)
 {
-    std::string msg_type;
-    msg.get(msg_type, 0);
-
-    std::string content_buf;
-    msg.get(content_buf, 1);
+    std::string msg_type{msg.peekstr(0)};
+    std::string content_buf{msg.peekstr(1)};
 
     Json::Value content;
     Json::Reader().parse(content_buf, content);
