uvw  2.10.0
stream.h
1 #ifndef UVW_STREAM_INCLUDE_H
2 #define UVW_STREAM_INCLUDE_H
3 
4 
5 #include <algorithm>
6 #include <iterator>
7 #include <cstddef>
8 #include <utility>
9 #include <memory>
10 #include <uv.h>
11 #include "request.hpp"
12 #include "handle.hpp"
13 #include "loop.h"
14 
15 
16 namespace uvw {
17 
18 
24 struct ConnectEvent {};
25 
26 
32 struct EndEvent {};
33 
34 
40 struct ListenEvent {};
41 
42 
48 struct ShutdownEvent {};
49 
50 
56 struct WriteEvent {};
57 
58 
64 struct DataEvent {
65  explicit DataEvent(std::unique_ptr<char[]> buf, std::size_t len) noexcept;
66 
67  std::unique_ptr<char[]> data;
68  std::size_t length;
69 };
70 
71 
72 namespace details {
73 
74 
75 struct ConnectReq final: public Request<ConnectReq, uv_connect_t> {
76  using Request::Request;
77 
78  template<typename F, typename... Args>
79  void connect(F &&f, Args&&... args) {
80  invoke(std::forward<F>(f), get(), std::forward<Args>(args)..., &defaultCallback<ConnectEvent>);
81  }
82 };
83 
84 
85 struct ShutdownReq final: public Request<ShutdownReq, uv_shutdown_t> {
86  using Request::Request;
87 
88  void shutdown(uv_stream_t *handle);
89 };
90 
91 
92 template<typename Deleter>
93 class WriteReq final: public Request<WriteReq<Deleter>, uv_write_t> {
94  using ConstructorAccess = typename Request<WriteReq<Deleter>, uv_write_t>::ConstructorAccess;
95 
96 public:
97  WriteReq(ConstructorAccess ca, std::shared_ptr<Loop> loop, std::unique_ptr<char[], Deleter> dt, unsigned int len)
98  : Request<WriteReq<Deleter>, uv_write_t>{ca, std::move(loop)},
99  data{std::move(dt)},
100  buf{uv_buf_init(data.get(), len)}
101  {}
102 
103  void write(uv_stream_t *handle) {
104  this->invoke(&uv_write, this->get(), handle, &buf, 1, &this->template defaultCallback<WriteEvent>);
105  }
106 
107  void write(uv_stream_t *handle, uv_stream_t *send) {
108  this->invoke(&uv_write2, this->get(), handle, &buf, 1, send, &this->template defaultCallback<WriteEvent>);
109  }
110 
111 private:
112  std::unique_ptr<char[], Deleter> data;
113  uv_buf_t buf;
114 };
115 
116 
117 }
118 
119 
127 template<typename T, typename U>
128 class StreamHandle: public Handle<T, U> {
129  static constexpr unsigned int DEFAULT_BACKLOG = 128;
130 
131  static void readCallback(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) {
132  T &ref = *(static_cast<T*>(handle->data));
133  // data will be destroyed no matter of what the value of nread is
134  std::unique_ptr<char[]> data{buf->base};
135 
136  // nread == 0 is ignored (see http://docs.libuv.org/en/v1.x/stream.html)
137  // equivalent to EAGAIN/EWOULDBLOCK, it shouldn't be treated as an error
138  // for we don't have data to emit though, it's fine to suppress it
139 
140  if(nread == UV_EOF) {
141  // end of stream
142  ref.publish(EndEvent{});
143  } else if(nread > 0) {
144  // data available
145  ref.publish(DataEvent{std::move(data), static_cast<std::size_t>(nread)});
146  } else if(nread < 0) {
147  // transmission error
148  ref.publish(ErrorEvent(nread));
149  }
150  }
151 
152  static void listenCallback(uv_stream_t *handle, int status) {
153  T &ref = *(static_cast<T*>(handle->data));
154  if(status) { ref.publish(ErrorEvent{status}); }
155  else { ref.publish(ListenEvent{}); }
156  }
157 
158 public:
159 #ifdef _MSC_VER
160  StreamHandle(typename Handle<T, U>::ConstructorAccess ca, std::shared_ptr<Loop> ref)
161  : Handle<T, U>{ca, std::move(ref)}
162  {}
163 #else
164  using Handle<T, U>::Handle;
165 #endif
166 
174  void shutdown() {
175  auto listener = [ptr = this->shared_from_this()](const auto &event, const auto &) {
176  ptr->publish(event);
177  };
178 
179  auto shutdown = this->loop().template resource<details::ShutdownReq>();
180  shutdown->template once<ErrorEvent>(listener);
181  shutdown->template once<ShutdownEvent>(listener);
182  shutdown->shutdown(this->template get<uv_stream_t>());
183  }
184 
195  void listen(int backlog = DEFAULT_BACKLOG) {
196  this->invoke(&uv_listen, this->template get<uv_stream_t>(), backlog, &listenCallback);
197  }
198 
218  template<typename S>
219  void accept(S &ref) {
220  this->invoke(&uv_accept, this->template get<uv_stream_t>(), this->template get<uv_stream_t>(ref));
221  }
222 
230  void read() {
231  this->invoke(&uv_read_start, this->template get<uv_stream_t>(), &this->allocCallback, &readCallback);
232  }
233 
239  void stop() {
240  this->invoke(&uv_read_stop, this->template get<uv_stream_t>());
241  }
242 
255  template<typename Deleter>
256  void write(std::unique_ptr<char[], Deleter> data, unsigned int len) {
257  auto req = this->loop().template resource<details::WriteReq<Deleter>>(std::move(data), len);
258  auto listener = [ptr = this->shared_from_this()](const auto &event, const auto &) {
259  ptr->publish(event);
260  };
261 
262  req->template once<ErrorEvent>(listener);
263  req->template once<WriteEvent>(listener);
264  req->write(this->template get<uv_stream_t>());
265  }
266 
279  void write(char *data, unsigned int len) {
280  auto req = this->loop().template resource<details::WriteReq<void(*)(char *)>>(std::unique_ptr<char[], void(*)(char *)>{data, [](char *) {}}, len);
281  auto listener = [ptr = this->shared_from_this()](const auto &event, const auto &) {
282  ptr->publish(event);
283  };
284 
285  req->template once<ErrorEvent>(listener);
286  req->template once<WriteEvent>(listener);
287  req->write(this->template get<uv_stream_t>());
288  }
289 
309  template<typename S, typename Deleter>
310  void write(S &send, std::unique_ptr<char[], Deleter> data, unsigned int len) {
311  auto req = this->loop().template resource<details::WriteReq<Deleter>>(std::move(data), len);
312  auto listener = [ptr = this->shared_from_this()](const auto &event, const auto &) {
313  ptr->publish(event);
314  };
315 
316  req->template once<ErrorEvent>(listener);
317  req->template once<WriteEvent>(listener);
318  req->write(this->template get<uv_stream_t>(), this->template get<uv_stream_t>(send));
319  }
320 
340  template<typename S>
341  void write(S &send, char *data, unsigned int len) {
342  auto req = this->loop().template resource<details::WriteReq<void(*)(char *)>>(std::unique_ptr<char[], void(*)(char *)>{data, [](char *) {}}, len);
343  auto listener = [ptr = this->shared_from_this()](const auto &event, const auto &) {
344  ptr->publish(event);
345  };
346 
347  req->template once<ErrorEvent>(listener);
348  req->template once<WriteEvent>(listener);
349  req->write(this->template get<uv_stream_t>(), this->template get<uv_stream_t>(send));
350  }
351 
363  int tryWrite(std::unique_ptr<char[]> data, unsigned int len) {
364  uv_buf_t bufs[] = { uv_buf_init(data.get(), len) };
365  auto bw = uv_try_write(this->template get<uv_stream_t>(), bufs, 1);
366 
367  if(bw < 0) {
368  this->publish(ErrorEvent{bw});
369  bw = 0;
370  }
371 
372  return bw;
373  }
374 
386  template<typename V, typename W>
387  int tryWrite(std::unique_ptr<char[]> data, unsigned int len, StreamHandle<V, W> &send) {
388  uv_buf_t bufs[] = { uv_buf_init(data.get(), len) };
389  auto bw = uv_try_write2(this->template get<uv_stream_t>(), bufs, 1, send.raw());
390 
391  if(bw < 0) {
392  this->publish(ErrorEvent{bw});
393  bw = 0;
394  }
395 
396  return bw;
397  }
398 
410  int tryWrite(char *data, unsigned int len) {
411  uv_buf_t bufs[] = { uv_buf_init(data, len) };
412  auto bw = uv_try_write(this->template get<uv_stream_t>(), bufs, 1);
413 
414  if(bw < 0) {
415  this->publish(ErrorEvent{bw});
416  bw = 0;
417  }
418 
419  return bw;
420  }
421 
433  template<typename V, typename W>
434  int tryWrite(char *data, unsigned int len, StreamHandle<V, W> &send) {
435  uv_buf_t bufs[] = { uv_buf_init(data, len) };
436  auto bw = uv_try_write2(this->template get<uv_stream_t>(), bufs, 1, send.raw());
437 
438  if(bw < 0) {
439  this->publish(ErrorEvent{bw});
440  bw = 0;
441  }
442 
443  return bw;
444  }
445 
450  bool readable() const noexcept {
451  return (uv_is_readable(this->template get<uv_stream_t>()) == 1);
452  }
453 
458  bool writable() const noexcept {
459  return (uv_is_writable(this->template get<uv_stream_t>()) == 1);
460  }
461 
477  bool blocking(bool enable = false) {
478  return (0 == uv_stream_set_blocking(this->template get<uv_stream_t>(), enable));
479  }
480 
485  size_t writeQueueSize() const noexcept {
486  return uv_stream_get_write_queue_size(this->template get<uv_stream_t>());
487  }
488 };
489 
490 
491 }
492 
493 
494 #ifndef UVW_AS_LIB
495 #include "stream.cpp"
496 #endif
497 
498 #endif // UVW_STREAM_INCLUDE_H
Handle base class.
Definition: handle.hpp:30
Request base class.
Definition: request.hpp:21
std::shared_ptr< R > data() const
Gets user-defined data. uvw won't use this field in any case.
Definition: resource.hpp:52
The StreamHandle handle.
Definition: stream.h:128
void write(S &send, char *data, unsigned int len)
Extended write function for sending handles over a pipe handle.
Definition: stream.h:341
void write(S &send, std::unique_ptr< char[], Deleter > data, unsigned int len)
Extended write function for sending handles over a pipe handle.
Definition: stream.h:310
void read()
Starts reading data from an incoming stream.
Definition: stream.h:230
void shutdown()
Shutdowns the outgoing (write) side of a duplex stream.
Definition: stream.h:174
int tryWrite(std::unique_ptr< char[]> data, unsigned int len)
Queues a write request if it can be completed immediately.
Definition: stream.h:363
bool writable() const noexcept
Checks if the stream is writable.
Definition: stream.h:458
bool blocking(bool enable=false)
Enables or disables blocking mode for a stream.
Definition: stream.h:477
int tryWrite(char *data, unsigned int len, StreamHandle< V, W > &send)
Queues a write request if it can be completed immediately.
Definition: stream.h:434
size_t writeQueueSize() const noexcept
Gets the amount of queued bytes waiting to be sent.
Definition: stream.h:485
int tryWrite(char *data, unsigned int len)
Queues a write request if it can be completed immediately.
Definition: stream.h:410
bool readable() const noexcept
Checks if the stream is readable.
Definition: stream.h:450
void stop()
Stops reading data from the stream.
Definition: stream.h:239
void write(char *data, unsigned int len)
Writes data to the stream.
Definition: stream.h:279
void listen(int backlog=DEFAULT_BACKLOG)
Starts listening for incoming connections.
Definition: stream.h:195
void accept(S &ref)
Accepts incoming connections.
Definition: stream.h:219
int tryWrite(std::unique_ptr< char[]> data, unsigned int len, StreamHandle< V, W > &send)
Queues a write request if it can be completed immediately.
Definition: stream.h:387
void write(std::unique_ptr< char[], Deleter > data, unsigned int len)
Writes data to the stream.
Definition: stream.h:256
Loop & loop() const noexcept
Gets the loop from which the resource was originated.
const U * raw() const noexcept
Gets the underlying raw data structure.
uvw default namespace.
Definition: async.h:10
ConnectEvent event.
Definition: stream.h:24
DataEvent event.
Definition: stream.h:64
std::unique_ptr< char[]> data
Definition: stream.h:67
std::size_t length
Definition: stream.h:68
EndEvent event.
Definition: stream.h:32
The ErrorEvent event.
Definition: emitter.h:25
ListenEvent event.
Definition: stream.h:40
ShutdownEvent event.
Definition: stream.h:48
WriteEvent event.
Definition: stream.h:56