gusucode.com > target工具箱matlab源码程序 > target/codertarget/rtos/targetservices/ASIOSender.cpp
/* Copyright 2013-2016 The MathWorks, Inc. */ #include "ASIOSender.hpp" #include <boost/foreach.hpp> #include <boost/iterator/transform_iterator.hpp> #include <boost/chrono.hpp> namespace coder { namespace tgtsvc { /** Functor to create a boost::asio::const_buffer from a Message*. */ struct MessageToBuffer { boost::asio::const_buffer operator()(const Message *msg) const { return boost::asio::const_buffer(msg->transmitStart(), msg->transmitSize()); } }; /** * Implementation of boost::asio::ConstBufferSequence for gather writes. * T must be a sequence of Message* */ template <typename T> struct ConstBufferSequence { typedef boost::asio::const_buffer value_type; typedef boost::transform_iterator<MessageToBuffer, typename T::const_iterator, boost::asio::const_buffer> const_iterator; ConstBufferSequence(T &l) : list_(l) {} /// clears the list and deletes all the messages in it void clearAndDelete() { while (!list_.empty()) { delete(list_.front()); list_.pop_front(); } } const_iterator begin() const { return const_iterator(list_.begin()); } const_iterator end() const { return const_iterator(list_.end()); } T &list_; }; template <typename AsyncWriteStream> ASIOSender<AsyncWriteStream>::ASIOSender(AsyncWriteStream &s, ErrorCallback ec) : socket_(s), errorCallback_(ec), toSend_(QUEUE_SIZE, NULL), beingSent_(QUEUE_SIZE, NULL), status_(TSE_SUCCESS), waitingSenders_(0), inProgress_(false) { toSend_.clear(); beingSent_.clear(); } template <typename AsyncWriteStream> ASIOSender<AsyncWriteStream>::~ASIOSender() { boost::unique_lock<boost::mutex> lock(mtx_); status_ = TSE_ERROR; while (waitingSenders_ > 0) { lock.unlock(); full_.notify_all(); boost::this_thread::sleep(boost::posix_time::milliseconds(20)); lock.lock(); } clearQueues(); } template <typename AsyncWriteStream> TSEStatus ASIOSender<AsyncWriteStream>::enqueue(Message *msg, Message::Priority p) { boost::unique_lock<boost::mutex> lock(mtx_); while (toSend_.size() >= QUEUE_SIZE && status_ == TSE_SUCCESS) { ++waitingSenders_; try { full_.wait(lock); } // don't send if the wait throws an error catch (...) { --waitingSenders_; return TSE_RESOURCE_UNAVAILABLE; } --waitingSenders_; } if (status_ == TSE_SUCCESS) { toSend_.push_back(msg); if (!inProgress_) { startSending(); } return TSE_SUCCESS; } else { return TSE_ERROR; } } template <typename AsyncWriteStream> bool ASIOSender<AsyncWriteStream>::pending() { boost::unique_lock<boost::mutex> lock(mtx_); return (toSend_.size() != 0 || inProgress_ == true); } /** * Starts another asynchronous send if there is data available. * Required preconditions: * mtx_ must be locked * inProgress_ must be false * beingSent_ must be empty */ template <typename AsyncWriteStream> void ASIOSender<AsyncWriteStream>::startSending() { assert(inProgress_ == false); if (toSend_.empty()) return; assert(beingSent_.empty()); beingSent_.swap(toSend_); full_.notify_all(); ConstBufferSequence<MessageQueue> buffSeq(beingSent_); inProgress_ = true; boost::asio::async_write(socket_, buffSeq, boost::bind(&ASIOSender::sendComplete, this, boost::asio::placeholders::error)); } template <typename AsyncWriteStream> void ASIOSender<AsyncWriteStream>::sendComplete(const boost::system::error_code &error) { BOOST_FOREACH(Message *m, beingSent_) { delete m; } beingSent_.clear(); boost::lock_guard<boost::mutex> guard(mtx_); inProgress_ = false; if (error) { status_ = TSE_ERROR; clearQueues(); errorCallback_(error); } else { startSending(); } } template <typename AsyncWriteStream> void ASIOSender<AsyncWriteStream>::clearQueues() { BOOST_FOREACH(Message *m, beingSent_) { delete m; } beingSent_.clear(); BOOST_FOREACH(Message *m, toSend_) { delete m; } toSend_.clear(); } template class ASIOSender<boost::asio::ip::tcp::socket>; template class ASIOSender<boost::asio::serial_port>; }}