22 #include "robot_memory.h"
24 #include <core/threading/mutex.h>
25 #include <core/threading/mutex_locker.h>
26 #include <interfaces/RobotMemoryInterface.h>
27 #include <plugins/mongodb/utils.h>
28 #include <utils/misc/string_conversions.h>
29 #include <utils/misc/string_split.h>
30 #include <utils/system/hostinfo.h>
31 #ifdef USE_TIMETRACKER
32 # include <utils/time/tracker.h>
34 #include <utils/time/tracker_macros.h>
36 #include <bsoncxx/builder/basic/document.hpp>
38 #include <mongocxx/client.hpp>
39 #include <mongocxx/exception/operation_exception.hpp>
40 #include <mongocxx/read_preference.hpp>
45 using namespace mongocxx;
46 using namespace bsoncxx;
76 mongo_connection_manager_ = mongo_connection_manager;
77 blackboard_ = blackboard;
78 mongodb_client_local_ =
nullptr;
79 mongodb_client_distributed_ =
nullptr;
83 RobotMemory::~RobotMemory()
85 mongo_connection_manager_->delete_client(mongodb_client_local_);
86 mongo_connection_manager_->delete_client(mongodb_client_distributed_);
87 delete trigger_manager_;
88 blackboard_->close(rm_if_);
89 #ifdef USE_TIMETRACKER
98 log(
"Started RobotMemory");
99 default_collection_ =
"robmem.test";
101 default_collection_ = config_->get_string(
"/plugins/robot-memory/default-collection");
105 debug_ = config_->get_bool(
"/plugins/robot-memory/more-debug-output");
108 database_name_ =
"robmem";
110 database_name_ = config_->get_string(
"/plugins/robot-memory/database");
113 distributed_dbs_ = config_->get_strings(
"/plugins/robot-memory/distributed-db-names");
115 cfg_coord_database_ = config_->get_string(
"/plugins/robot-memory/coordination/database");
116 cfg_coord_mutex_collection_ =
117 config_->get_string(
"/plugins/robot-memory/coordination/mutex-collection");
119 using namespace std::chrono_literals;
122 log(
"Connect to local mongod");
123 mongodb_client_local_ = mongo_connection_manager_->create_client(
"robot-memory-local");
125 if (config_->exists(
"/plugins/mongodb/clients/robot-memory-distributed/enabled")
126 && config_->get_bool(
"/plugins/mongodb/clients/robot-memory-distributed/enabled")) {
128 log(
"Connect to distributed mongod");
129 mongodb_client_distributed_ =
130 mongo_connection_manager_->create_client(
"robot-memory-distributed");
134 rm_if_ = blackboard_->open_for_writing<RobotMemoryInterface>(
135 config_->get_string(
"/plugins/robot-memory/interface-name").c_str());
136 rm_if_->set_error(
"");
137 rm_if_->set_result(
"");
144 log_deb(
"Initialized RobotMemory");
146 #ifdef USE_TIMETRACKER
149 ttc_events_ = tt_->add_class(
"RobotMemory Events");
150 ttc_cleanup_ = tt_->add_class(
"RobotMemory Cleanup");
157 TIMETRACK_START(ttc_events_);
158 trigger_manager_->check_events();
159 TIMETRACK_END(ttc_events_);
160 TIMETRACK_START(ttc_cleanup_);
161 computables_manager_->cleanup_computed_docs();
162 TIMETRACK_END(ttc_cleanup_);
163 #ifdef USE_TIMETRACKER
164 if (++tt_loopcount_ % 5 == 0) {
165 tt_->print_to_stdout();
179 const std::string & collection_name,
180 mongocxx::options::find query_options)
182 collection collection = get_collection(collection_name);
183 log_deb(std::string(
"Executing Query " + to_json(query) +
" on collection " + collection_name));
186 computables_manager_->check_and_compute(query, collection_name);
193 return collection.find(query, query_options);
194 }
catch (mongocxx::operation_exception &e) {
196 std::string(
"Error for query ") + to_json(query) +
"\n Exception: " + e.what();
211 collection collection = get_collection(collection_name);
212 log_deb(std::string(
"Inserting " + to_json(doc) +
" into collection " + collection_name));
217 collection.insert_one(doc);
218 }
catch (mongocxx::operation_exception &e) {
219 std::string error =
"Error for insert " + to_json(doc) +
"\n Exception: " + e.what();
220 log_deb(error,
"error");
235 const std::string & collection_name,
238 collection collection = get_collection(collection_name);
240 log_deb(std::string(
"Creating index " + to_json(keys) +
" on collection " + collection_name));
247 using namespace bsoncxx::builder::basic;
248 collection.create_index(keys, make_document(kvp(
"unique", unique)));
249 }
catch (operation_exception &e) {
250 std::string error =
"Error when creating index " + to_json(keys) +
"\n Exception: " + e.what();
251 log_deb(error,
"error");
267 collection collection = get_collection(collection_name);
268 std::string insert_string =
"[";
269 for (
auto &&doc : docs) {
270 insert_string += to_json(doc) +
",\n";
272 insert_string +=
"]";
274 log_deb(std::string(
"Inserting vector of documents " + insert_string +
" into collection "
282 collection.insert_many(docs);
283 }
catch (operation_exception &e) {
284 std::string error =
"Error for insert " + insert_string +
"\n Exception: " + e.what();
285 log_deb(error,
"error");
301 return insert(from_json(obj_str), collection);
314 const bsoncxx::document::view &update,
315 const std::string & collection_name,
318 collection collection = get_collection(collection_name);
319 log_deb(std::string(
"Executing Update " + to_json(update) +
" for query " + to_json(query)
320 +
" on collection " + collection_name));
327 collection.update_many(query,
328 builder::basic::make_document(
329 builder::basic::kvp(
"$set", builder::concatenate(update))),
330 options::update().upsert(upsert));
331 }
catch (operation_exception &e) {
332 log_deb(std::string(
"Error for update " + to_json(update) +
" for query " + to_json(query)
333 +
"\n Exception: " + e.what()),
351 const std::string & update_str,
352 const std::string & collection,
355 return update(query, from_json(update_str), collection, upsert);
369 const document::view &update,
370 const std::string & collection_name,
374 collection collection = get_collection(collection_name);
376 log_deb(std::string(
"Executing findOneAndUpdate " + to_json(update) +
" for filter "
377 + to_json(filter) +
" on collection " + collection_name));
383 collection.find_one_and_update(filter,
385 options::find_one_and_update().upsert(upsert).return_document(
386 return_new ? options::return_document::k_after
387 : options::return_document::k_before));
391 std::string error =
"Error for update " + to_json(update) +
" for query " + to_json(filter)
392 +
"FindOneAndUpdate unexpectedly did not return a document";
393 log_deb(error,
"warn");
394 return bsoncxx::builder::basic::make_document(bsoncxx::builder::basic::kvp(
"error", error));
396 }
catch (operation_exception &e) {
397 std::string error =
"Error for update " + to_json(update) +
" for query " + to_json(filter)
398 +
"\n Exception: " + e.what();
399 log_deb(error,
"error");
400 return bsoncxx::builder::basic::make_document(bsoncxx::builder::basic::kvp(
"error", error));
415 collection collection = get_collection(collection_name);
416 log_deb(std::string(
"Executing Remove " + to_json(query) +
" on collection " + collection_name));
419 collection.delete_many(query);
420 }
catch (operation_exception &e) {
421 log_deb(std::string(
"Error for query " + to_json(query) +
"\n Exception: " + e.what()),
437 bsoncxx::document::value
439 const std::string & collection,
440 const std::string & js_map_fun,
441 const std::string & js_reduce_fun)
462 collection collection = get_collection(collection_name);
463 log_deb(std::string(
"Aggregating in collection " + collection_name));
468 return collection.aggregate(pipeline, mongocxx::options::aggregate{});
469 }
catch (operation_exception &e) {
471 std::string(
"Error when aggregating " + to_json(pipeline.view_array()) +
"\n Exception: ")
473 log_deb(error,
"error");
487 collection collection = get_collection(collection_name);
488 log_deb(
"Dropping collection " + collection_name);
503 log_deb(
"Clearing whole robot memory");
504 mongodb_client_local_->database(database_name_).drop();
520 const std::string &directory,
521 std::string target_dbcollection)
523 if (target_dbcollection ==
"") {
524 target_dbcollection = dbcollection;
527 drop_collection(target_dbcollection);
532 auto [db, collection] = split_db_collection_string(dbcollection);
535 log_deb(std::string(
"Restore collection " + collection +
" from " + path),
"warn");
537 auto [target_db, target_collection] = split_db_collection_string(target_dbcollection);
540 std::string command =
"/usr/bin/mongorestore --dir " + path +
" -d " + target_db +
" -c "
541 + target_collection +
" --host=" + get_hostport(dbcollection);
542 log_deb(std::string(
"Restore command: " + command),
"warn");
543 FILE *bash_output = popen(command.c_str(),
"r");
547 log(std::string(
"Unable to restore collection" + collection),
"error");
550 std::string output_string =
"";
552 while (!feof(bash_output)) {
553 if (fgets(buffer, 100, bash_output) == NULL) {
556 output_string += buffer;
559 if (output_string.find(
"Failed") != std::string::npos) {
560 log(std::string(
"Unable to restore collection" + collection),
"error");
561 log_deb(output_string,
"error");
580 log_deb(std::string(
"Dump collection " + dbcollection +
" into " + path),
"warn");
582 auto [db, collection] = split_db_collection_string(dbcollection);
584 std::string command =
"/usr/bin/mongodump --out=" + path +
" --db=" + db
585 +
" --collection=" + collection +
" --forceTableScan"
586 +
" --host=" + get_hostport(dbcollection);
587 log(std::string(
"Dump command: " + command),
"info");
588 FILE *bash_output = popen(command.c_str(),
"r");
591 log(std::string(
"Unable to dump collection" + collection),
"error");
594 std::string output_string =
"";
596 while (!feof(bash_output)) {
597 if (fgets(buffer, 100, bash_output) == NULL) {
600 output_string += buffer;
603 if (output_string.find(
"Failed") != std::string::npos) {
604 log(std::string(
"Unable to dump collection" + collection),
"error");
605 log_deb(output_string,
"error");
612 RobotMemory::log(
const std::string &what,
const std::string &info)
614 if (!info.compare(
"error"))
615 logger_->log_error(name_,
"%s", what.c_str());
616 else if (!info.compare(
"warn"))
617 logger_->log_warn(name_,
"%s", what.c_str());
618 else if (!info.compare(
"debug"))
619 logger_->log_debug(name_,
"%s", what.c_str());
621 logger_->log_info(name_,
"%s", what.c_str());
625 RobotMemory::log_deb(
const std::string &what,
const std::string &level)
633 RobotMemory::log_deb(
const bsoncxx::document::view &query,
634 const std::string & what,
635 const std::string & level)
638 log(query, what, level);
643 RobotMemory::log(
const bsoncxx::document::view &query,
644 const std::string & what,
645 const std::string & level)
647 log(what +
" " + to_json(query), level);
655 RobotMemory::is_distributed_database(
const std::string &dbcollection)
657 return std::find(distributed_dbs_.begin(),
658 distributed_dbs_.end(),
659 split_db_collection_string(dbcollection).first)
660 != distributed_dbs_.end();
664 RobotMemory::get_hostport(
const std::string &dbcollection)
666 if (distributed_ && is_distributed_database(dbcollection)) {
667 return config_->get_string(
"/plugins/mongodb/clients/robot-memory-distributed-direct/hostport");
669 return config_->get_string(
"/plugins/mongodb/clients/robot-memory-local-direct/hostport");
679 RobotMemory::get_mongodb_client(
const std::string &collection)
682 return mongodb_client_local_;
684 if (is_distributed_database(collection)) {
685 return mongodb_client_distributed_;
687 return mongodb_client_local_;
698 RobotMemory::get_collection(
const std::string &dbcollection)
700 auto db_coll_pair = split_db_collection_string(dbcollection);
702 if (is_distributed_database(dbcollection)) {
703 client = mongodb_client_distributed_;
705 client = mongodb_client_local_;
707 return client->database(db_coll_pair.first)[db_coll_pair.second];
717 trigger_manager_->remove_trigger(trigger);
727 computables_manager_->remove_computable(computable);
741 client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
742 using namespace bsoncxx::builder;
743 basic::document insert_doc{};
744 insert_doc.append(basic::kvp(
"$currentDate", [](basic::sub_document subdoc) {
745 subdoc.append(basic::kvp(
"lock-time",
true));
747 insert_doc.append(basic::kvp(
"_id", name));
748 insert_doc.append(basic::kvp(
"locked",
false));
751 collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
752 auto write_concern = mongocxx::write_concern();
753 write_concern.majority(std::chrono::milliseconds(0));
754 collection.insert_one(insert_doc.view(), options::insert().write_concern(write_concern));
756 }
catch (operation_exception &e) {
757 logger_->log_info(name_,
"Failed to create mutex %s: %s", name.c_str(), e.what());
771 client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
772 using namespace bsoncxx::builder;
773 basic::document destroy_doc;
774 destroy_doc.append(basic::kvp(
"_id", name));
777 collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
778 auto write_concern = mongocxx::write_concern();
779 write_concern.majority(std::chrono::milliseconds(0));
780 collection.delete_one(destroy_doc.view(),
781 options::delete_options().write_concern(write_concern));
783 }
catch (operation_exception &e) {
784 logger_->log_info(name_,
"Failed to destroy mutex %s: %s", name.c_str(), e.what());
803 client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
805 std::string locked_by{identity};
806 if (identity.empty()) {
808 locked_by = host_info.
name();
812 using namespace bsoncxx::builder;
813 basic::document filter_doc;
814 filter_doc.append(basic::kvp(
"_id", name));
816 filter_doc.append(basic::kvp(
"locked",
false));
819 basic::document update_doc;
820 update_doc.append(basic::kvp(
"$currentDate", [](basic::sub_document subdoc) {
821 subdoc.append(basic::kvp(
"lock-time",
true));
823 update_doc.append(basic::kvp(
"$set", [locked_by](basic::sub_document subdoc) {
824 subdoc.append(basic::kvp(
"locked",
true));
825 subdoc.append(basic::kvp(
"locked-by", locked_by));
829 collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
830 auto write_concern = mongocxx::write_concern();
831 write_concern.majority(std::chrono::milliseconds(0));
833 collection.find_one_and_update(filter_doc.view(),
835 options::find_one_and_update()
837 .return_document(options::return_document::k_after)
838 .write_concern(write_concern));
843 auto new_view = new_doc->view();
844 return (new_view[
"locked-by"].get_utf8().value.to_string() == locked_by
845 && new_view[
"locked"].get_bool());
847 }
catch (operation_exception &e) {
848 logger_->log_error(name_,
"Mongo OperationException: %s", e.what());
851 basic::document check_doc;
852 check_doc.append(basic::kvp(
"_id", name));
853 check_doc.append(basic::kvp(
"locked",
true));
854 check_doc.append(basic::kvp(
"locked-by", locked_by));
856 collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
857 auto res = collection.find_one(check_doc.view());
858 logger_->log_info(name_,
"Checking whether mutex was acquired succeeded");
860 logger_->log_warn(name_,
861 "Exception during try-lock for %s, "
862 "but mutex was still acquired",
865 logger_->log_info(name_,
866 "Exception during try-lock for %s, "
867 "and mutex was not acquired",
870 return static_cast<bool>(res);
871 }
catch (operation_exception &e) {
872 logger_->log_error(name_,
873 "Mongo OperationException while handling "
874 "the first exception: %s",
894 return mutex_try_lock(name,
"", force);
905 client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
907 std::string locked_by{identity};
908 if (identity.empty()) {
910 locked_by = host_info.
name();
913 using namespace bsoncxx::builder;
915 basic::document filter_doc;
916 filter_doc.append(basic::kvp(
"_id", name));
917 filter_doc.append(basic::kvp(
"locked-by", locked_by));
919 basic::document update_doc;
920 update_doc.append(basic::kvp(
"$set", [](basic::sub_document subdoc) {
921 subdoc.append(basic::kvp(
"locked",
false));
923 update_doc.append(basic::kvp(
"$unset", [](basic::sub_document subdoc) {
924 subdoc.append(basic::kvp(
"locked-by",
true));
925 subdoc.append(basic::kvp(
"lock-time",
true));
930 collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
931 auto write_concern = mongocxx::write_concern();
932 write_concern.majority(std::chrono::milliseconds(0));
934 collection.find_one_and_update(filter_doc.view(),
936 options::find_one_and_update()
938 .return_document(options::return_document::k_after)
939 .write_concern(write_concern));
943 return new_doc->view()[
"locked"].get_bool();
944 }
catch (operation_exception &e) {
961 client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
963 std::string locked_by{identity};
964 if (identity.empty()) {
966 locked_by = host_info.
name();
969 using namespace bsoncxx::builder;
971 basic::document filter_doc;
972 filter_doc.append(basic::kvp(
"_id", name));
973 filter_doc.append(basic::kvp(
"locked",
true));
974 filter_doc.append(basic::kvp(
"locked-by", locked_by));
978 basic::document update_doc;
979 update_doc.append(basic::kvp(
"$currentDate", [](basic::sub_document subdoc) {
980 subdoc.append(basic::kvp(
"lock-time",
true));
982 update_doc.append(basic::kvp(
"$set", [locked_by](basic::sub_document subdoc) {
983 subdoc.append(basic::kvp(
"locked",
true));
984 subdoc.append(basic::kvp(
"locked-by", locked_by));
989 collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
990 auto write_concern = mongocxx::write_concern();
991 write_concern.majority(std::chrono::milliseconds(0));
993 collection.find_one_and_update(filter_doc.view(),
995 options::find_one_and_update()
997 .return_document(options::return_document::k_after)
998 .write_concern(write_concern));
999 return static_cast<bool>(new_doc);
1000 }
catch (operation_exception &e) {
1001 logger_->log_warn(name_,
"Renewing lock on mutex %s failed: %s", name.c_str(), e.what());
1022 client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
1024 auto keys = builder::basic::make_document(builder::basic::kvp(
"lock-time",
true));
1027 collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
1028 collection.create_index(keys.view(),
1029 builder::basic::make_document(
1030 builder::basic::kvp(
"expireAfterSeconds", max_age_sec)));
1031 }
catch (operation_exception &e) {
1032 logger_->log_warn(name_,
"Creating TTL index failed: %s", e.what());
1047 client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
1049 using std::chrono::high_resolution_clock;
1050 using std::chrono::milliseconds;
1051 using std::chrono::time_point;
1052 using std::chrono::time_point_cast;
1054 auto max_age_ms = milliseconds(
static_cast<unsigned long int>(std::floor(max_age_sec * 1000)));
1055 time_point<high_resolution_clock, milliseconds> expire_before =
1056 time_point_cast<milliseconds>(high_resolution_clock::now()) - max_age_ms;
1057 types::b_date expire_before_mdb(expire_before);
1060 using namespace bsoncxx::builder;
1061 basic::document filter_doc;
1062 filter_doc.append(basic::kvp(
"locked",
true));
1063 filter_doc.append(basic::kvp(
"lock-time", [expire_before_mdb](basic::sub_document subdoc) {
1064 subdoc.append(basic::kvp(
"$lt", expire_before_mdb));
1067 basic::document update_doc;
1068 update_doc.append(basic::kvp(
"$set", [](basic::sub_document subdoc) {
1069 subdoc.append(basic::kvp(
"locked",
false));
1071 update_doc.append(basic::kvp(
"$unset", [](basic::sub_document subdoc) {
1072 subdoc.append(basic::kvp(
"locked-by",
true));
1073 subdoc.append(basic::kvp(
"lock-time",
true));
1078 collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
1079 auto write_concern = mongocxx::write_concern();
1080 write_concern.majority(std::chrono::milliseconds(0));
1081 collection.update_many(filter_doc.view(),
1083 options::update().write_concern(write_concern));
1085 }
catch (operation_exception &e) {
1086 log(std::string(
"Failed to expire locks: " + std::string(e.what())),
"error");
Class holding information for a single computable. This class also enhances computed documents by addi...
This class manages registering computables and can check if any computables are invoked by a query.
Manager to realize triggers on events in the robot memory.
Class holding all information about an EventTrigger.
bool mutex_create(const std::string &name)
Explicitly create a mutex.
int dump_collection(const std::string &dbcollection, const std::string &directory="@CONFDIR@/robot-memory")
Dump (= save) a collection to the filesystem to restore it later.
bool mutex_destroy(const std::string &name)
Destroy a mutex.
bool mutex_renew_lock(const std::string &name, const std::string &identity)
Renew a mutex.
bool mutex_unlock(const std::string &name, const std::string &identity)
Release lock on mutex.
void remove_trigger(EventTrigger *trigger)
Remove a previously registered trigger.
mongocxx::cursor query(bsoncxx::document::view query, const std::string &collection_name="", mongocxx::options::find query_options=mongocxx::options::find())
Query information from the robot memory.
bsoncxx::document::value find_one_and_update(const bsoncxx::document::view &filter, const bsoncxx::document::view &update, const std::string &collection, bool upsert=false, bool return_new=true)
Atomically update and retrieve document.
bool mutex_setup_ttl(float max_age_sec)
Setup time-to-live index for mutexes.
bsoncxx::document::value mapreduce(const bsoncxx::document::view &query, const std::string &collection, const std::string &js_map_fun, const std::string &js_reduce_fun)
Performs a MapReduce operation on the robot memory (https://docs.mongodb.com/manual/core/map-reduce/)
int remove(const bsoncxx::document::view &query, const std::string &collection="")
Remove documents from the robot memory.
int insert(bsoncxx::document::view, const std::string &collection="")
Inserts a document into the robot memory.
mongocxx::cursor aggregate(mongocxx::pipeline &pipeline, const std::string &collection="")
Performs an aggregation operation on the robot memory (https://docs.mongodb.com/v3....
void remove_computable(Computable *computable)
Remove previously registered computable.
int drop_collection(const std::string &collection)
Drop (= remove) a whole collection and all documents inside it.
int update(const bsoncxx::document::view &query, const bsoncxx::document::view &update, const std::string &collection="", bool upsert=false)
Updates documents in the robot memory.
bool mutex_try_lock(const std::string &name, bool force=false)
Try to acquire a lock for a mutex.
int create_index(bsoncxx::document::view keys, const std::string &collection="", bool unique=false)
Create an index on a collection.
int clear_memory()
Remove the whole database of the robot memory and all documents inside.
RobotMemory(fawkes::Configuration *config, fawkes::Logger *logger, fawkes::Clock *clock, fawkes::MongoDBConnCreator *mongo_connection_manager, fawkes::BlackBoard *blackboard)
Robot Memory Constructor with objects of the thread.
int restore_collection(const std::string &dbcollection, const std::string &directory="@CONFDIR@/robot-memory", std::string target_dbcollection="")
Restore a previously dumped collection from a directory.
bool mutex_expire_locks(float max_age_sec)
Expire old locks on mutexes.
The BlackBoard abstract class.
This is supposed to be the central clock in Fawkes.
Interface for configuration handling.
Base class for exceptions in Fawkes.
const char * name()
Get full hostname.
Interface for a MongoDB connection creator.
static std::string resolve_path(std::string s)
Resolves path-string with @...@ tags.
Fawkes library namespace.