@@ -38,7 +38,24 @@ void Transport::unregisterIncoming() {
3838
3939Transport::State Transport::state () const { return mState ; }
4040
41- void Transport::onRecv (message_callback callback) { mRecvCallback = std::move (callback); }
41+ void Transport::onRecv (message_callback callback) {
42+ std::vector<message_ptr> pending;
43+ {
44+ std::lock_guard lock (mPendingMutex );
45+ mRecvCallback = std::move (callback);
46+ if (mRecvCallback )
47+ pending = std::move (mPendingRecv );
48+ else
49+ mPendingRecv .clear ();
50+ }
51+ for (auto &msg : pending) {
52+ try {
53+ mRecvCallback (msg);
54+ } catch (const std::exception &e) {
55+ PLOG_WARNING << e.what ();
56+ }
57+ }
58+ }
4259
4360void Transport::onStateChange (state_callback callback) {
4461 mStateChangeCallback = std::move (callback);
@@ -52,6 +69,17 @@ bool Transport::send(message_ptr message) { return outgoing(message); }
5269
5370void Transport::recv (message_ptr message) {
5471 try {
72+ std::unique_lock lock (mPendingMutex );
73+ if (!mRecvCallback ) {
74+ // No callback registered yet; buffer the message for replay when
75+ // onRecv() is called. Bounded to avoid unbounded growth.
76+ if (mPendingRecv .size () < 8 )
77+ mPendingRecv .push_back (std::move (message));
78+ else
79+ PLOG_WARNING << " dropping incoming message, no receive callback" ;
80+ return ;
81+ }
82+ lock.unlock ();
5583 mRecvCallback (message);
5684 } catch (const std::exception &e) {
5785 PLOG_WARNING << e.what ();
0 commit comments