1#include <vanetza/common/runtime.hpp>
2#include <vanetza/geonet/address.hpp>
3#include <vanetza/geonet/cbf_counter.hpp>
4#include <vanetza/geonet/cbf_packet_buffer.hpp>
5#include <vanetza/geonet/pdu.hpp>
6#include <vanetza/units/time.hpp>
15CbfPacket::CbfPacket(PendingPacket<GbcPdu>&& packet,
const MacAddress& sender) :
16 m_packet(
std::move(packet)), m_sender(sender)
20CbfPacket::CbfPacket(PendingPacket<GbcPdu, const MacAddress&>&& packet,
const MacAddress& sender) :
21 m_packet(PendingPacket<GbcPdu>(
std::move(packet), cBroadcastMacAddress)), m_sender(sender)
32 return m_packet.pdu().extended().source_position.gn_addr;
37 return m_packet.pdu().extended().sequence_number;
40Clock::duration CbfPacket::reduce_lifetime(Clock::duration d)
42 return m_packet.reduce_lifetime(d);
45std::size_t CbfPacket::length()
const
47 return m_packet.length();
51CbfPacketBuffer::CbfPacketBuffer(
Runtime& rt, TimerCallback cb, std::unique_ptr<CbfCounter> cnt, std::size_t bytes) :
52 m_runtime(rt), m_counter(
std::move(cnt)),
53 m_capacity_bytes(bytes), m_stored_bytes(0),
58CbfPacketBuffer::~CbfPacketBuffer()
65 bool packet_dropped =
false;
67 auto& id_map = m_timers.right;
68 auto found = id_map.find(
id);
69 if (found != id_map.end()) {
70 auto& packet = found->info;
71 m_stored_bytes -= packet->length();
72 m_counter->remove(
id);
73 m_packets.erase(packet);
75 packet_dropped =
true;
78 assert(m_packets.size() == m_timers.size());
79 return packet_dropped;
84 auto& timer_map = m_timers.left;
85 auto successor = timer_map.erase(timer_it);
86 if (successor == timer_map.begin() && !timer_map.empty()) {
94 if(timeout <= Clock::duration::zero())
return;
95 m_stored_bytes += packet.length();
96 const auto first_timer = m_timers.left.begin();
99 while (m_stored_bytes > m_capacity_bytes && !m_packets.empty()) {
100 m_stored_bytes -= m_packets.front().length();
101 const auto id = identifier(m_packets.front());
102 m_timers.right.erase(
id);
103 m_counter->remove(
id);
104 m_packets.pop_front();
107 Timer timer = { m_runtime, timeout };
108 const Identifier
id = identifier(packet);
109 m_packets.emplace_back(std::move(packet));
110 using timer_value = timer_bimap::value_type;
111 auto insertion = m_timers.insert(timer_value { timer, id, std::prev(m_packets.end()) });
112 if (!insertion.second) {
113 m_stored_bytes -= m_packets.back().length();
114 m_packets.pop_back();
120 if (m_timers.left.begin() != first_timer) {
123 assert(m_packets.size() == m_timers.size());
128 auto& id_map = m_timers.right;
129 auto found = id_map.find(
id);
130 if (found != id_map.end()) {
131 const Timer& timer = found->second;
134 id_map.replace_data(found,
Timer { m_runtime, timeout});
135 m_counter->increment(
id);
141 boost::optional<CbfPacket> packet;
143 auto& id_map = m_timers.right;
144 auto found = id_map.find(
id);
145 if (found != id_map.end()) {
146 const Timer& timer = found->second;
149 m_stored_bytes -= cbf_packet.
length();
151 packet.emplace(std::move(cbf_packet));
153 m_counter->remove(
id);
154 m_packets.erase(found->info);
163 const auto& id_map = m_timers.right;
164 auto found = id_map.find(
id);
165 return found != id_map.end() ? &(*found->info) :
nullptr;
170 return m_counter->counter(
id);
176 const Timer now { m_runtime, std::chrono::seconds(0) };
177 auto end = m_timers.left.upper_bound(now);
178 for (
auto it = m_timers.left.begin(); it != end;) {
180 const Timer& timer = it->first;
183 m_stored_bytes -= packet.
length();
185 m_timer_callback(std::move(packet).packet());
188 m_counter->remove(it->second);
189 m_packets.erase(it->info);
190 it = m_timers.left.erase(it);
194 if (!m_timers.empty()) {
201 const auto queuing_time = m_runtime.
now() - timer.start;
207 assert(!m_timers.empty());
209 Runtime::Callback cb = [
this](Clock::time_point) {
flush(); };
210 m_runtime.
schedule(m_timers.left.begin()->first.expiry, cb,
this);
214CbfPacketBuffer::Timer::Timer(
const Runtime& rt, Clock::duration timeout) :
215 expiry(rt.now() + timeout), start(rt.now())
219bool CbfPacketBuffer::Timer::operator<(
const Timer& other)
const
221 return this->expiry < other.expiry;
virtual void cancel(const void *scope)=0
virtual Clock::time_point now() const =0
virtual void schedule(Clock::time_point tp, const Callback &cb, const void *scope=nullptr)=0
std::size_t counter(const Identifier &packet) const
boost::optional< CbfPacket > fetch(const Identifier &id)
void remove_timer(typename timer_bimap::left_map::iterator)
bool remove(const Identifier &id)
void update(const Identifier &id, Clock::duration timeout)
void add(CbfPacket &&packet, Clock::duration timeout)
const CbfPacket * find(const Identifier &id) const
bool reduce_lifetime(const Timer &, CbfPacket &) const
std::size_t length() const
Clock::duration reduce_lifetime(Clock::duration d)