31#ifndef DW_FRAMEWORK_PORT_H_
32#define DW_FRAMEWORK_PORT_H_
36#include <dwshared/dwfoundation/dw/core/container/StringView.hpp>
37#include <dwshared/dwfoundation/dw/core/logger/Logger.hpp>
56T* getBufferTyped(GenericData buffer)
59 T* ptr{metadataPacket->
data.template getData<T>()};
63 throw ExceptionWithStatus(DW_INVALID_ARGUMENT,
"getBufferTyped: type mismatch");
71 static_assert(std::is_constructible<T>::value,
"T must be constructible");
73 explicit vectorIterable(dw::core::VectorFixed<GenericData> allBuffers)
74 : m_allBuffers(std::move(allBuffers))
80 class iterator :
public dw::core::VectorFixed<GenericData>::iterator
82 static_assert(std::is_constructible<TT>::value,
"TT must be constructible");
85 using Base = dw::core::VectorFixed<GenericData>::iterator;
88 : Base(std::move(base))
92 const Base& baseFromThis()
const
99 GenericData buffer{*baseFromThis()};
100 return getBufferTyped<TT>(buffer);
104 iterator<T> begin() {
return iterator<T>(m_allBuffers.begin()); }
106 iterator<T> end() {
return iterator<T>(m_allBuffers.end()); }
109 dw::core::VectorFixed<GenericData> m_allBuffers;
155 static_assert(std::is_constructible<T>::value,
"T must be constructible");
164 static_assert(std::is_copy_constructible<SpecimenT>::value,
"SpecimenT is not copy constructible");
167 static constexpr char LOG_TAG[]{
"PortOutput"};
174 void* m_onDataReadyOpaque;
176 uint32_t m_sendSeqNum;
186 , m_channelProducer(nullptr)
187 , m_reference(std::move(ref))
188 , m_onDataReadyOpaque()
199 , m_channelProducer(
nullptr)
201 , m_waiterAttrs(std::move(waiterAttrs))
202 , m_signalerAttrs(std::move(signalerAttrs))
203 , m_onDataReadyOpaque()
222 throw ExceptionWithStatus(DW_INVALID_ARGUMENT,
"PortOutput: bindChannel: port already bound");
224 if (
nullptr == channel)
226 throw ExceptionWithStatus(DW_INVALID_ARGUMENT,
"PortOutput: bindChannel: expected channel != nullptr");
237 if (
nullptr == m_channelProducer)
239 throw ExceptionWithStatus(DW_INTERNAL_ERROR,
"PortOutput bindChannel: wrong channel implementations returned.");
242 dw::core::Logger::Verbosity::DEBUG);
256 throw ExceptionWithStatus(DW_INVALID_ARGUMENT,
"PortOutput: bindCbindChannelForPODTypePackethannel: port already bound");
258 if (
nullptr == channel)
260 throw ExceptionWithStatus(DW_INVALID_ARGUMENT,
"PortOutput: bindChannelForPODTypePacket: expected channel != nullptr");
264 throw dw::core::ExceptionWithStatus(DW_CALL_NOT_ALLOWED,
"PortOutput: bindChannelForPODTypePacket: setting channel to use POD type only allowed for local channels.");
272 ref.typeSize =
sizeof(T);
274 ref.setWaiterAttributes = m_waiterAttrs;
275 ref.setSignalerAttributes = m_signalerAttrs;
276 ref.onDataReadyOpaque = m_onDataReadyOpaque;
277 ref.onDataReady = m_onDataReady;
280 if (
nullptr == m_channelProducer)
282 throw ExceptionWithStatus(DW_INTERNAL_ERROR,
"PortOutput bindChannelForPODTypePacket: wrong channel implementations returned.");
285 dw::core::Logger::Verbosity::DEBUG);
292 throw ExceptionWithStatus(DW_NOT_AVAILABLE,
"PortOutput: setOnDataReady: channel already bound");
294 m_onDataReadyOpaque = opaque;
295 m_onDataReady = std::move(onDataReady);
300 return (
nullptr != m_channelProducer);
303 dwStatus
wait(dwTime_t timeout)
307 throw ExceptionWithStatus(DW_NOT_AVAILABLE,
"PortOutput: wait: no bound channel");
310 return m_channelProducer->
wait(timeout);
317 dwStatus status{DW_FAILURE};
319 if (m_channelProducer)
321 status = m_channelProducer->
get(&genericData);
324 if (DW_SUCCESS != status)
335 virtual dwStatus
send(T* frame,
const dwTime_t* publishTimestamp =
nullptr)
337 if (!m_channelProducer)
339 throw ExceptionWithStatus(DW_NOT_AVAILABLE,
"PortOutput: channel not bound");
344 return m_channelProducer->
send(payload);
350 if (!m_channelProducer)
352 throw ExceptionWithStatus(DW_NOT_AVAILABLE,
"PortOutput: channel not bound");
361 if (!m_channelProducer)
363 throw ExceptionWithStatus(DW_NOT_AVAILABLE,
"PortOutput: channel not bound");
375 if (!m_channelProducer)
377 throw ExceptionWithStatus(DW_NOT_AVAILABLE,
"PortOutput: channel not bound");
392 return detail::vectorIterable<T>(m_channelProducer->
getAllBuffers());
399 if (m_sendSeqNum < std::numeric_limits<
decltype(m_sendSeqNum)>::max())
405 m_sendSeqNum = std::numeric_limits<
decltype(m_sendSeqNum)>::min();
434 static_assert(std::is_constructible<T>::value,
"T must be constructible");
436 "Channel packet type not declared. Ensure channel packet type "
437 "handling is declared with DWFRAMEWORK_DECLARE_PACKET_TYPE_POD "
438 "or DWFRAMEWORK_DECLARE_PACKET_TYPE_RELATION");
440 static constexpr char LOG_TAG[]{
"PortInput"};
449 static_assert(std::is_copy_constructible<SpecimenT>::value,
"SpecimenT is not copy constructible");
458 , m_channelConsumer(nullptr)
460 , m_calledRecvImpl(RECV_API_CALLED_NONE)
461 , m_lastTypedData(nullptr)
462 , m_lastReleasePtr(nullptr)
463 , m_existingUniquePtr(false)
464 , m_reference(std::move(ref))
467 , m_onDataReadyOpaque()
481 , m_channelConsumer(
nullptr)
483 , m_calledRecvImpl(RECV_API_CALLED_NONE)
484 , m_lastTypedData(
nullptr)
485 , m_lastReleasePtr(
nullptr)
486 , m_existingUniquePtr(
false)
487 , m_waiterAttrs(std::move(waiterAttrs))
488 , m_signalerAttrs(std::move(signalerAttrs))
489 , m_onDataReadyOpaque()
499 , m_channelConsumer(
nullptr)
501 , m_calledRecvImpl(RECV_API_CALLED_NONE)
502 , m_lastTypedData(
nullptr)
503 , m_lastReleasePtr(
nullptr)
504 , m_existingUniquePtr(
false)
506 , m_waiterAttrs(std::move(waiterAttrs))
507 , m_signalerAttrs(std::move(signalerAttrs))
508 , m_onDataReadyOpaque()
516 if (
nullptr != m_channelConsumer &&
nullptr != m_lastReleasePtr)
518 if (m_existingUniquePtr.load())
520 DW_LOGE << dw::core::StringView{
"~PortInput: Cannot release reused packet since the unique_ptr has not been returned by caller yet"} << Logger::State::endl;
524 static_cast<void>(m_channelConsumer->
release(m_lastReleasePtr));
536 throw ExceptionWithStatus(DW_INVALID_ARGUMENT,
"PortInput: bindChannel: port already bound");
538 if (
nullptr == channel)
540 throw ExceptionWithStatus(DW_INVALID_ARGUMENT,
"PortInput: bindChannel: expected channel != nullptr");
547 if (m_reference.has_value())
549 ref = make_specimen<T>(&m_reference.value());
553 ref.setWaiterAttributes = m_waiterAttrs;
554 ref.setSignalerAttributes = m_signalerAttrs;
555 ref.onDataReadyOpaque = m_onDataReadyOpaque;
556 ref.onDataReady = m_onDataReady;
559 if (
nullptr == m_channelConsumer)
561 throw ExceptionWithStatus(DW_INTERNAL_ERROR,
"PortInput bindChannel: wrong channel implementations returned.");
565 dw::core::Logger::Verbosity::DEBUG);
570 return !(
nullptr == m_channelConsumer);
577 throw ExceptionWithStatus(DW_NOT_AVAILABLE,
"PortInput: setOnDataReady: channel already bound");
579 m_onDataReadyOpaque = opaque;
580 m_onDataReady = std::move(onDataReady);
584 dwStatus
wait(dwTime_t timeout)
588 throw ExceptionWithStatus(DW_NOT_AVAILABLE,
"PortInput: wait: no bound channel");
600 return DW_NOT_AVAILABLE;
608 bool hasLast{(
nullptr != m_last.get()) || (
nullptr != m_lastTypedData)};
609 dwTime_t waitTime{hasLast ? 0 : timeout};
610 dwStatus status{m_channelConsumer->
wait(waitTime)};
611 if (hasLast && (DW_TIME_OUT == status || DW_NOT_AVAILABLE == status))
620 virtual std::shared_ptr<T>
recv()
622 if (RECV_API_CALLED_RECV_UNIQUE == m_calledRecvImpl)
624 throw ExceptionWithStatus(DW_CALL_NOT_ALLOWED,
"PortInput: recv() can't be called after calling recvUnique() before");
626 m_calledRecvImpl = RECV_API_CALLED_RECV;
629 std::shared_ptr<T> result{};
636 T* typedData{
nullptr};
638 void* releasePtr{
nullptr};
644 releasePtr = data.getPointer();
654 dwStatus status{m_channelConsumer->
recv(&data)};
655 if (DW_SUCCESS != status)
657 if (
nullptr != m_last)
678 releasePtr = data.getPointer();
684 result = std::shared_ptr<T>(typedData, [channelConsumer, releasePtr](T*) {
685 static_cast<void>(channelConsumer->release(releasePtr));
712 port->m_existingUniquePtr =
false;
724 if (RECV_API_CALLED_RECV == m_calledRecvImpl)
726 throw ExceptionWithStatus(DW_CALL_NOT_ALLOWED,
"PortInput: recvUnique() can't be called after calling recv() before");
728 m_calledRecvImpl = RECV_API_CALLED_RECV_UNIQUE;
737 T* typedData{
nullptr};
739 void* releasePtr{
nullptr};
744 releasePtr = data.getPointer();
754 dwStatus status{m_channelConsumer->
recv(&data)};
755 if (DW_SUCCESS != status)
757 return makeUniquePtr();
771 releasePtr = data.getPointer();
774 return makeUniquePtr(typedData, releasePtr);
780 if (!m_channelConsumer)
782 throw ExceptionWithStatus(DW_NOT_AVAILABLE,
"PortInput: channel not bound");
791 if (!m_channelConsumer)
793 throw ExceptionWithStatus(DW_NOT_AVAILABLE,
"PortInput: channel not bound");
805 if (!m_channelConsumer)
807 throw ExceptionWithStatus(DW_NOT_AVAILABLE,
"PortInput: channel not bound");
822 return detail::vectorIterable<T>(m_channelConsumer->
getAllBuffers());
826 UniquePacketPtr makeUniquePtr(T* typedData =
nullptr,
void* releasePtr =
nullptr)
833 if (m_existingUniquePtr.load())
838 throw ExceptionWithStatus(DW_CALL_NOT_ALLOWED,
"Cannot return unique_ptr of reused packet since previous unique_ptr has not been returned");
841 if (
nullptr == typedData &&
nullptr == m_lastTypedData)
846 if (
nullptr != typedData)
848 if (
nullptr != m_lastTypedData)
851 static_cast<void>(m_channelConsumer->
release(m_lastReleasePtr));
853 m_lastTypedData = typedData;
854 m_lastReleasePtr = releasePtr;
857 m_existingUniquePtr =
true;
858 return std::move(
UniquePacketPtr(m_lastTypedData, PacketDeleter{
this, releasePtr}));
862 static constexpr uint8_t RECV_API_CALLED_NONE{0U};
864 static constexpr uint8_t RECV_API_CALLED_RECV{1U};
866 static constexpr uint8_t RECV_API_CALLED_RECV_UNIQUE{2U};
868 ChannelObject::Consumer* m_channelConsumer;
871 uint8_t m_calledRecvImpl;
873 std::shared_ptr<T> m_last;
877 void* m_lastReleasePtr;
879 std::atomic<bool> m_existingUniquePtr;
880 dw::core::Optional<SpecimenT> m_reference;
883 void* m_onDataReadyOpaque;
888constexpr char PortInput<T>::LOG_TAG[];
virtual dwStatus recv(GenericData *data)=0
virtual dwStatus release(void *data)=0
virtual dwStatus wait(dwTime_t timeout)=0
SyncWaiter & getSyncWaiter()
virtual dw::core::VectorFixed< GenericData > getAllBuffers()=0
SyncSignaler & getSyncSignaler()
virtual dwTime_t getCurrentTime()=0
virtual dwStatus get(GenericData *data)=0
virtual dwStatus send(void *data)=0
virtual void setSignalFences(void *data, dw::core::span< const NvSciSyncFence > postFences)=0
virtual void getWaitFences(void *data, dw::core::span< NvSciSyncFence > &waitFences)=0
virtual const ChannelParams & getParams() const =0
virtual Consumer * getConsumer(const GenericDataReference &ref)=0
virtual Producer * getProducer(const GenericDataReference &ref)=0
bool getReuseEnabled() const
ChannelType getType() const
static dwStatus guard(TryBlock const &tryBlock, ::dw::core::Logger::Verbosity verbosity=::dw::core::Logger::Verbosity::ERROR)
virtual ~PortBase()=default
detail::vectorIterable< T > getAllBufferIter()
virtual dwStatus send(T *frame, const dwTime_t *publishTimestamp=nullptr)
dwStatus wait(dwTime_t timeout)
static constexpr char LOG_TAG[]
dwStatus bindChannelWithReference(ChannelObject *channel, GenericDataReference &ref)
ChannelMetadata & getMetadata(T *frame)
dwStatus bindChannelForPODTypePacket(ChannelObject *channel)
dwStatus bindChannel(ChannelObject *channel) override
void setOnDataReady(void *opaque, OnDataReady onDataReady)
PortOutput(SpecimenT const &ref)
typename parameter_traits< T >::SpecimenT SpecimenT
ChannelObject::SyncSignaler & getSyncSignaler()
PortOutput(SpecimenT const &ref, OnSetSyncAttrs signalerAttrs, OnSetSyncAttrs waiterAttrs={})
ChannelObject::SyncWaiter & getSyncWaiter()
static constexpr PortDirection DIRECTION
void populateDefaultMetadata(ChannelMetadata &header, const dwTime_t *publishTimestamp)
PortOutput(SpecimenT &&ref)
void getWaitFences(T *frame, dw::core::span< NvSciSyncFence > &fences)
void setSignalFences(T *frame, dw::core::span< const NvSciSyncFence > fences)
ChannelObject * m_channel
virtual dwStatus bindChannel(ChannelObject *channel)=0
virtual ChannelObject * getChannel()
T * extractInternalPacket(GenericData genericData)
void parseDataSynced(const ChannelParams &params) override
MetadataPayload * getMetadataPacket(T *frame)
OnDataReady onDataReady
lambda to handle data ready
static constexpr const uint32_t DWFRAMEWORK_METADATA_PACKET_TYPE_ID_OFFSET
MetadataPayload * extractMetadata(GenericData packet)
dw::core::Function< void(NvSciSyncAttrList)> OnSetSyncAttrs
void setTimestamp(ChannelMetadata &header, dwTime_t const &timestamp)
OnSetSyncAttrs setSignalerAttributes
lambda to set the signaler attributes of the endpoint.
ChannelPacketTypeID packetTypeID
The ID of the type of the endpoint.
uint16_t validFields
Bit map defining which ChannelMetadata fields are set. See MetadataFlags.
dw::core::Function< void()> OnDataReady
uint32_t ChannelPacketTypeID
constexpr ChannelPacketTypeID DWFRAMEWORK_PACKET_ID_DEFAULT
uint32_t producerId
Id of the producer channel.
typename ManagedPortInput< T >::UniquePacketPtr UniquePacketPtr
uint32_t iterationCount
Producer iteration count.
void setSequenceNumber(ChannelMetadata &header, uint32_t const &sequenceNum)
OnSetSyncAttrs setWaiterAttributes
lambda to set the waiter attributes of the endpoint.
@ METADATA_ITERATION_COUNT
Producer iteration count is set.
void * onDataReadyOpaque
pointer hint for data ready
@ SHMEM_LOCAL
local shared memory
ChannelPacketTypeID getNewPacketID(ChannelPacketTypeID packetTypeID)
void stampSyncCount(uint32_t &syncCountOut) const