31#ifndef DW_FRAMEWORK_MANAGEDPORT_HPP_
32#define DW_FRAMEWORK_MANAGEDPORT_HPP_
39#include <dwshared/dwfoundation/dw/core/language/Optional.hpp>
40#include <dwshared/dwfoundation/dw/core/container/RingBuffer.hpp>
41#include <dwshared/dwfoundation/dw/core/container/StringView.hpp>
42#include <dwshared/dwfoundation/dw/core/language/Function.hpp>
86 virtual bool isBound() const noexcept = 0;
138 static constexpr char LOG_TAG[]{
"ManagedPortOutputBase"};
150 bool syncEnabled =
false;
191 void setCallbackBeforeSend(
dw::core::Function<dwStatus()> callback);
212 void sendAdvTimestamp();
225 dw::core::Function<dwStatus()> m_callbackBeforeSend;
226 uint32_t m_sendSeqNum;
242 uint32_t maxBuffers = 1U;
248 dwTime_t waitTime = 0;
262 bool enableReuse =
false;
267 bool syncEnabled =
false;
272 uint32_t dataOffset = 0U;
319 bool isBufferAvailable() const noexcept;
334 void setCallbackAfterRecv(
dw::core::Function<dwStatus()> callback);
345 void sendAdvTimestamp();
351 void releaseToChannel(
void* data);
355 bool recvSingle(dwTime_t waitTime);
356 bool stashConsumed();
358 void handleReuseDrop();
360 void releaseSingle();
361 bool postProcessLockstepReplayData(
GenericData packet);
362 dwTime_t getWaitTime();
365 void handleWaitFailure(dwStatus status);
366 void handleCallbackAfterRecv();
371 bool m_shouldDropFirstBuffer;
372 dw::core::Function<dwStatus()> m_callbackAfterRecv;
388 T* ptr{metadataPacket->
data.template getData<T>()};
392 throw ExceptionWithStatus(DW_INVALID_ARGUMENT,
"getBufferTyped: type mismatch");
401 explicit vectorIterable(dw::core::VectorFixed<GenericData> allBuffers)
402 : m_allBuffers(std::move(allBuffers))
410 class iterator :
public dw::core::VectorFixed<GenericData>::iterator
413 using Base = dw::core::VectorFixed<GenericData>::iterator;
416 iterator(Base&& base)
421 const Base& baseFromThis()
const
426 auto operator*()
const
429 return getBufferTyped<TT>(buffer);
434 iterator<T> begin() {
return iterator<T>(m_allBuffers.begin()); }
437 iterator<T> end() {
return iterator<T>(m_allBuffers.end()); }
440 iterator<const T> begin()
const {
return iterator<const T>(m_allBuffers.begin()); }
443 iterator<const T> end()
const {
return iterator<const T>(m_allBuffers.end()); }
446 dw::core::VectorFixed<GenericData> m_allBuffers;
458 "Channel packet type not declared. Ensure channel packet type "
459 "handling is declared with DWFRAMEWORK_DECLARE_PACKET_TYPE_POD "
460 "or DWFRAMEWORK_DECLARE_PACKET_TYPE_RELATION");
513 return detail::vectorIterable<T>(getAllBuffers());
523 return detail::getBufferTyped<T>(getBufferGeneric());
534 "Channel packet type not declared. Ensure channel packet type "
535 "handling is declared with DWFRAMEWORK_DECLARE_PACKET_TYPE_POD "
536 "or DWFRAMEWORK_DECLARE_PACKET_TYPE_RELATION");
540 struct PacketDeleter;
569 return detail::vectorIterable<T>(getAllBuffers());
589 return detail::getBufferTyped<T>(getBufferGeneric());
599 if (isBufferAvailable())
601 const dw::core::Optional<dwValidityStatus> valid{
getValidityStatus(getMetadata())};
604 const bool isValid{!valid.has_value() || (valid->validity == DW_VALIDITY_VALID)};
621 return isBufferAvailable() ? getBuffer() :
nullptr;
629 T* ptr{detail::getBufferTyped<T>(packet)};
645 class iterator :
public ManagedPortInputBase::RingBuffer::iterator
647 using Base = ManagedPortInputBase::RingBuffer::iterator;
653 : Base(std::move(base))
666 return detail::getBufferTyped<TT>(buffer);
692 void operator()(T* p)
694 static_cast<void>(p);
697 ManagedPortInput* port;
711 uint64_t DescriptorIndex>
712struct IsOutputNonPOD : std::integral_constant<bool, Direction == PortDirection::OUTPUT && parameter_traits<decltype(portDescriptorType<NodeT, Direction, DescriptorIndex>())>::PacketTID != DWFRAMEWORK_PACKET_ID_DEFAULT>
714 static_assert(DescriptorIndex < portDescriptorSize<NodeT, Direction>(),
"Invalid PortIndex.");
720 uint64_t DescriptorIndex>
721typename std::enable_if<
722 !IsOutputNonPOD<NodeT, Direction, DescriptorIndex>::value,
725 createPortSpecimenByStaticIndex()
727 GenericDataReference ref{make_specimen<decltype(portDescriptorType<NodeT, Direction, DescriptorIndex>())>(
nullptr)};
735 uint64_t DescriptorIndex>
736typename std::enable_if<
737 IsOutputNonPOD<NodeT, Direction, DescriptorIndex>::value,
740 createPortSpecimenByStaticIndex()
742 throw ExceptionWithStatus(DW_NOT_SUPPORTED,
"createPortSpecimenByStaticIndex: Non POD output port is not supported");
751 createPortSpecimenImpl(
size_t descriptorIndex, std::index_sequence<Idx...>)
753 constexpr size_t ArraySize{
sizeof...(Idx)};
756 std::array<GenericDataReference, ArraySize> specimens{
760 throw ExceptionWithStatus(DW_OUT_OF_BOUNDS,
"createPortSpecimenImpl: index out of bound.");
772 createPortSpecimenImpl(
size_t descriptorIndex, std::index_sequence<Idx...>)
774 constexpr size_t ArraySize{
sizeof...(Idx)};
777 return createPortSpecimenByStaticIndex<NodeT, Direction, 0>();
779 throw ExceptionWithStatus(DW_OUT_OF_BOUNDS,
"createPortSpecimenImpl: index out of bound.");
788 createPortSpecimenImpl(
size_t, std::index_sequence<Idx...>)
790 throw ExceptionWithStatus(DW_OUT_OF_BOUNDS,
"createPortSpecimenImpl: index out of bound.");
799 return detail::createPortSpecimenImpl<NodeT, Direction>(
801 std::make_index_sequence<portDescriptorSize<NodeT, Direction>()>{});
void * getPointer() const
dw::core::StringView m_name
The unique name within set of ports with the same direction.
ManagedPortBase & operator=(ManagedPortBase &&other)=delete
virtual bool isBound() const noexcept=0
void setNodeName(const dw::core::StringView &nodeName) noexcept
Set the name of the node this port belongs to.
virtual ~ManagedPortBase()=default
const dw::core::StringView & getNodeName() const
Get the name of the node this port belongs to.
ManagedPortBase(const ManagedPortBase &other)=delete
virtual void bindChannel(ChannelObject *channel)=0
ChannelObject * getChannel()
ChannelObject * m_channel
ManagedPortBase(ManagedPortBase &&other)=delete
ManagedPortBase & operator=(const ManagedPortBase &other)=delete
const dw::core::StringView & getName() const
Get the name of the port.
void bindLockstepSyncClient(dw::framework::lockstep::ILockstepSyncClient *syncClient)
dw::framework::lockstep::ILockstepSyncClient * m_lockstepSyncClient
ManagedPortBase(const dw::core::StringView &name)
dw::core::StringView m_nodeName
The name of the node this port belongs to.
void setCycleCount(uint32_t cycleCount)
void setPeriod(uint32_t period)
ConstructProperties constructProperties
BoundProperties boundProperties
bool isBufferAvailable() const noexcept
ManagedPortOutput(const dw::core::StringView &name, SpecimenT &ref)
ManagedPortOutput(const dw::core::StringView &name, ConstructProperties props)
ManagedPortOutput(const dw::core::StringView &name)
ManagedPortOutput(const dw::core::StringView &name, ConstructProperties props, SpecimenT &ref)
static constexpr const uint32_t DWFRAMEWORK_METADATA_PACKET_TYPE_ID_OFFSET
static GenericDataReference make_specimen(typename parameter_traits< T >::SpecimenT *specimen)
MetadataPayload * extractMetadata(GenericData packet)
ChannelPacketTypeID packetTypeID
The ID of the type of the endpoint.
dw::core::Optional< dwValidityStatus > getValidityStatus(ChannelMetadata const &header)
uint32_t ChannelPacketTypeID
constexpr ChannelPacketTypeID DWFRAMEWORK_PACKET_ID_DEFAULT
typename ManagedPortInput< T >::UniquePacketPtr UniquePacketPtr
constexpr size_t descriptorIndex()