mirror of
https://github.com/wpilibsuite/allwpilib
synced 2026-06-21 01:01:43 +00:00
Add braces to C++ single-line loops and conditionals (NFC) (#2973)
This makes code easier to read and more consistent between C++ and Java. Also update clang-format settings to always add a line break (even if no braces are used).
This commit is contained in:
@@ -82,9 +82,13 @@ static const unsigned char pr2six[256] = {
|
||||
|
||||
size_t Base64Decode(raw_ostream& os, StringRef encoded) {
|
||||
const unsigned char* end = encoded.bytes_begin();
|
||||
while (pr2six[*end] <= 63 && end != encoded.bytes_end()) ++end;
|
||||
while (pr2six[*end] <= 63 && end != encoded.bytes_end()) {
|
||||
++end;
|
||||
}
|
||||
size_t nprbytes = end - encoded.bytes_begin();
|
||||
if (nprbytes == 0) return 0;
|
||||
if (nprbytes == 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
const unsigned char* cur = encoded.bytes_begin();
|
||||
|
||||
@@ -97,12 +101,15 @@ size_t Base64Decode(raw_ostream& os, StringRef encoded) {
|
||||
}
|
||||
|
||||
// Note: (nprbytes == 1) would be an error, so just ignore that case
|
||||
if (nprbytes > 1)
|
||||
if (nprbytes > 1) {
|
||||
os << static_cast<unsigned char>(pr2six[cur[0]] << 2 | pr2six[cur[1]] >> 4);
|
||||
if (nprbytes > 2)
|
||||
}
|
||||
if (nprbytes > 2) {
|
||||
os << static_cast<unsigned char>(pr2six[cur[1]] << 4 | pr2six[cur[2]] >> 2);
|
||||
if (nprbytes > 3)
|
||||
}
|
||||
if (nprbytes > 3) {
|
||||
os << static_cast<unsigned char>(pr2six[cur[2]] << 6 | pr2six[cur[3]]);
|
||||
}
|
||||
|
||||
return (end - encoded.bytes_begin()) + ((4 - nprbytes) & 3);
|
||||
}
|
||||
@@ -127,7 +134,9 @@ static const char basis_64[] =
|
||||
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
|
||||
|
||||
void Base64Encode(raw_ostream& os, StringRef plain) {
|
||||
if (plain.empty()) return;
|
||||
if (plain.empty()) {
|
||||
return;
|
||||
}
|
||||
size_t len = plain.size();
|
||||
|
||||
size_t i;
|
||||
|
||||
@@ -18,7 +18,9 @@ class EventLoopRunner::Thread : public SafeThread {
|
||||
|
||||
Thread() : m_loop(uv::Loop::Create()) {
|
||||
// set up async handles
|
||||
if (!m_loop) return;
|
||||
if (!m_loop) {
|
||||
return;
|
||||
}
|
||||
|
||||
// run function
|
||||
m_doExec = UvExecFunc::Create(
|
||||
@@ -29,7 +31,9 @@ class EventLoopRunner::Thread : public SafeThread {
|
||||
}
|
||||
|
||||
void Main() {
|
||||
if (m_loop) m_loop->Run();
|
||||
if (m_loop) {
|
||||
m_loop->Run();
|
||||
}
|
||||
}
|
||||
|
||||
// the loop
|
||||
@@ -39,9 +43,13 @@ class EventLoopRunner::Thread : public SafeThread {
|
||||
std::weak_ptr<UvExecFunc> m_doExec;
|
||||
};
|
||||
|
||||
EventLoopRunner::EventLoopRunner() { m_owner.Start(); }
|
||||
EventLoopRunner::EventLoopRunner() {
|
||||
m_owner.Start();
|
||||
}
|
||||
|
||||
EventLoopRunner::~EventLoopRunner() { Stop(); }
|
||||
EventLoopRunner::~EventLoopRunner() {
|
||||
Stop();
|
||||
}
|
||||
|
||||
void EventLoopRunner::Stop() {
|
||||
ExecAsync([](uv::Loop& loop) {
|
||||
@@ -69,10 +77,14 @@ void EventLoopRunner::ExecSync(LoopFunc func) {
|
||||
f = doExec->Call(func);
|
||||
}
|
||||
}
|
||||
if (f.valid()) f.wait();
|
||||
if (f.valid()) {
|
||||
f.wait();
|
||||
}
|
||||
}
|
||||
|
||||
std::shared_ptr<uv::Loop> EventLoopRunner::GetLoop() {
|
||||
if (auto thr = m_owner.GetThread()) return thr->m_loop;
|
||||
if (auto thr = m_owner.GetThread()) {
|
||||
return thr->m_loop;
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
@@ -34,7 +34,9 @@ HttpParser::HttpParser(Type type) {
|
||||
m_settings.on_url = [](http_parser* p, const char* at, size_t length) -> int {
|
||||
auto& self = *static_cast<HttpParser*>(p->data);
|
||||
// append to buffer
|
||||
if ((self.m_urlBuf.size() + length) > self.m_maxLength) return 1;
|
||||
if ((self.m_urlBuf.size() + length) > self.m_maxLength) {
|
||||
return 1;
|
||||
}
|
||||
self.m_urlBuf += StringRef{at, length};
|
||||
self.m_state = kUrl;
|
||||
return 0;
|
||||
@@ -45,7 +47,9 @@ HttpParser::HttpParser(Type type) {
|
||||
size_t length) -> int {
|
||||
auto& self = *static_cast<HttpParser*>(p->data);
|
||||
// use valueBuf for the status
|
||||
if ((self.m_valueBuf.size() + length) > self.m_maxLength) return 1;
|
||||
if ((self.m_valueBuf.size() + length) > self.m_maxLength) {
|
||||
return 1;
|
||||
}
|
||||
self.m_valueBuf += StringRef{at, length};
|
||||
self.m_state = kStatus;
|
||||
return 0;
|
||||
@@ -59,19 +63,25 @@ HttpParser::HttpParser(Type type) {
|
||||
// once we're in header, we know the URL is complete
|
||||
if (self.m_state == kUrl) {
|
||||
self.url(self.m_urlBuf);
|
||||
if (self.m_aborted) return 1;
|
||||
if (self.m_aborted) {
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
// once we're in header, we know the status is complete
|
||||
if (self.m_state == kStatus) {
|
||||
self.status(self.m_valueBuf);
|
||||
if (self.m_aborted) return 1;
|
||||
if (self.m_aborted) {
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
// if we previously were in value state, that means we finished a header
|
||||
if (self.m_state == kValue) {
|
||||
self.header(self.m_fieldBuf, self.m_valueBuf);
|
||||
if (self.m_aborted) return 1;
|
||||
if (self.m_aborted) {
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
// clear field and value when we enter this state
|
||||
@@ -82,7 +92,9 @@ HttpParser::HttpParser(Type type) {
|
||||
}
|
||||
|
||||
// append data to field buffer
|
||||
if ((self.m_fieldBuf.size() + length) > self.m_maxLength) return 1;
|
||||
if ((self.m_fieldBuf.size() + length) > self.m_maxLength) {
|
||||
return 1;
|
||||
}
|
||||
self.m_fieldBuf += StringRef{at, length};
|
||||
return 0;
|
||||
};
|
||||
@@ -99,7 +111,9 @@ HttpParser::HttpParser(Type type) {
|
||||
}
|
||||
|
||||
// append data to value buffer
|
||||
if ((self.m_valueBuf.size() + length) > self.m_maxLength) return 1;
|
||||
if ((self.m_valueBuf.size() + length) > self.m_maxLength) {
|
||||
return 1;
|
||||
}
|
||||
self.m_valueBuf += StringRef{at, length};
|
||||
return 0;
|
||||
};
|
||||
@@ -111,19 +125,25 @@ HttpParser::HttpParser(Type type) {
|
||||
// if we previously were in url state, that means we finished the url
|
||||
if (self.m_state == kUrl) {
|
||||
self.url(self.m_urlBuf);
|
||||
if (self.m_aborted) return 1;
|
||||
if (self.m_aborted) {
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
// if we previously were in status state, that means we finished the status
|
||||
if (self.m_state == kStatus) {
|
||||
self.status(self.m_valueBuf);
|
||||
if (self.m_aborted) return 1;
|
||||
if (self.m_aborted) {
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
// if we previously were in value state, that means we finished a header
|
||||
if (self.m_state == kValue) {
|
||||
self.header(self.m_fieldBuf, self.m_valueBuf);
|
||||
if (self.m_aborted) return 1;
|
||||
if (self.m_aborted) {
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
self.headersComplete(self.ShouldKeepAlive());
|
||||
|
||||
@@ -60,15 +60,23 @@ void HttpServerConnection::BuildHeader(raw_ostream& os, int code,
|
||||
const Twine& extra) {
|
||||
os << "HTTP/" << m_request.GetMajor() << '.' << m_request.GetMinor() << ' '
|
||||
<< code << ' ' << codeText << "\r\n";
|
||||
if (contentLength == 0) m_keepAlive = false;
|
||||
if (!m_keepAlive) os << "Connection: close\r\n";
|
||||
if (contentLength == 0) {
|
||||
m_keepAlive = false;
|
||||
}
|
||||
if (!m_keepAlive) {
|
||||
os << "Connection: close\r\n";
|
||||
}
|
||||
BuildCommonHeaders(os);
|
||||
os << "Content-Type: " << contentType << "\r\n";
|
||||
if (contentLength != 0) os << "Content-Length: " << contentLength << "\r\n";
|
||||
if (contentLength != 0) {
|
||||
os << "Content-Length: " << contentLength << "\r\n";
|
||||
}
|
||||
os << "Access-Control-Allow-Origin: *\r\nAccess-Control-Allow-Methods: *\r\n";
|
||||
SmallString<128> extraBuf;
|
||||
StringRef extraStr = extra.toStringRef(extraBuf);
|
||||
if (!extraStr.empty()) os << extraStr;
|
||||
if (!extraStr.empty()) {
|
||||
os << extraStr;
|
||||
}
|
||||
os << "\r\n"; // header ends with a blank line
|
||||
}
|
||||
|
||||
@@ -76,8 +84,12 @@ void HttpServerConnection::SendData(ArrayRef<uv::Buffer> bufs,
|
||||
bool closeAfter) {
|
||||
m_stream.Write(bufs, [closeAfter, stream = &m_stream](
|
||||
MutableArrayRef<uv::Buffer> bufs, uv::Error) {
|
||||
for (auto&& buf : bufs) buf.Deallocate();
|
||||
if (closeAfter) stream->Close();
|
||||
for (auto&& buf : bufs) {
|
||||
buf.Deallocate();
|
||||
}
|
||||
if (closeAfter) {
|
||||
stream->Close();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -100,8 +112,9 @@ void HttpServerConnection::SendStaticResponse(int code, const Twine& codeText,
|
||||
// TODO: handle remote side not accepting gzip (very rare)
|
||||
|
||||
StringRef contentEncodingHeader;
|
||||
if (gzipped /* && m_acceptGzip*/)
|
||||
if (gzipped /* && m_acceptGzip*/) {
|
||||
contentEncodingHeader = "Content-Encoding: gzip\r\n";
|
||||
}
|
||||
|
||||
SmallVector<uv::Buffer, 4> bufs;
|
||||
raw_uv_ostream os{bufs, 4096};
|
||||
@@ -113,8 +126,12 @@ void HttpServerConnection::SendStaticResponse(int code, const Twine& codeText,
|
||||
m_stream.Write(bufs, [closeAfter = !m_keepAlive, stream = &m_stream](
|
||||
MutableArrayRef<uv::Buffer> bufs, uv::Error) {
|
||||
// don't deallocate the static content
|
||||
for (auto&& buf : bufs.drop_back()) buf.Deallocate();
|
||||
if (closeAfter) stream->Close();
|
||||
for (auto&& buf : bufs.drop_back()) {
|
||||
buf.Deallocate();
|
||||
}
|
||||
if (closeAfter) {
|
||||
stream->Close();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -23,10 +23,11 @@ StringRef UnescapeURI(const Twine& str, SmallVectorImpl<char>& buf,
|
||||
// pass non-escaped characters to output
|
||||
if (*i != '%') {
|
||||
// decode + to space
|
||||
if (*i == '+')
|
||||
if (*i == '+') {
|
||||
buf.push_back(' ');
|
||||
else
|
||||
} else {
|
||||
buf.push_back(*i);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -92,17 +93,23 @@ HttpQueryMap::HttpQueryMap(wpi::StringRef query) {
|
||||
bool err = false;
|
||||
auto name = wpi::UnescapeURI(nameEsc, nameBuf, &err);
|
||||
// note: ignores duplicates
|
||||
if (!err) m_elems.try_emplace(name, valueEsc);
|
||||
if (!err) {
|
||||
m_elems.try_emplace(name, valueEsc);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::optional<wpi::StringRef> HttpQueryMap::Get(
|
||||
wpi::StringRef name, wpi::SmallVectorImpl<char>& buf) const {
|
||||
auto it = m_elems.find(name);
|
||||
if (it == m_elems.end()) return {};
|
||||
if (it == m_elems.end()) {
|
||||
return {};
|
||||
}
|
||||
bool err = false;
|
||||
auto val = wpi::UnescapeURI(it->second, buf, &err);
|
||||
if (err) return {};
|
||||
if (err) {
|
||||
return {};
|
||||
}
|
||||
return val;
|
||||
}
|
||||
|
||||
@@ -128,12 +135,16 @@ HttpPath::HttpPath(wpi::StringRef path) {
|
||||
}
|
||||
|
||||
bool HttpPath::startswith(size_t start, ArrayRef<StringRef> match) const {
|
||||
if (m_pathEnds.size() < (start + match.size())) return false;
|
||||
if (m_pathEnds.size() < (start + match.size())) {
|
||||
return false;
|
||||
}
|
||||
bool first = start == 0;
|
||||
auto p = m_pathEnds.begin() + start;
|
||||
for (auto m : match) {
|
||||
auto val = m_pathBuf.slice(first ? 0 : *(p - 1), *p);
|
||||
if (val != m) return false;
|
||||
if (val != m) {
|
||||
return false;
|
||||
}
|
||||
first = false;
|
||||
++p;
|
||||
}
|
||||
@@ -142,16 +153,24 @@ bool HttpPath::startswith(size_t start, ArrayRef<StringRef> match) const {
|
||||
|
||||
bool ParseHttpHeaders(raw_istream& is, SmallVectorImpl<char>* contentType,
|
||||
SmallVectorImpl<char>* contentLength) {
|
||||
if (contentType) contentType->clear();
|
||||
if (contentLength) contentLength->clear();
|
||||
if (contentType) {
|
||||
contentType->clear();
|
||||
}
|
||||
if (contentLength) {
|
||||
contentLength->clear();
|
||||
}
|
||||
|
||||
bool inContentType = false;
|
||||
bool inContentLength = false;
|
||||
SmallString<64> lineBuf;
|
||||
for (;;) {
|
||||
StringRef line = is.getline(lineBuf, 1024).rtrim();
|
||||
if (is.has_error()) return false;
|
||||
if (line.empty()) return true; // empty line signals end of headers
|
||||
if (is.has_error()) {
|
||||
return false;
|
||||
}
|
||||
if (line.empty()) {
|
||||
return true; // empty line signals end of headers
|
||||
}
|
||||
|
||||
// header fields start at the beginning of the line
|
||||
if (!std::isspace(line[0])) {
|
||||
@@ -160,22 +179,24 @@ bool ParseHttpHeaders(raw_istream& is, SmallVectorImpl<char>* contentType,
|
||||
StringRef field;
|
||||
std::tie(field, line) = line.split(':');
|
||||
field = field.rtrim();
|
||||
if (field.equals_lower("content-type"))
|
||||
if (field.equals_lower("content-type")) {
|
||||
inContentType = true;
|
||||
else if (field.equals_lower("content-length"))
|
||||
} else if (field.equals_lower("content-length")) {
|
||||
inContentLength = true;
|
||||
else
|
||||
} else {
|
||||
continue; // ignore other fields
|
||||
}
|
||||
}
|
||||
|
||||
// collapse whitespace
|
||||
line = line.ltrim();
|
||||
|
||||
// save field data
|
||||
if (inContentType && contentType)
|
||||
if (inContentType && contentType) {
|
||||
contentType->append(line.begin(), line.end());
|
||||
else if (inContentLength && contentLength)
|
||||
} else if (inContentLength && contentLength) {
|
||||
contentLength->append(line.begin(), line.end());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -191,7 +212,9 @@ bool FindMultipartBoundary(raw_istream& is, StringRef boundary,
|
||||
if (!saveBuf) {
|
||||
do {
|
||||
is.read(searchBuf.data(), 1);
|
||||
if (is.has_error()) return false;
|
||||
if (is.has_error()) {
|
||||
return false;
|
||||
}
|
||||
} while (searchBuf[0] == '\r' || searchBuf[0] == '\n');
|
||||
searchPos = 1;
|
||||
}
|
||||
@@ -202,19 +225,26 @@ bool FindMultipartBoundary(raw_istream& is, StringRef boundary,
|
||||
// there's a bunch of continuous -'s in the output, but that's unlikely.
|
||||
for (;;) {
|
||||
is.read(searchBuf.data() + searchPos, searchBuf.size() - searchPos);
|
||||
if (is.has_error()) return false;
|
||||
if (is.has_error()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Did we find the boundary?
|
||||
if (searchBuf[0] == '-' && searchBuf[1] == '-' &&
|
||||
searchBuf.substr(2) == boundary)
|
||||
searchBuf.substr(2) == boundary) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// Fast-scan for '-'
|
||||
size_t pos = searchBuf.find('-', searchBuf[0] == '-' ? 1 : 0);
|
||||
if (pos == StringRef::npos) {
|
||||
if (saveBuf) saveBuf->append(searchBuf.data(), searchBuf.size());
|
||||
if (saveBuf) {
|
||||
saveBuf->append(searchBuf.data(), searchBuf.size());
|
||||
}
|
||||
} else {
|
||||
if (saveBuf) saveBuf->append(searchBuf.data(), pos);
|
||||
if (saveBuf) {
|
||||
saveBuf->append(searchBuf.data(), pos);
|
||||
}
|
||||
|
||||
// move '-' and following to start of buffer (next read will fill)
|
||||
std::memmove(searchBuf.data(), searchBuf.data() + pos,
|
||||
@@ -305,7 +335,9 @@ HttpLocation::HttpLocation(const Twine& url_, bool* error,
|
||||
// split out next param and value
|
||||
StringRef rawParam, rawValue;
|
||||
std::tie(rawParam, query) = query.split('&');
|
||||
if (rawParam.empty()) continue; // ignore "&&"
|
||||
if (rawParam.empty()) {
|
||||
continue; // ignore "&&"
|
||||
}
|
||||
std::tie(rawParam, rawValue) = rawParam.split('=');
|
||||
|
||||
// unescape param
|
||||
@@ -350,8 +382,9 @@ bool HttpConnection::Handshake(const HttpRequest& request,
|
||||
// send GET request
|
||||
os << "GET /" << request.path << " HTTP/1.1\r\n";
|
||||
os << "Host: " << request.host << "\r\n";
|
||||
if (!request.auth.empty())
|
||||
if (!request.auth.empty()) {
|
||||
os << "Authorization: Basic " << request.auth << "\r\n";
|
||||
}
|
||||
os << "\r\n";
|
||||
os.flush();
|
||||
|
||||
@@ -404,8 +437,12 @@ void HttpMultipartScanner::Reset(bool saveSkipped) {
|
||||
}
|
||||
|
||||
StringRef HttpMultipartScanner::Execute(StringRef in) {
|
||||
if (m_state == kDone) Reset(m_saveSkipped);
|
||||
if (m_saveSkipped) m_buf += in;
|
||||
if (m_state == kDone) {
|
||||
Reset(m_saveSkipped);
|
||||
}
|
||||
if (m_saveSkipped) {
|
||||
m_buf += in;
|
||||
}
|
||||
|
||||
size_t pos = 0;
|
||||
if (m_state == kBoundary) {
|
||||
@@ -451,7 +488,9 @@ StringRef HttpMultipartScanner::Execute(StringRef in) {
|
||||
if (ch == '\n') {
|
||||
// Found the LF; return remaining input buffer (following it)
|
||||
m_state = kDone;
|
||||
if (m_saveSkipped) m_buf.resize(m_buf.size() - in.size() + pos);
|
||||
if (m_saveSkipped) {
|
||||
m_buf.resize(m_buf.size() - in.size() + pos);
|
||||
}
|
||||
return in.drop_front(pos);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -37,7 +37,9 @@ static void CopyStream(uv::Stream& in, std::weak_ptr<uv::Stream> outWeak) {
|
||||
return;
|
||||
}
|
||||
out->Write(buf2, [](auto bufs, uv::Error) {
|
||||
for (auto buf : bufs) buf.Deallocate();
|
||||
for (auto buf : bufs) {
|
||||
buf.Deallocate();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -55,7 +57,9 @@ void PortForwarder::Add(unsigned int port, const Twine& remoteHost,
|
||||
host = remoteHost.str(), remotePort] {
|
||||
auto& loop = serverPtr->GetLoopRef();
|
||||
auto client = serverPtr->Accept();
|
||||
if (!client) return;
|
||||
if (!client) {
|
||||
return;
|
||||
}
|
||||
|
||||
// close on error
|
||||
client->error.connect(
|
||||
@@ -70,7 +74,9 @@ void PortForwarder::Add(unsigned int port, const Twine& remoteHost,
|
||||
[remotePtr = remote.get(),
|
||||
clientWeak = std::weak_ptr<uv::Tcp>(client)](uv::Error err) {
|
||||
remotePtr->Close();
|
||||
if (auto client = clientWeak.lock()) client->Close();
|
||||
if (auto client = clientWeak.lock()) {
|
||||
client->Close();
|
||||
}
|
||||
});
|
||||
|
||||
// convert port to string
|
||||
@@ -83,7 +89,9 @@ void PortForwarder::Add(unsigned int port, const Twine& remoteHost,
|
||||
[clientWeak = std::weak_ptr<uv::Tcp>(client),
|
||||
remoteWeak = std::weak_ptr<uv::Tcp>(remote)](const addrinfo& addr) {
|
||||
auto remote = remoteWeak.lock();
|
||||
if (!remote) return;
|
||||
if (!remote) {
|
||||
return;
|
||||
}
|
||||
|
||||
// connect to remote address/port
|
||||
remote->Connect(*addr.ai_addr, [remotePtr = remote.get(),
|
||||
@@ -98,11 +106,15 @@ void PortForwarder::Add(unsigned int port, const Twine& remoteHost,
|
||||
// close both when either side closes
|
||||
client->end.connect([clientPtr = client.get(), remoteWeak] {
|
||||
clientPtr->Close();
|
||||
if (auto remote = remoteWeak.lock()) remote->Close();
|
||||
if (auto remote = remoteWeak.lock()) {
|
||||
remote->Close();
|
||||
}
|
||||
});
|
||||
remotePtr->end.connect([remotePtr, clientWeak] {
|
||||
remotePtr->Close();
|
||||
if (auto client = clientWeak.lock()) client->Close();
|
||||
if (auto client = clientWeak.lock()) {
|
||||
client->Close();
|
||||
}
|
||||
});
|
||||
|
||||
// copy bidirectionally
|
||||
@@ -121,10 +133,12 @@ void PortForwarder::Add(unsigned int port, const Twine& remoteHost,
|
||||
remoteWeak = std::weak_ptr<uv::Tcp>(remote)] {
|
||||
if (auto connected = connectedWeak.lock()) {
|
||||
if (!*connected) {
|
||||
if (auto client = clientWeak.lock())
|
||||
if (auto client = clientWeak.lock()) {
|
||||
client->Close();
|
||||
if (auto remote = remoteWeak.lock())
|
||||
}
|
||||
if (auto remote = remoteWeak.lock()) {
|
||||
remote->Close();
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
@@ -9,7 +9,9 @@ using namespace wpi;
|
||||
detail::SafeThreadProxyBase::SafeThreadProxyBase(
|
||||
std::shared_ptr<SafeThread> thr)
|
||||
: m_thread(std::move(thr)) {
|
||||
if (!m_thread) return;
|
||||
if (!m_thread) {
|
||||
return;
|
||||
}
|
||||
m_lock = std::unique_lock<wpi::mutex>(m_thread->m_mutex);
|
||||
if (!m_thread->m_active) {
|
||||
m_lock.unlock();
|
||||
@@ -19,15 +21,18 @@ detail::SafeThreadProxyBase::SafeThreadProxyBase(
|
||||
}
|
||||
|
||||
detail::SafeThreadOwnerBase::~SafeThreadOwnerBase() {
|
||||
if (m_joinAtExit)
|
||||
if (m_joinAtExit) {
|
||||
Join();
|
||||
else
|
||||
} else {
|
||||
Stop();
|
||||
}
|
||||
}
|
||||
|
||||
void detail::SafeThreadOwnerBase::Start(std::shared_ptr<SafeThread> thr) {
|
||||
std::scoped_lock lock(m_mutex);
|
||||
if (auto thr = m_thread.lock()) return;
|
||||
if (auto thr = m_thread.lock()) {
|
||||
return;
|
||||
}
|
||||
m_stdThread = std::thread([=] { thr->Main(); });
|
||||
thr->m_threadId = m_stdThread.get_id();
|
||||
m_thread = thr;
|
||||
@@ -40,7 +45,9 @@ void detail::SafeThreadOwnerBase::Stop() {
|
||||
thr->m_cond.notify_all();
|
||||
m_thread.reset();
|
||||
}
|
||||
if (m_stdThread.joinable()) m_stdThread.detach();
|
||||
if (m_stdThread.joinable()) {
|
||||
m_stdThread.detach();
|
||||
}
|
||||
}
|
||||
|
||||
void detail::SafeThreadOwnerBase::Join() {
|
||||
@@ -59,7 +66,9 @@ void detail::SafeThreadOwnerBase::Join() {
|
||||
|
||||
void detail::swap(SafeThreadOwnerBase& lhs, SafeThreadOwnerBase& rhs) noexcept {
|
||||
using std::swap;
|
||||
if (&lhs == &rhs) return;
|
||||
if (&lhs == &rhs) {
|
||||
return;
|
||||
}
|
||||
std::scoped_lock lock(lhs.m_mutex, rhs.m_mutex);
|
||||
std::swap(lhs.m_stdThread, rhs.m_stdThread);
|
||||
std::swap(lhs.m_thread, rhs.m_thread);
|
||||
|
||||
@@ -73,7 +73,9 @@ TCPAcceptor::~TCPAcceptor() {
|
||||
}
|
||||
|
||||
int TCPAcceptor::start() {
|
||||
if (m_listening) return 0;
|
||||
if (m_listening) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
m_lsd = socket(PF_INET, SOCK_STREAM, 0);
|
||||
if (m_lsd < 0) {
|
||||
@@ -153,7 +155,8 @@ void TCPAcceptor::shutdown() {
|
||||
address.sin_port = htons(m_port);
|
||||
|
||||
int result = -1, sd = socket(AF_INET, SOCK_STREAM, 0);
|
||||
if (sd < 0) return;
|
||||
if (sd < 0)
|
||||
return;
|
||||
|
||||
// Set socket to non-blocking
|
||||
u_long mode = 1;
|
||||
@@ -176,7 +179,9 @@ void TCPAcceptor::shutdown() {
|
||||
}
|
||||
|
||||
std::unique_ptr<NetworkStream> TCPAcceptor::accept() {
|
||||
if (!m_listening || m_shutdown) return nullptr;
|
||||
if (!m_listening || m_shutdown) {
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
struct sockaddr_in address;
|
||||
#ifdef _WIN32
|
||||
@@ -187,9 +192,10 @@ std::unique_ptr<NetworkStream> TCPAcceptor::accept() {
|
||||
std::memset(&address, 0, sizeof(address));
|
||||
int sd = ::accept(m_lsd, (struct sockaddr*)&address, &len);
|
||||
if (sd < 0) {
|
||||
if (!m_shutdown)
|
||||
if (!m_shutdown) {
|
||||
WPI_ERROR(m_logger, "accept() on port "
|
||||
<< m_port << " failed: " << SocketStrerror());
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
if (m_shutdown) {
|
||||
|
||||
@@ -144,9 +144,10 @@ std::unique_ptr<NetworkStream> TCPConnector::connect(const char* server,
|
||||
"could not set socket to non-blocking: " << SocketStrerror());
|
||||
} else {
|
||||
arg |= O_NONBLOCK;
|
||||
if (fcntl(sd, F_SETFL, arg) < 0)
|
||||
if (fcntl(sd, F_SETFL, arg) < 0) {
|
||||
WPI_WARNING(logger,
|
||||
"could not set socket to non-blocking: " << SocketStrerror());
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
@@ -171,10 +172,10 @@ std::unique_ptr<NetworkStream> TCPConnector::connect(const char* server,
|
||||
WPI_ERROR(logger, "select() to " << server << " port " << port
|
||||
<< " error " << valopt << " - "
|
||||
<< SocketStrerror(valopt));
|
||||
}
|
||||
// connection established
|
||||
else
|
||||
} else {
|
||||
// connection established
|
||||
result = 0;
|
||||
}
|
||||
} else {
|
||||
WPI_INFO(logger,
|
||||
"connect() to " << server << " port " << port << " timed out");
|
||||
@@ -199,9 +200,10 @@ std::unique_ptr<NetworkStream> TCPConnector::connect(const char* server,
|
||||
"could not set socket to blocking: " << SocketStrerror());
|
||||
} else {
|
||||
arg &= (~O_NONBLOCK);
|
||||
if (fcntl(sd, F_SETFL, arg) < 0)
|
||||
if (fcntl(sd, F_SETFL, arg) < 0) {
|
||||
WPI_WARNING(logger,
|
||||
"could not set socket to blocking: " << SocketStrerror());
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
|
||||
@@ -26,7 +26,9 @@ using namespace wpi;
|
||||
std::unique_ptr<NetworkStream> TCPConnector::connect_parallel(
|
||||
ArrayRef<std::pair<const char*, int>> servers, Logger& logger,
|
||||
int timeout) {
|
||||
if (servers.empty()) return nullptr;
|
||||
if (servers.empty()) {
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
// structure to make sure we don't start duplicate workers
|
||||
struct GlobalState {
|
||||
@@ -72,7 +74,9 @@ std::unique_ptr<NetworkStream> TCPConnector::connect_parallel(
|
||||
// attempt to the same server
|
||||
{
|
||||
std::scoped_lock lock(local->mtx);
|
||||
if (local->active.count(active_tracker) > 0) continue; // already in set
|
||||
if (local->active.count(active_tracker) > 0) {
|
||||
continue; // already in set
|
||||
}
|
||||
}
|
||||
|
||||
++num_workers;
|
||||
@@ -99,7 +103,9 @@ std::unique_ptr<NetworkStream> TCPConnector::connect_parallel(
|
||||
// successful connection
|
||||
if (stream) {
|
||||
std::scoped_lock lock(result->mtx);
|
||||
if (!result->done.exchange(true)) result->stream = std::move(stream);
|
||||
if (!result->done.exchange(true)) {
|
||||
result->stream = std::move(stream);
|
||||
}
|
||||
}
|
||||
}
|
||||
++result->count;
|
||||
|
||||
@@ -57,7 +57,9 @@ TCPStream::TCPStream(int sd, sockaddr_in* address)
|
||||
m_peerPort = ntohs(address->sin_port);
|
||||
}
|
||||
|
||||
TCPStream::~TCPStream() { close(); }
|
||||
TCPStream::~TCPStream() {
|
||||
close();
|
||||
}
|
||||
|
||||
size_t TCPStream::send(const char* buffer, size_t len, Error* err) {
|
||||
if (m_sd < 0) {
|
||||
@@ -101,10 +103,11 @@ size_t TCPStream::send(const char* buffer, size_t len, Error* err) {
|
||||
ssize_t rv = ::send(m_sd, buffer, len, 0);
|
||||
#endif
|
||||
if (rv < 0) {
|
||||
if (!m_blocking && (errno == EAGAIN || errno == EWOULDBLOCK))
|
||||
if (!m_blocking && (errno == EAGAIN || errno == EWOULDBLOCK)) {
|
||||
*err = kWouldBlock;
|
||||
else
|
||||
} else {
|
||||
*err = kConnectionReset;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
@@ -139,13 +142,14 @@ size_t TCPStream::receive(char* buffer, size_t len, Error* err, int timeout) {
|
||||
}
|
||||
if (rv < 0) {
|
||||
#ifdef _WIN32
|
||||
if (!m_blocking && WSAGetLastError() == WSAEWOULDBLOCK)
|
||||
if (!m_blocking && WSAGetLastError() == WSAEWOULDBLOCK) {
|
||||
#else
|
||||
if (!m_blocking && (errno == EAGAIN || errno == EWOULDBLOCK))
|
||||
if (!m_blocking && (errno == EAGAIN || errno == EWOULDBLOCK)) {
|
||||
#endif
|
||||
*err = kWouldBlock;
|
||||
else
|
||||
} else {
|
||||
*err = kConnectionReset;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
return static_cast<size_t>(rv);
|
||||
@@ -164,35 +168,52 @@ void TCPStream::close() {
|
||||
m_sd = -1;
|
||||
}
|
||||
|
||||
StringRef TCPStream::getPeerIP() const { return m_peerIP; }
|
||||
StringRef TCPStream::getPeerIP() const {
|
||||
return m_peerIP;
|
||||
}
|
||||
|
||||
int TCPStream::getPeerPort() const { return m_peerPort; }
|
||||
int TCPStream::getPeerPort() const {
|
||||
return m_peerPort;
|
||||
}
|
||||
|
||||
void TCPStream::setNoDelay() {
|
||||
if (m_sd < 0) return;
|
||||
if (m_sd < 0) {
|
||||
return;
|
||||
}
|
||||
int optval = 1;
|
||||
setsockopt(m_sd, IPPROTO_TCP, TCP_NODELAY, reinterpret_cast<char*>(&optval),
|
||||
sizeof optval);
|
||||
}
|
||||
|
||||
bool TCPStream::setBlocking(bool enabled) {
|
||||
if (m_sd < 0) return true; // silently accept
|
||||
if (m_sd < 0) {
|
||||
return true; // silently accept
|
||||
}
|
||||
#ifdef _WIN32
|
||||
u_long mode = enabled ? 0 : 1;
|
||||
if (ioctlsocket(m_sd, FIONBIO, &mode) == SOCKET_ERROR) return false;
|
||||
if (ioctlsocket(m_sd, FIONBIO, &mode) == SOCKET_ERROR) {
|
||||
return false;
|
||||
}
|
||||
#else
|
||||
int flags = fcntl(m_sd, F_GETFL, nullptr);
|
||||
if (flags < 0) return false;
|
||||
if (enabled)
|
||||
if (flags < 0) {
|
||||
return false;
|
||||
}
|
||||
if (enabled) {
|
||||
flags &= ~O_NONBLOCK;
|
||||
else
|
||||
} else {
|
||||
flags |= O_NONBLOCK;
|
||||
if (fcntl(m_sd, F_SETFL, flags) < 0) return false;
|
||||
}
|
||||
if (fcntl(m_sd, F_SETFL, flags) < 0) {
|
||||
return false;
|
||||
}
|
||||
#endif
|
||||
return true;
|
||||
}
|
||||
|
||||
int TCPStream::getNativeHandle() const { return m_sd; }
|
||||
int TCPStream::getNativeHandle() const {
|
||||
return m_sd;
|
||||
}
|
||||
|
||||
bool TCPStream::WaitForReadEvent(int timeout) {
|
||||
fd_set sdset;
|
||||
|
||||
@@ -41,7 +41,9 @@ UDPClient::~UDPClient() {
|
||||
}
|
||||
|
||||
UDPClient& UDPClient::operator=(UDPClient&& other) {
|
||||
if (this == &other) return *this;
|
||||
if (this == &other) {
|
||||
return *this;
|
||||
}
|
||||
shutdown();
|
||||
m_logger = other.m_logger;
|
||||
m_lsd = other.m_lsd;
|
||||
@@ -52,10 +54,14 @@ UDPClient& UDPClient::operator=(UDPClient&& other) {
|
||||
return *this;
|
||||
}
|
||||
|
||||
int UDPClient::start() { return start(0); }
|
||||
int UDPClient::start() {
|
||||
return start(0);
|
||||
}
|
||||
|
||||
int UDPClient::start(int port) {
|
||||
if (m_lsd > 0) return 0;
|
||||
if (m_lsd > 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
#ifdef _WIN32
|
||||
WSAData wsaData;
|
||||
@@ -186,14 +192,18 @@ int UDPClient::send(StringRef data, const Twine& server, int port) {
|
||||
}
|
||||
|
||||
int UDPClient::receive(uint8_t* data_received, int receive_len) {
|
||||
if (m_port == 0) return -1; // return if not receiving
|
||||
if (m_port == 0) {
|
||||
return -1; // return if not receiving
|
||||
}
|
||||
return recv(m_lsd, reinterpret_cast<char*>(data_received), receive_len, 0);
|
||||
}
|
||||
|
||||
int UDPClient::receive(uint8_t* data_received, int receive_len,
|
||||
SmallVectorImpl<char>* addr_received,
|
||||
int* port_received) {
|
||||
if (m_port == 0) return -1; // return if not receiving
|
||||
if (m_port == 0) {
|
||||
return -1; // return if not receiving
|
||||
}
|
||||
|
||||
struct sockaddr_in remote;
|
||||
socklen_t remote_len = sizeof(remote);
|
||||
@@ -222,13 +232,17 @@ int UDPClient::receive(uint8_t* data_received, int receive_len,
|
||||
}
|
||||
|
||||
int UDPClient::set_timeout(double timeout) {
|
||||
if (timeout < 0) return -1;
|
||||
if (timeout < 0) {
|
||||
return -1;
|
||||
}
|
||||
struct timeval tv;
|
||||
tv.tv_sec = timeout; // truncating will give seconds
|
||||
timeout -= tv.tv_sec; // remove seconds portion
|
||||
tv.tv_usec = timeout * 1000000; // fractions of a second to us
|
||||
int ret = setsockopt(m_lsd, SOL_SOCKET, SO_RCVTIMEO,
|
||||
reinterpret_cast<char*>(&tv), sizeof(tv));
|
||||
if (ret < 0) WPI_ERROR(m_logger, "set timeout failed");
|
||||
if (ret < 0) {
|
||||
WPI_ERROR(m_logger, "set timeout failed");
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@@ -23,7 +23,9 @@ class WebSocketWriteReq : public uv::WriteReq {
|
||||
std::function<void(MutableArrayRef<uv::Buffer>, uv::Error)> callback) {
|
||||
finish.connect([=](uv::Error err) {
|
||||
MutableArrayRef<uv::Buffer> bufs{m_bufs};
|
||||
for (auto&& buf : bufs.slice(0, m_startUser)) buf.Deallocate();
|
||||
for (auto&& buf : bufs.slice(0, m_startUser)) {
|
||||
buf.Deallocate();
|
||||
}
|
||||
callback(bufs.slice(m_startUser), err);
|
||||
});
|
||||
}
|
||||
@@ -41,7 +43,9 @@ class WebSocket::ClientHandshakeData {
|
||||
static std::default_random_engine gen{rd()};
|
||||
std::uniform_int_distribution<unsigned int> dist(0, 255);
|
||||
char nonce[16]; // the nonce sent to the server
|
||||
for (char& v : nonce) v = static_cast<char>(dist(gen));
|
||||
for (char& v : nonce) {
|
||||
v = static_cast<char>(dist(gen));
|
||||
}
|
||||
raw_svector_ostream os(key);
|
||||
Base64Encode(os, StringRef{nonce, 16});
|
||||
}
|
||||
@@ -111,18 +115,24 @@ std::shared_ptr<WebSocket> WebSocket::CreateServer(uv::Stream& stream,
|
||||
|
||||
void WebSocket::Close(uint16_t code, const Twine& reason) {
|
||||
SendClose(code, reason);
|
||||
if (m_state != FAILED && m_state != CLOSED) m_state = CLOSING;
|
||||
if (m_state != FAILED && m_state != CLOSED) {
|
||||
m_state = CLOSING;
|
||||
}
|
||||
}
|
||||
|
||||
void WebSocket::Fail(uint16_t code, const Twine& reason) {
|
||||
if (m_state == FAILED || m_state == CLOSED) return;
|
||||
if (m_state == FAILED || m_state == CLOSED) {
|
||||
return;
|
||||
}
|
||||
SendClose(code, reason);
|
||||
SetClosed(code, reason, true);
|
||||
Shutdown();
|
||||
}
|
||||
|
||||
void WebSocket::Terminate(uint16_t code, const Twine& reason) {
|
||||
if (m_state == FAILED || m_state == CLOSED) return;
|
||||
if (m_state == FAILED || m_state == CLOSED) {
|
||||
return;
|
||||
}
|
||||
SetClosed(code, reason);
|
||||
Shutdown();
|
||||
}
|
||||
@@ -149,10 +159,11 @@ void WebSocket::StartClient(const Twine& uri, const Twine& host,
|
||||
os << "Sec-WebSocket-Protocol: ";
|
||||
bool first = true;
|
||||
for (auto protocol : protocols) {
|
||||
if (!first)
|
||||
if (!first) {
|
||||
os << ", ";
|
||||
else
|
||||
} else {
|
||||
first = false;
|
||||
}
|
||||
os << protocol;
|
||||
// also save for later checking against server response
|
||||
m_clientHandshake->protocols.emplace_back(protocol);
|
||||
@@ -161,42 +172,52 @@ void WebSocket::StartClient(const Twine& uri, const Twine& host,
|
||||
}
|
||||
|
||||
// other headers
|
||||
for (auto&& header : options.extraHeaders)
|
||||
for (auto&& header : options.extraHeaders) {
|
||||
os << header.first << ": " << header.second << "\r\n";
|
||||
}
|
||||
|
||||
// finish headers
|
||||
os << "\r\n";
|
||||
|
||||
// Send client request
|
||||
m_stream.Write(bufs, [](auto bufs, uv::Error) {
|
||||
for (auto& buf : bufs) buf.Deallocate();
|
||||
for (auto& buf : bufs) {
|
||||
buf.Deallocate();
|
||||
}
|
||||
});
|
||||
|
||||
// Set up client response handling
|
||||
m_clientHandshake->parser.status.connect([this](StringRef status) {
|
||||
unsigned int code = m_clientHandshake->parser.GetStatusCode();
|
||||
if (code != 101) Terminate(code, status);
|
||||
if (code != 101) {
|
||||
Terminate(code, status);
|
||||
}
|
||||
});
|
||||
m_clientHandshake->parser.header.connect(
|
||||
[this](StringRef name, StringRef value) {
|
||||
value = value.trim();
|
||||
if (name.equals_lower("upgrade")) {
|
||||
if (!value.equals_lower("websocket"))
|
||||
if (!value.equals_lower("websocket")) {
|
||||
return Terminate(1002, "invalid upgrade response value");
|
||||
}
|
||||
m_clientHandshake->hasUpgrade = true;
|
||||
} else if (name.equals_lower("connection")) {
|
||||
if (!value.equals_lower("upgrade"))
|
||||
if (!value.equals_lower("upgrade")) {
|
||||
return Terminate(1002, "invalid connection response value");
|
||||
}
|
||||
m_clientHandshake->hasConnection = true;
|
||||
} else if (name.equals_lower("sec-websocket-accept")) {
|
||||
// Check against expected response
|
||||
SmallString<64> acceptBuf;
|
||||
if (!value.equals(AcceptHash(m_clientHandshake->key, acceptBuf)))
|
||||
if (!value.equals(AcceptHash(m_clientHandshake->key, acceptBuf))) {
|
||||
return Terminate(1002, "invalid accept key");
|
||||
}
|
||||
m_clientHandshake->hasAccept = true;
|
||||
} else if (name.equals_lower("sec-websocket-extensions")) {
|
||||
// No extensions are supported
|
||||
if (!value.empty()) return Terminate(1010, "unsupported extension");
|
||||
if (!value.empty()) {
|
||||
return Terminate(1010, "unsupported extension");
|
||||
}
|
||||
} else if (name.equals_lower("sec-websocket-protocol")) {
|
||||
// Make sure it was one of the provided protocols
|
||||
bool match = false;
|
||||
@@ -206,7 +227,9 @@ void WebSocket::StartClient(const Twine& uri, const Twine& host,
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!match) return Terminate(1003, "unsupported protocol");
|
||||
if (!match) {
|
||||
return Terminate(1003, "unsupported protocol");
|
||||
}
|
||||
m_clientHandshake->hasProtocol = true;
|
||||
m_protocol = value;
|
||||
}
|
||||
@@ -248,7 +271,9 @@ void WebSocket::StartServer(StringRef key, StringRef version,
|
||||
os << "Upgrade: WebSocket\r\n";
|
||||
os << "Sec-WebSocket-Version: 13\r\n\r\n";
|
||||
m_stream.Write(bufs, [this](auto bufs, uv::Error) {
|
||||
for (auto& buf : bufs) buf.Deallocate();
|
||||
for (auto& buf : bufs) {
|
||||
buf.Deallocate();
|
||||
}
|
||||
// XXX: Should we support sending a new handshake on the same connection?
|
||||
// XXX: "this->" is required by GCC 5.5 (bug)
|
||||
this->Terminate(1003, "unsupported protocol version");
|
||||
@@ -264,14 +289,18 @@ void WebSocket::StartServer(StringRef key, StringRef version,
|
||||
SmallString<64> acceptBuf;
|
||||
os << "Sec-WebSocket-Accept: " << AcceptHash(key, acceptBuf) << "\r\n";
|
||||
|
||||
if (!protocol.empty()) os << "Sec-WebSocket-Protocol: " << protocol << "\r\n";
|
||||
if (!protocol.empty()) {
|
||||
os << "Sec-WebSocket-Protocol: " << protocol << "\r\n";
|
||||
}
|
||||
|
||||
// end headers
|
||||
os << "\r\n";
|
||||
|
||||
// Send server response
|
||||
m_stream.Write(bufs, [this](auto bufs, uv::Error) {
|
||||
for (auto& buf : bufs) buf.Deallocate();
|
||||
for (auto& buf : bufs) {
|
||||
buf.Deallocate();
|
||||
}
|
||||
if (m_state == CONNECTING) {
|
||||
m_state = OPEN;
|
||||
open(m_protocol);
|
||||
@@ -289,12 +318,16 @@ void WebSocket::SendClose(uint16_t code, const Twine& reason) {
|
||||
reason.print(os);
|
||||
}
|
||||
Send(kFlagFin | kOpClose, bufs, [](auto bufs, uv::Error) {
|
||||
for (auto&& buf : bufs) buf.Deallocate();
|
||||
for (auto&& buf : bufs) {
|
||||
buf.Deallocate();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void WebSocket::SetClosed(uint16_t code, const Twine& reason, bool failed) {
|
||||
if (m_state == FAILED || m_state == CLOSED) return;
|
||||
if (m_state == FAILED || m_state == CLOSED) {
|
||||
return;
|
||||
}
|
||||
m_state = failed ? FAILED : CLOSED;
|
||||
SmallString<64> reasonBuf;
|
||||
closed(code, reason.toStringRef(reasonBuf));
|
||||
@@ -306,7 +339,9 @@ void WebSocket::Shutdown() {
|
||||
|
||||
void WebSocket::HandleIncoming(uv::Buffer& buf, size_t size) {
|
||||
// ignore incoming data if we're failed or closed
|
||||
if (m_state == FAILED || m_state == CLOSED) return;
|
||||
if (m_state == FAILED || m_state == CLOSED) {
|
||||
return;
|
||||
}
|
||||
|
||||
StringRef data{buf.base, size};
|
||||
|
||||
@@ -315,9 +350,12 @@ void WebSocket::HandleIncoming(uv::Buffer& buf, size_t size) {
|
||||
if (m_clientHandshake) {
|
||||
data = m_clientHandshake->parser.Execute(data);
|
||||
// check for parser failure
|
||||
if (m_clientHandshake->parser.HasError())
|
||||
if (m_clientHandshake->parser.HasError()) {
|
||||
return Terminate(1003, "invalid response");
|
||||
if (m_state != OPEN) return; // not done with handshake yet
|
||||
}
|
||||
if (m_state != OPEN) {
|
||||
return; // not done with handshake yet
|
||||
}
|
||||
|
||||
// we're done with the handshake, so release its memory
|
||||
m_clientHandshake.reset();
|
||||
@@ -336,26 +374,37 @@ void WebSocket::HandleIncoming(uv::Buffer& buf, size_t size) {
|
||||
size_t toCopy = (std::min)(2u - m_header.size(), data.size());
|
||||
m_header.append(data.bytes_begin(), data.bytes_begin() + toCopy);
|
||||
data = data.drop_front(toCopy);
|
||||
if (m_header.size() < 2u) return; // need more data
|
||||
if (m_header.size() < 2u) {
|
||||
return; // need more data
|
||||
}
|
||||
|
||||
// Validate RSV bits are zero
|
||||
if ((m_header[0] & 0x70) != 0) return Fail(1002, "nonzero RSV");
|
||||
if ((m_header[0] & 0x70) != 0) {
|
||||
return Fail(1002, "nonzero RSV");
|
||||
}
|
||||
}
|
||||
|
||||
// Once we have first two bytes, we can calculate the header size
|
||||
if (m_headerSize == 0) {
|
||||
m_headerSize = 2;
|
||||
uint8_t len = m_header[1] & kLenMask;
|
||||
if (len == 126)
|
||||
if (len == 126) {
|
||||
m_headerSize += 2;
|
||||
else if (len == 127)
|
||||
} else if (len == 127) {
|
||||
m_headerSize += 8;
|
||||
}
|
||||
bool masking = (m_header[1] & kFlagMasking) != 0;
|
||||
if (masking) m_headerSize += 4; // masking key
|
||||
if (masking) {
|
||||
m_headerSize += 4; // masking key
|
||||
}
|
||||
// On server side, incoming messages MUST be masked
|
||||
// On client side, incoming messages MUST NOT be masked
|
||||
if (m_server && !masking) return Fail(1002, "client data not masked");
|
||||
if (!m_server && masking) return Fail(1002, "server data masked");
|
||||
if (m_server && !masking) {
|
||||
return Fail(1002, "client data not masked");
|
||||
}
|
||||
if (!m_server && masking) {
|
||||
return Fail(1002, "server data masked");
|
||||
}
|
||||
}
|
||||
|
||||
// Need to complete header to calculate message size
|
||||
@@ -363,16 +412,18 @@ void WebSocket::HandleIncoming(uv::Buffer& buf, size_t size) {
|
||||
size_t toCopy = (std::min)(m_headerSize - m_header.size(), data.size());
|
||||
m_header.append(data.bytes_begin(), data.bytes_begin() + toCopy);
|
||||
data = data.drop_front(toCopy);
|
||||
if (m_header.size() < m_headerSize) return; // need more data
|
||||
if (m_header.size() < m_headerSize) {
|
||||
return; // need more data
|
||||
}
|
||||
}
|
||||
|
||||
if (m_header.size() >= m_headerSize) {
|
||||
// get payload length
|
||||
uint8_t len = m_header[1] & kLenMask;
|
||||
if (len == 126)
|
||||
if (len == 126) {
|
||||
m_frameSize = (static_cast<uint16_t>(m_header[2]) << 8) |
|
||||
static_cast<uint16_t>(m_header[3]);
|
||||
else if (len == 127)
|
||||
} else if (len == 127) {
|
||||
m_frameSize = (static_cast<uint64_t>(m_header[2]) << 56) |
|
||||
(static_cast<uint64_t>(m_header[3]) << 48) |
|
||||
(static_cast<uint64_t>(m_header[4]) << 40) |
|
||||
@@ -381,12 +432,14 @@ void WebSocket::HandleIncoming(uv::Buffer& buf, size_t size) {
|
||||
(static_cast<uint64_t>(m_header[7]) << 16) |
|
||||
(static_cast<uint64_t>(m_header[8]) << 8) |
|
||||
static_cast<uint64_t>(m_header[9]);
|
||||
else
|
||||
} else {
|
||||
m_frameSize = len;
|
||||
}
|
||||
|
||||
// limit maximum size
|
||||
if ((m_payload.size() + m_frameSize) > m_maxMessageSize)
|
||||
if ((m_payload.size() + m_frameSize) > m_maxMessageSize) {
|
||||
return Fail(1009, "message too large");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -407,7 +460,9 @@ void WebSocket::HandleIncoming(uv::Buffer& buf, size_t size) {
|
||||
for (uint8_t& ch :
|
||||
MutableArrayRef<uint8_t>{m_payload}.slice(m_frameStart)) {
|
||||
ch ^= key[n++];
|
||||
if (n >= 4) n = 0;
|
||||
if (n >= 4) {
|
||||
n = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -418,32 +473,48 @@ void WebSocket::HandleIncoming(uv::Buffer& buf, size_t size) {
|
||||
case kOpCont:
|
||||
switch (m_fragmentOpcode) {
|
||||
case kOpText:
|
||||
if (!m_combineFragments || fin)
|
||||
if (!m_combineFragments || fin) {
|
||||
text(StringRef{reinterpret_cast<char*>(m_payload.data()),
|
||||
m_payload.size()},
|
||||
fin);
|
||||
}
|
||||
break;
|
||||
case kOpBinary:
|
||||
if (!m_combineFragments || fin) binary(m_payload, fin);
|
||||
if (!m_combineFragments || fin) {
|
||||
binary(m_payload, fin);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
// no preceding message?
|
||||
return Fail(1002, "invalid continuation message");
|
||||
}
|
||||
if (fin) m_fragmentOpcode = 0;
|
||||
if (fin) {
|
||||
m_fragmentOpcode = 0;
|
||||
}
|
||||
break;
|
||||
case kOpText:
|
||||
if (m_fragmentOpcode != 0) return Fail(1002, "incomplete fragment");
|
||||
if (!m_combineFragments || fin)
|
||||
if (m_fragmentOpcode != 0) {
|
||||
return Fail(1002, "incomplete fragment");
|
||||
}
|
||||
if (!m_combineFragments || fin) {
|
||||
text(StringRef{reinterpret_cast<char*>(m_payload.data()),
|
||||
m_payload.size()},
|
||||
fin);
|
||||
if (!fin) m_fragmentOpcode = opcode;
|
||||
}
|
||||
if (!fin) {
|
||||
m_fragmentOpcode = opcode;
|
||||
}
|
||||
break;
|
||||
case kOpBinary:
|
||||
if (m_fragmentOpcode != 0) return Fail(1002, "incomplete fragment");
|
||||
if (!m_combineFragments || fin) binary(m_payload, fin);
|
||||
if (!fin) m_fragmentOpcode = opcode;
|
||||
if (m_fragmentOpcode != 0) {
|
||||
return Fail(1002, "incomplete fragment");
|
||||
}
|
||||
if (!m_combineFragments || fin) {
|
||||
binary(m_payload, fin);
|
||||
}
|
||||
if (!fin) {
|
||||
m_fragmentOpcode = opcode;
|
||||
}
|
||||
break;
|
||||
case kOpClose: {
|
||||
uint16_t code;
|
||||
@@ -461,18 +532,26 @@ void WebSocket::HandleIncoming(uv::Buffer& buf, size_t size) {
|
||||
.drop_front(2);
|
||||
}
|
||||
// Echo the close if we didn't previously send it
|
||||
if (m_state != CLOSING) SendClose(code, reason);
|
||||
if (m_state != CLOSING) {
|
||||
SendClose(code, reason);
|
||||
}
|
||||
SetClosed(code, reason);
|
||||
// If we're the server, shutdown the connection.
|
||||
if (m_server) Shutdown();
|
||||
if (m_server) {
|
||||
Shutdown();
|
||||
}
|
||||
break;
|
||||
}
|
||||
case kOpPing:
|
||||
if (!fin) return Fail(1002, "cannot fragment control frames");
|
||||
if (!fin) {
|
||||
return Fail(1002, "cannot fragment control frames");
|
||||
}
|
||||
ping(m_payload);
|
||||
break;
|
||||
case kOpPong:
|
||||
if (!fin) return Fail(1002, "cannot fragment control frames");
|
||||
if (!fin) {
|
||||
return Fail(1002, "cannot fragment control frames");
|
||||
}
|
||||
pong(m_payload);
|
||||
break;
|
||||
default:
|
||||
@@ -482,7 +561,9 @@ void WebSocket::HandleIncoming(uv::Buffer& buf, size_t size) {
|
||||
// Prepare for next message
|
||||
m_header.clear();
|
||||
m_headerSize = 0;
|
||||
if (!m_combineFragments || fin) m_payload.clear();
|
||||
if (!m_combineFragments || fin) {
|
||||
m_payload.clear();
|
||||
}
|
||||
m_frameStart = m_payload.size();
|
||||
m_frameSize = UINT64_MAX;
|
||||
}
|
||||
@@ -496,10 +577,11 @@ void WebSocket::Send(
|
||||
// If we're not open, emit an error and don't send the data
|
||||
if (m_state != OPEN) {
|
||||
int err;
|
||||
if (m_state == CONNECTING)
|
||||
if (m_state == CONNECTING) {
|
||||
err = UV_EAGAIN;
|
||||
else
|
||||
} else {
|
||||
err = UV_ESHUTDOWN;
|
||||
}
|
||||
SmallVector<uv::Buffer, 4> bufs{data.begin(), data.end()};
|
||||
callback(bufs, uv::Error{err});
|
||||
return;
|
||||
@@ -513,7 +595,9 @@ void WebSocket::Send(
|
||||
|
||||
// payload length
|
||||
uint64_t size = 0;
|
||||
for (auto&& buf : data) size += buf.len;
|
||||
for (auto&& buf : data) {
|
||||
size += buf.len;
|
||||
}
|
||||
if (size < 126) {
|
||||
os << static_cast<unsigned char>((m_server ? 0x00 : kFlagMasking) | size);
|
||||
} else if (size <= 0xffff) {
|
||||
@@ -541,14 +625,18 @@ void WebSocket::Send(
|
||||
static std::default_random_engine gen{rd()};
|
||||
std::uniform_int_distribution<unsigned int> dist(0, 255);
|
||||
uint8_t key[4];
|
||||
for (uint8_t& v : key) v = dist(gen);
|
||||
for (uint8_t& v : key) {
|
||||
v = dist(gen);
|
||||
}
|
||||
os << ArrayRef<uint8_t>{key, 4};
|
||||
// copy and mask data
|
||||
int n = 0;
|
||||
for (auto&& buf : data) {
|
||||
for (auto&& ch : buf.data()) {
|
||||
os << static_cast<unsigned char>(static_cast<uint8_t>(ch) ^ key[n++]);
|
||||
if (n >= 4) n = 0;
|
||||
if (n >= 4) {
|
||||
n = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
req->m_startUser = req->m_bufs.size();
|
||||
|
||||
@@ -15,7 +15,9 @@ WebSocketServerHelper::WebSocketServerHelper(HttpParser& req) {
|
||||
if (name.equals_lower("host")) {
|
||||
m_gotHost = true;
|
||||
} else if (name.equals_lower("upgrade")) {
|
||||
if (value.equals_lower("websocket")) m_websocket = true;
|
||||
if (value.equals_lower("websocket")) {
|
||||
m_websocket = true;
|
||||
}
|
||||
} else if (name.equals_lower("sec-websocket-key")) {
|
||||
m_key = value;
|
||||
} else if (name.equals_lower("sec-websocket-version")) {
|
||||
@@ -26,22 +28,29 @@ WebSocketServerHelper::WebSocketServerHelper(HttpParser& req) {
|
||||
value.split(protocols, ",", -1, false);
|
||||
for (auto protocol : protocols) {
|
||||
protocol = protocol.trim();
|
||||
if (!protocol.empty()) m_protocols.emplace_back(protocol);
|
||||
if (!protocol.empty()) {
|
||||
m_protocols.emplace_back(protocol);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
req.headersComplete.connect([&req, this](bool) {
|
||||
if (req.IsUpgrade() && IsUpgrade()) upgrade();
|
||||
if (req.IsUpgrade() && IsUpgrade()) {
|
||||
upgrade();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
std::pair<bool, StringRef> WebSocketServerHelper::MatchProtocol(
|
||||
ArrayRef<StringRef> protocols) {
|
||||
if (protocols.empty() && m_protocols.empty())
|
||||
if (protocols.empty() && m_protocols.empty()) {
|
||||
return std::make_pair(true, StringRef{});
|
||||
}
|
||||
for (auto protocol : protocols) {
|
||||
for (auto&& clientProto : m_protocols) {
|
||||
if (protocol == clientProto) return std::make_pair(true, protocol);
|
||||
if (protocol == clientProto) {
|
||||
return std::make_pair(true, protocol);
|
||||
}
|
||||
}
|
||||
}
|
||||
return std::make_pair(false, StringRef{});
|
||||
@@ -59,24 +68,31 @@ WebSocketServer::WebSocketServer(uv::Stream& stream,
|
||||
m_req.header.connect([this](StringRef name, StringRef value) {
|
||||
if (name.equals_lower("host")) {
|
||||
if (m_options.checkHost) {
|
||||
if (!m_options.checkHost(value)) Abort(401, "Unrecognized Host");
|
||||
if (!m_options.checkHost(value)) {
|
||||
Abort(401, "Unrecognized Host");
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
m_req.url.connect([this](StringRef name) {
|
||||
if (m_options.checkUrl) {
|
||||
if (!m_options.checkUrl(name)) Abort(404, "Not Found");
|
||||
if (!m_options.checkUrl(name)) {
|
||||
Abort(404, "Not Found");
|
||||
}
|
||||
}
|
||||
});
|
||||
m_req.headersComplete.connect([this](bool) {
|
||||
// We only accept websocket connections
|
||||
if (!m_helper.IsUpgrade() || !m_req.IsUpgrade())
|
||||
if (!m_helper.IsUpgrade() || !m_req.IsUpgrade()) {
|
||||
Abort(426, "Upgrade Required");
|
||||
}
|
||||
});
|
||||
|
||||
// Handle upgrade event
|
||||
m_helper.upgrade.connect([this] {
|
||||
if (m_aborted) return;
|
||||
if (m_aborted) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Negotiate sub-protocol
|
||||
SmallVector<StringRef, 2> protocols{m_protocols.begin(), m_protocols.end()};
|
||||
@@ -103,9 +119,13 @@ WebSocketServer::WebSocketServer(uv::Stream& stream,
|
||||
stream.StartRead();
|
||||
m_dataConn =
|
||||
stream.data.connect_connection([this](uv::Buffer& buf, size_t size) {
|
||||
if (m_aborted) return;
|
||||
if (m_aborted) {
|
||||
return;
|
||||
}
|
||||
m_req.Execute(StringRef{buf.base, size});
|
||||
if (m_req.HasError()) Abort(400, "Bad Request");
|
||||
if (m_req.HasError()) {
|
||||
Abort(400, "Bad Request");
|
||||
}
|
||||
});
|
||||
m_errorConn =
|
||||
stream.error.connect_connection([this](uv::Error) { m_stream.Close(); });
|
||||
@@ -122,7 +142,9 @@ std::shared_ptr<WebSocketServer> WebSocketServer::Create(
|
||||
}
|
||||
|
||||
void WebSocketServer::Abort(uint16_t code, StringRef reason) {
|
||||
if (m_aborted) return;
|
||||
if (m_aborted) {
|
||||
return;
|
||||
}
|
||||
m_aborted = true;
|
||||
|
||||
// Build response
|
||||
@@ -131,10 +153,14 @@ void WebSocketServer::Abort(uint16_t code, StringRef reason) {
|
||||
|
||||
// Handle unsupported version
|
||||
os << "HTTP/1.1 " << code << ' ' << reason << "\r\n";
|
||||
if (code == 426) os << "Upgrade: WebSocket\r\n";
|
||||
if (code == 426) {
|
||||
os << "Upgrade: WebSocket\r\n";
|
||||
}
|
||||
os << "\r\n";
|
||||
m_stream.Write(bufs, [this](auto bufs, uv::Error) {
|
||||
for (auto& buf : bufs) buf.Deallocate();
|
||||
for (auto& buf : bufs) {
|
||||
buf.Deallocate();
|
||||
}
|
||||
m_stream.Shutdown([this] { m_stream.Close(); });
|
||||
});
|
||||
}
|
||||
|
||||
@@ -25,10 +25,14 @@ uint64_t PromiseFactoryBase::CreateRequest() {
|
||||
}
|
||||
|
||||
bool PromiseFactoryBase::EraseRequest(uint64_t request) {
|
||||
if (request == 0) return false;
|
||||
if (request == 0) {
|
||||
return false;
|
||||
}
|
||||
auto it = std::find_if(m_requests.begin(), m_requests.end(),
|
||||
[=](auto r) { return r == request; });
|
||||
if (it == m_requests.end()) return false; // no waiters
|
||||
if (it == m_requests.end()) {
|
||||
return false; // no waiters
|
||||
}
|
||||
m_requests.erase(it);
|
||||
return true;
|
||||
}
|
||||
@@ -44,7 +48,9 @@ future<void> PromiseFactory<void>::MakeReadyFuture() {
|
||||
|
||||
void PromiseFactory<void>::SetValue(uint64_t request) {
|
||||
std::unique_lock lock(GetResultMutex());
|
||||
if (!EraseRequest(request)) return;
|
||||
if (!EraseRequest(request)) {
|
||||
return;
|
||||
}
|
||||
auto it = std::find_if(m_thens.begin(), m_thens.end(),
|
||||
[=](const auto& x) { return x.request == request; });
|
||||
if (it != m_thens.end()) {
|
||||
@@ -102,7 +108,9 @@ void PromiseFactory<void>::WaitResult(uint64_t request) {
|
||||
// Did we get a response to *our* request?
|
||||
auto it = std::find_if(m_results.begin(), m_results.end(),
|
||||
[=](const auto& r) { return r == request; });
|
||||
if (it != m_results.end()) return;
|
||||
if (it != m_results.end()) {
|
||||
return;
|
||||
}
|
||||
// No, keep waiting for a response
|
||||
Wait(lock);
|
||||
}
|
||||
|
||||
@@ -24,7 +24,9 @@ std::string GetHostname() {
|
||||
} else if (err == UV_ENOBUFS) {
|
||||
char* name2 = static_cast<char*>(std::malloc(size));
|
||||
err = uv_os_gethostname(name2, &size);
|
||||
if (err == 0) rv.assign(name2, size);
|
||||
if (err == 0) {
|
||||
rv.assign(name2, size);
|
||||
}
|
||||
std::free(name2);
|
||||
}
|
||||
|
||||
@@ -44,7 +46,9 @@ StringRef GetHostname(SmallVectorImpl<char>& name) {
|
||||
} else if (err == UV_ENOBUFS) {
|
||||
name.resize(size);
|
||||
err = uv_os_gethostname(name.data(), &size);
|
||||
if (err != 0) size = 0;
|
||||
if (err != 0) {
|
||||
size = 0;
|
||||
}
|
||||
}
|
||||
|
||||
return StringRef{name.data(), size};
|
||||
|
||||
@@ -15,8 +15,9 @@ extern "C" {
|
||||
|
||||
JNIEXPORT jint JNICALL JNI_OnLoad(JavaVM* vm, void* reserved) {
|
||||
JNIEnv* env;
|
||||
if (vm->GetEnv(reinterpret_cast<void**>(&env), JNI_VERSION_1_6) != JNI_OK)
|
||||
if (vm->GetEnv(reinterpret_cast<void**>(&env), JNI_VERSION_1_6) != JNI_OK) {
|
||||
return JNI_ERR;
|
||||
}
|
||||
|
||||
return JNI_VERSION_1_6;
|
||||
}
|
||||
|
||||
@@ -24,8 +24,9 @@ uint64_t WriteUleb128(SmallVectorImpl<char>& dest, uint64_t val) {
|
||||
unsigned char byte = val & 0x7f;
|
||||
val >>= 7;
|
||||
|
||||
if (val != 0)
|
||||
if (val != 0) {
|
||||
byte |= 0x80; // mark this byte to show that more bytes will follow
|
||||
}
|
||||
|
||||
dest.push_back(byte);
|
||||
count++;
|
||||
@@ -47,7 +48,9 @@ uint64_t ReadUleb128(const char* addr, uint64_t* ret) {
|
||||
result |= (byte & 0x7f) << shift;
|
||||
shift += 7;
|
||||
|
||||
if (!(byte & 0x80)) break;
|
||||
if (!(byte & 0x80)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
*ret = result;
|
||||
@@ -62,12 +65,16 @@ bool ReadUleb128(raw_istream& is, uint64_t* ret) {
|
||||
while (1) {
|
||||
unsigned char byte;
|
||||
is.read(reinterpret_cast<char*>(&byte), 1);
|
||||
if (is.has_error()) return false;
|
||||
if (is.has_error()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
result |= (byte & 0x7f) << shift;
|
||||
shift += 7;
|
||||
|
||||
if (!(byte & 0x80)) break;
|
||||
if (!(byte & 0x80)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
*ret = result;
|
||||
|
||||
@@ -38,17 +38,25 @@ StringRef raw_istream::getline(SmallVectorImpl<char>& buf, int maxLen) {
|
||||
for (int i = 0; i < maxLen; ++i) {
|
||||
char c;
|
||||
read(c);
|
||||
if (has_error()) return StringRef{buf.data(), buf.size()};
|
||||
if (c == '\r') continue;
|
||||
if (has_error()) {
|
||||
return StringRef{buf.data(), buf.size()};
|
||||
}
|
||||
if (c == '\r') {
|
||||
continue;
|
||||
}
|
||||
buf.push_back(c);
|
||||
if (c == '\n') break;
|
||||
if (c == '\n') {
|
||||
break;
|
||||
}
|
||||
}
|
||||
return StringRef{buf.data(), buf.size()};
|
||||
}
|
||||
|
||||
void raw_mem_istream::close() {}
|
||||
|
||||
size_t raw_mem_istream::in_avail() const { return m_left; }
|
||||
size_t raw_mem_istream::in_avail() const {
|
||||
return m_left;
|
||||
}
|
||||
|
||||
void raw_mem_istream::read_impl(void* data, size_t len) {
|
||||
if (len > m_left) {
|
||||
@@ -73,7 +81,9 @@ static int getFD(const Twine& Filename, std::error_code& EC) {
|
||||
int FD;
|
||||
|
||||
EC = sys::fs::openFileForRead(Filename, FD);
|
||||
if (EC) return -1;
|
||||
if (EC) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
EC = std::error_code();
|
||||
return FD;
|
||||
@@ -89,7 +99,9 @@ raw_fd_istream::raw_fd_istream(int fd, bool shouldClose, size_t bufSize)
|
||||
}
|
||||
|
||||
raw_fd_istream::~raw_fd_istream() {
|
||||
if (m_shouldClose) close();
|
||||
if (m_shouldClose) {
|
||||
close();
|
||||
}
|
||||
std::free(m_buf);
|
||||
}
|
||||
|
||||
@@ -100,7 +112,9 @@ void raw_fd_istream::close() {
|
||||
}
|
||||
}
|
||||
|
||||
size_t raw_fd_istream::in_avail() const { return m_end - m_cur; }
|
||||
size_t raw_fd_istream::in_avail() const {
|
||||
return m_end - m_cur;
|
||||
}
|
||||
|
||||
void raw_fd_istream::read_impl(void* data, size_t len) {
|
||||
char* cdata = static_cast<char*>(data);
|
||||
|
||||
@@ -24,6 +24,10 @@ void raw_socket_istream::read_impl(void* data, size_t len) {
|
||||
set_read_count(pos);
|
||||
}
|
||||
|
||||
void raw_socket_istream::close() { m_stream.close(); }
|
||||
void raw_socket_istream::close() {
|
||||
m_stream.close();
|
||||
}
|
||||
|
||||
size_t raw_socket_istream::in_avail() const { return 0; }
|
||||
size_t raw_socket_istream::in_avail() const {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@@ -10,7 +10,9 @@ using namespace wpi;
|
||||
|
||||
raw_socket_ostream::~raw_socket_ostream() {
|
||||
flush();
|
||||
if (m_shouldClose) close();
|
||||
if (m_shouldClose) {
|
||||
close();
|
||||
}
|
||||
}
|
||||
|
||||
void raw_socket_ostream::write_impl(const char* data, size_t len) {
|
||||
@@ -27,10 +29,14 @@ void raw_socket_ostream::write_impl(const char* data, size_t len) {
|
||||
}
|
||||
}
|
||||
|
||||
uint64_t raw_socket_ostream::current_pos() const { return 0; }
|
||||
uint64_t raw_socket_ostream::current_pos() const {
|
||||
return 0;
|
||||
}
|
||||
|
||||
void raw_socket_ostream::close() {
|
||||
if (!m_shouldClose) return;
|
||||
if (!m_shouldClose) {
|
||||
return;
|
||||
}
|
||||
flush();
|
||||
m_stream.close();
|
||||
}
|
||||
|
||||
@@ -32,6 +32,8 @@ void raw_uv_ostream::write_impl(const char* data, size_t len) {
|
||||
|
||||
uint64_t raw_uv_ostream::current_pos() const {
|
||||
uint64_t size = 0;
|
||||
for (auto&& buf : m_bufs) size += buf.len;
|
||||
for (auto&& buf : m_bufs) {
|
||||
size += buf.len;
|
||||
}
|
||||
return size;
|
||||
}
|
||||
|
||||
@@ -212,7 +212,9 @@ static void buffer_to_block(const unsigned char* buffer,
|
||||
}
|
||||
}
|
||||
|
||||
SHA1::SHA1() { reset(digest, buf_size, transforms); }
|
||||
SHA1::SHA1() {
|
||||
reset(digest, buf_size, transforms);
|
||||
}
|
||||
|
||||
void SHA1::Update(StringRef s) {
|
||||
raw_mem_istream is(makeArrayRef(s.data(), s.size()));
|
||||
|
||||
@@ -92,14 +92,22 @@ void wpi::SetNowImpl(uint64_t (*func)(void)) {
|
||||
now_impl = func ? func : NowDefault;
|
||||
}
|
||||
|
||||
uint64_t wpi::Now() { return (now_impl.load())(); }
|
||||
uint64_t wpi::Now() {
|
||||
return (now_impl.load())();
|
||||
}
|
||||
|
||||
extern "C" {
|
||||
|
||||
uint64_t WPI_NowDefault(void) { return wpi::NowDefault(); }
|
||||
uint64_t WPI_NowDefault(void) {
|
||||
return wpi::NowDefault();
|
||||
}
|
||||
|
||||
void WPI_SetNowImpl(uint64_t (*func)(void)) { wpi::SetNowImpl(func); }
|
||||
void WPI_SetNowImpl(uint64_t (*func)(void)) {
|
||||
wpi::SetNowImpl(func);
|
||||
}
|
||||
|
||||
uint64_t WPI_Now(void) { return wpi::Now(); }
|
||||
uint64_t WPI_Now(void) {
|
||||
return wpi::Now();
|
||||
}
|
||||
|
||||
} // extern "C"
|
||||
|
||||
@@ -10,10 +10,11 @@ namespace wpi {
|
||||
namespace uv {
|
||||
|
||||
Async<>::~Async() noexcept {
|
||||
if (auto loop = m_loop.lock())
|
||||
if (auto loop = m_loop.lock()) {
|
||||
Close();
|
||||
else
|
||||
} else {
|
||||
ForceClosed();
|
||||
}
|
||||
}
|
||||
|
||||
std::shared_ptr<Async<>> Async<>::Create(const std::shared_ptr<Loop>& loop) {
|
||||
|
||||
@@ -29,10 +29,11 @@ void FsEvent::Start(const Twine& path, unsigned int flags) {
|
||||
&uv_fs_event_start, GetRaw(),
|
||||
[](uv_fs_event_t* handle, const char* filename, int events, int status) {
|
||||
FsEvent& h = *static_cast<FsEvent*>(handle->data);
|
||||
if (status < 0)
|
||||
if (status < 0) {
|
||||
h.ReportError(status);
|
||||
else
|
||||
} else {
|
||||
h.fsEvent(filename, events);
|
||||
}
|
||||
},
|
||||
path.toNullTerminatedStringRef(pathBuf).data(), flags);
|
||||
}
|
||||
|
||||
@@ -23,10 +23,11 @@ void GetAddrInfo(Loop& loop, const std::shared_ptr<GetAddrInfoReq>& req,
|
||||
loop.GetRaw(), req->GetRaw(),
|
||||
[](uv_getaddrinfo_t* req, int status, addrinfo* res) {
|
||||
auto& h = *static_cast<GetAddrInfoReq*>(req->data);
|
||||
if (status < 0)
|
||||
if (status < 0) {
|
||||
h.ReportError(status);
|
||||
else
|
||||
} else {
|
||||
h.resolved(*res);
|
||||
}
|
||||
uv_freeaddrinfo(res);
|
||||
h.Release(); // this is always a one-shot
|
||||
},
|
||||
@@ -34,10 +35,11 @@ void GetAddrInfo(Loop& loop, const std::shared_ptr<GetAddrInfoReq>& req,
|
||||
service.isNull() ? nullptr
|
||||
: service.toNullTerminatedStringRef(serviceStr).data(),
|
||||
hints);
|
||||
if (err < 0)
|
||||
if (err < 0) {
|
||||
loop.ReportError(err);
|
||||
else
|
||||
} else {
|
||||
req->Keep();
|
||||
}
|
||||
}
|
||||
|
||||
void GetAddrInfo(Loop& loop, std::function<void(const addrinfo&)> callback,
|
||||
|
||||
@@ -21,17 +21,19 @@ void GetNameInfo(Loop& loop, const std::shared_ptr<GetNameInfoReq>& req,
|
||||
[](uv_getnameinfo_t* req, int status, const char* hostname,
|
||||
const char* service) {
|
||||
auto& h = *static_cast<GetNameInfoReq*>(req->data);
|
||||
if (status < 0)
|
||||
if (status < 0) {
|
||||
h.ReportError(status);
|
||||
else
|
||||
} else {
|
||||
h.resolved(hostname, service);
|
||||
}
|
||||
h.Release(); // this is always a one-shot
|
||||
},
|
||||
&addr, flags);
|
||||
if (err < 0)
|
||||
if (err < 0) {
|
||||
loop.ReportError(err);
|
||||
else
|
||||
} else {
|
||||
req->Keep();
|
||||
}
|
||||
}
|
||||
|
||||
void GetNameInfo(Loop& loop,
|
||||
@@ -46,10 +48,11 @@ void GetNameInfo4(Loop& loop, const std::shared_ptr<GetNameInfoReq>& req,
|
||||
const Twine& ip, unsigned int port, int flags) {
|
||||
sockaddr_in addr;
|
||||
int err = NameToAddr(ip, port, &addr);
|
||||
if (err < 0)
|
||||
if (err < 0) {
|
||||
loop.ReportError(err);
|
||||
else
|
||||
} else {
|
||||
GetNameInfo(loop, req, reinterpret_cast<const sockaddr&>(addr), flags);
|
||||
}
|
||||
}
|
||||
|
||||
void GetNameInfo4(Loop& loop,
|
||||
@@ -57,20 +60,22 @@ void GetNameInfo4(Loop& loop,
|
||||
const Twine& ip, unsigned int port, int flags) {
|
||||
sockaddr_in addr;
|
||||
int err = NameToAddr(ip, port, &addr);
|
||||
if (err < 0)
|
||||
if (err < 0) {
|
||||
loop.ReportError(err);
|
||||
else
|
||||
} else {
|
||||
GetNameInfo(loop, callback, reinterpret_cast<const sockaddr&>(addr), flags);
|
||||
}
|
||||
}
|
||||
|
||||
void GetNameInfo6(Loop& loop, const std::shared_ptr<GetNameInfoReq>& req,
|
||||
const Twine& ip, unsigned int port, int flags) {
|
||||
sockaddr_in6 addr;
|
||||
int err = NameToAddr(ip, port, &addr);
|
||||
if (err < 0)
|
||||
if (err < 0) {
|
||||
loop.ReportError(err);
|
||||
else
|
||||
} else {
|
||||
GetNameInfo(loop, req, reinterpret_cast<const sockaddr&>(addr), flags);
|
||||
}
|
||||
}
|
||||
|
||||
void GetNameInfo6(Loop& loop,
|
||||
@@ -78,10 +83,11 @@ void GetNameInfo6(Loop& loop,
|
||||
const Twine& ip, unsigned int port, int flags) {
|
||||
sockaddr_in6 addr;
|
||||
int err = NameToAddr(ip, port, &addr);
|
||||
if (err < 0)
|
||||
if (err < 0) {
|
||||
loop.ReportError(err);
|
||||
else
|
||||
} else {
|
||||
GetNameInfo(loop, callback, reinterpret_cast<const sockaddr&>(addr), flags);
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace uv
|
||||
|
||||
@@ -30,4 +30,6 @@ void Handle::AllocBuf(uv_handle_t* handle, size_t size, uv_buf_t* buf) {
|
||||
*buf = h.m_allocBuf(size);
|
||||
}
|
||||
|
||||
void Handle::DefaultFreeBuf(Buffer& buf) { buf.Deallocate(); }
|
||||
void Handle::DefaultFreeBuf(Buffer& buf) {
|
||||
buf.Deallocate();
|
||||
}
|
||||
|
||||
@@ -26,7 +26,9 @@ Loop::~Loop() noexcept {
|
||||
|
||||
std::shared_ptr<Loop> Loop::Create() {
|
||||
auto loop = std::make_shared<Loop>(private_init{});
|
||||
if (uv_loop_init(&loop->m_loopStruct) < 0) return nullptr;
|
||||
if (uv_loop_init(&loop->m_loopStruct) < 0) {
|
||||
return nullptr;
|
||||
}
|
||||
loop->m_loop = &loop->m_loopStruct;
|
||||
loop->m_loop->data = loop.get();
|
||||
return loop;
|
||||
@@ -35,14 +37,18 @@ std::shared_ptr<Loop> Loop::Create() {
|
||||
std::shared_ptr<Loop> Loop::GetDefault() {
|
||||
static std::shared_ptr<Loop> loop = std::make_shared<Loop>(private_init{});
|
||||
loop->m_loop = uv_default_loop();
|
||||
if (!loop->m_loop) return nullptr;
|
||||
if (!loop->m_loop) {
|
||||
return nullptr;
|
||||
}
|
||||
loop->m_loop->data = loop.get();
|
||||
return loop;
|
||||
}
|
||||
|
||||
void Loop::Close() {
|
||||
int err = uv_loop_close(m_loop);
|
||||
if (err < 0) ReportError(err);
|
||||
if (err < 0) {
|
||||
ReportError(err);
|
||||
}
|
||||
}
|
||||
|
||||
void Loop::Walk(std::function<void(Handle&)> callback) {
|
||||
@@ -58,5 +64,7 @@ void Loop::Walk(std::function<void(Handle&)> callback) {
|
||||
|
||||
void Loop::Fork() {
|
||||
int err = uv_loop_fork(m_loop);
|
||||
if (err < 0) ReportError(err);
|
||||
if (err < 0) {
|
||||
ReportError(err);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,10 +15,11 @@ void NetworkStream::Listen(int backlog) {
|
||||
Invoke(&uv_listen, GetRawStream(), backlog,
|
||||
[](uv_stream_t* handle, int status) {
|
||||
auto& h = *static_cast<NetworkStream*>(handle->data);
|
||||
if (status < 0)
|
||||
if (status < 0) {
|
||||
h.ReportError(status);
|
||||
else
|
||||
} else {
|
||||
h.connection();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -23,13 +23,19 @@ std::shared_ptr<Pipe> Pipe::Create(Loop& loop, bool ipc) {
|
||||
}
|
||||
|
||||
void Pipe::Reuse(std::function<void()> callback, bool ipc) {
|
||||
if (IsClosing()) return;
|
||||
if (!m_reuseData) m_reuseData = std::make_unique<ReuseData>();
|
||||
if (IsClosing()) {
|
||||
return;
|
||||
}
|
||||
if (!m_reuseData) {
|
||||
m_reuseData = std::make_unique<ReuseData>();
|
||||
}
|
||||
m_reuseData->callback = callback;
|
||||
m_reuseData->ipc = ipc;
|
||||
uv_close(GetRawHandle(), [](uv_handle_t* handle) {
|
||||
Pipe& h = *static_cast<Pipe*>(handle->data);
|
||||
if (!h.m_reuseData) return;
|
||||
if (!h.m_reuseData) {
|
||||
return;
|
||||
}
|
||||
auto data = std::move(h.m_reuseData);
|
||||
auto err =
|
||||
uv_pipe_init(h.GetLoopRef().GetRaw(), h.GetRaw(), data->ipc ? 1 : 0);
|
||||
@@ -43,7 +49,9 @@ void Pipe::Reuse(std::function<void()> callback, bool ipc) {
|
||||
|
||||
std::shared_ptr<Pipe> Pipe::Accept() {
|
||||
auto client = Create(GetLoopRef(), GetRaw()->ipc);
|
||||
if (!client) return nullptr;
|
||||
if (!client) {
|
||||
return nullptr;
|
||||
}
|
||||
if (!Accept(client)) {
|
||||
client->Release();
|
||||
return nullptr;
|
||||
@@ -51,7 +59,9 @@ std::shared_ptr<Pipe> Pipe::Accept() {
|
||||
return client;
|
||||
}
|
||||
|
||||
Pipe* Pipe::DoAccept() { return Accept().get(); }
|
||||
Pipe* Pipe::DoAccept() {
|
||||
return Accept().get();
|
||||
}
|
||||
|
||||
void Pipe::Bind(const Twine& name) {
|
||||
SmallString<128> nameBuf;
|
||||
@@ -66,10 +76,11 @@ void Pipe::Connect(const Twine& name,
|
||||
name.toNullTerminatedStringRef(nameBuf).data(),
|
||||
[](uv_connect_t* req, int status) {
|
||||
auto& h = *static_cast<PipeConnectReq*>(req->data);
|
||||
if (status < 0)
|
||||
if (status < 0) {
|
||||
h.ReportError(status);
|
||||
else
|
||||
} else {
|
||||
h.connected();
|
||||
}
|
||||
h.Release(); // this is always a one-shot
|
||||
});
|
||||
req->Keep();
|
||||
|
||||
@@ -32,14 +32,20 @@ std::shared_ptr<Poll> Poll::CreateSocket(Loop& loop, uv_os_sock_t sock) {
|
||||
}
|
||||
|
||||
void Poll::Reuse(int fd, std::function<void()> callback) {
|
||||
if (IsClosing()) return;
|
||||
if (!m_reuseData) m_reuseData = std::make_unique<ReuseData>();
|
||||
if (IsClosing()) {
|
||||
return;
|
||||
}
|
||||
if (!m_reuseData) {
|
||||
m_reuseData = std::make_unique<ReuseData>();
|
||||
}
|
||||
m_reuseData->callback = callback;
|
||||
m_reuseData->isSocket = false;
|
||||
m_reuseData->fd = fd;
|
||||
uv_close(GetRawHandle(), [](uv_handle_t* handle) {
|
||||
Poll& h = *static_cast<Poll*>(handle->data);
|
||||
if (!h.m_reuseData || h.m_reuseData->isSocket) return; // just in case
|
||||
if (!h.m_reuseData || h.m_reuseData->isSocket) {
|
||||
return; // just in case
|
||||
}
|
||||
auto data = std::move(h.m_reuseData);
|
||||
int err = uv_poll_init(h.GetLoopRef().GetRaw(), h.GetRaw(), data->fd);
|
||||
if (err < 0) {
|
||||
@@ -51,14 +57,20 @@ void Poll::Reuse(int fd, std::function<void()> callback) {
|
||||
}
|
||||
|
||||
void Poll::ReuseSocket(uv_os_sock_t sock, std::function<void()> callback) {
|
||||
if (IsClosing()) return;
|
||||
if (!m_reuseData) m_reuseData = std::make_unique<ReuseData>();
|
||||
if (IsClosing()) {
|
||||
return;
|
||||
}
|
||||
if (!m_reuseData) {
|
||||
m_reuseData = std::make_unique<ReuseData>();
|
||||
}
|
||||
m_reuseData->callback = callback;
|
||||
m_reuseData->isSocket = true;
|
||||
m_reuseData->sock = sock;
|
||||
uv_close(GetRawHandle(), [](uv_handle_t* handle) {
|
||||
Poll& h = *static_cast<Poll*>(handle->data);
|
||||
if (!h.m_reuseData || !h.m_reuseData->isSocket) return; // just in case
|
||||
if (!h.m_reuseData || !h.m_reuseData->isSocket) {
|
||||
return; // just in case
|
||||
}
|
||||
auto data = std::move(h.m_reuseData);
|
||||
int err = uv_poll_init(h.GetLoopRef().GetRaw(), h.GetRaw(), data->sock);
|
||||
if (err < 0) {
|
||||
@@ -73,10 +85,11 @@ void Poll::Start(int events) {
|
||||
Invoke(&uv_poll_start, GetRaw(), events,
|
||||
[](uv_poll_t* handle, int status, int events) {
|
||||
Poll& h = *static_cast<Poll*>(handle->data);
|
||||
if (status < 0)
|
||||
if (status < 0) {
|
||||
h.ReportError(status);
|
||||
else
|
||||
} else {
|
||||
h.pollEvent(events);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -65,28 +65,36 @@ std::shared_ptr<Process> Process::SpawnArray(Loop& loop, const Twine& file,
|
||||
break;
|
||||
case Option::kStdioIgnore: {
|
||||
size_t index = o.m_data.stdio.index;
|
||||
if (index >= stdioBuf.size()) stdioBuf.resize(index + 1);
|
||||
if (index >= stdioBuf.size()) {
|
||||
stdioBuf.resize(index + 1);
|
||||
}
|
||||
stdioBuf[index].flags = UV_IGNORE;
|
||||
stdioBuf[index].data.fd = 0;
|
||||
break;
|
||||
}
|
||||
case Option::kStdioInheritFd: {
|
||||
size_t index = o.m_data.stdio.index;
|
||||
if (index >= stdioBuf.size()) stdioBuf.resize(index + 1);
|
||||
if (index >= stdioBuf.size()) {
|
||||
stdioBuf.resize(index + 1);
|
||||
}
|
||||
stdioBuf[index].flags = UV_INHERIT_FD;
|
||||
stdioBuf[index].data.fd = o.m_data.stdio.fd;
|
||||
break;
|
||||
}
|
||||
case Option::kStdioInheritPipe: {
|
||||
size_t index = o.m_data.stdio.index;
|
||||
if (index >= stdioBuf.size()) stdioBuf.resize(index + 1);
|
||||
if (index >= stdioBuf.size()) {
|
||||
stdioBuf.resize(index + 1);
|
||||
}
|
||||
stdioBuf[index].flags = UV_INHERIT_STREAM;
|
||||
stdioBuf[index].data.stream = o.m_data.stdio.pipe->GetRawStream();
|
||||
break;
|
||||
}
|
||||
case Option::kStdioCreatePipe: {
|
||||
size_t index = o.m_data.stdio.index;
|
||||
if (index >= stdioBuf.size()) stdioBuf.resize(index + 1);
|
||||
if (index >= stdioBuf.size()) {
|
||||
stdioBuf.resize(index + 1);
|
||||
}
|
||||
stdioBuf[index].flags =
|
||||
static_cast<uv_stdio_flags>(UV_CREATE_PIPE | o.m_data.stdio.flags);
|
||||
stdioBuf[index].data.stream = o.m_data.stdio.pipe->GetRawStream();
|
||||
@@ -97,7 +105,9 @@ std::shared_ptr<Process> Process::SpawnArray(Loop& loop, const Twine& file,
|
||||
}
|
||||
}
|
||||
|
||||
if (argsBuf.empty()) argsBuf.push_back(const_cast<char*>(coptions.file));
|
||||
if (argsBuf.empty()) {
|
||||
argsBuf.push_back(const_cast<char*>(coptions.file));
|
||||
}
|
||||
argsBuf.push_back(nullptr);
|
||||
coptions.args = argsBuf.data();
|
||||
|
||||
|
||||
@@ -38,18 +38,22 @@ void Stream::Shutdown(const std::shared_ptr<ShutdownReq>& req) {
|
||||
if (Invoke(&uv_shutdown, req->GetRaw(), GetRawStream(),
|
||||
[](uv_shutdown_t* req, int status) {
|
||||
auto& h = *static_cast<ShutdownReq*>(req->data);
|
||||
if (status < 0)
|
||||
if (status < 0) {
|
||||
h.ReportError(status);
|
||||
else
|
||||
} else {
|
||||
h.complete();
|
||||
}
|
||||
h.Release(); // this is always a one-shot
|
||||
}))
|
||||
})) {
|
||||
req->Keep();
|
||||
}
|
||||
}
|
||||
|
||||
void Stream::Shutdown(std::function<void()> callback) {
|
||||
auto req = std::make_shared<ShutdownReq>();
|
||||
if (callback) req->complete.connect(callback);
|
||||
if (callback) {
|
||||
req->complete.connect(callback);
|
||||
}
|
||||
Shutdown(req);
|
||||
}
|
||||
|
||||
@@ -60,12 +64,13 @@ void Stream::StartRead() {
|
||||
Buffer data = *buf;
|
||||
|
||||
// nread=0 is simply ignored
|
||||
if (nread == UV_EOF)
|
||||
if (nread == UV_EOF) {
|
||||
h.end();
|
||||
else if (nread > 0)
|
||||
} else if (nread > 0) {
|
||||
h.data(data, static_cast<size_t>(nread));
|
||||
else if (nread < 0)
|
||||
} else if (nread < 0) {
|
||||
h.ReportError(nread);
|
||||
}
|
||||
|
||||
// free the buffer
|
||||
h.FreeBuf(data);
|
||||
@@ -77,11 +82,14 @@ void Stream::Write(ArrayRef<Buffer> bufs,
|
||||
if (Invoke(&uv_write, req->GetRaw(), GetRawStream(), bufs.data(), bufs.size(),
|
||||
[](uv_write_t* r, int status) {
|
||||
auto& h = *static_cast<WriteReq*>(r->data);
|
||||
if (status < 0) h.ReportError(status);
|
||||
if (status < 0) {
|
||||
h.ReportError(status);
|
||||
}
|
||||
h.finish(Error(status));
|
||||
h.Release(); // this is always a one-shot
|
||||
}))
|
||||
})) {
|
||||
req->Keep();
|
||||
}
|
||||
}
|
||||
|
||||
void Stream::Write(
|
||||
|
||||
@@ -23,13 +23,19 @@ std::shared_ptr<Tcp> Tcp::Create(Loop& loop, unsigned int flags) {
|
||||
}
|
||||
|
||||
void Tcp::Reuse(std::function<void()> callback, unsigned int flags) {
|
||||
if (IsClosing()) return;
|
||||
if (!m_reuseData) m_reuseData = std::make_unique<ReuseData>();
|
||||
if (IsClosing()) {
|
||||
return;
|
||||
}
|
||||
if (!m_reuseData) {
|
||||
m_reuseData = std::make_unique<ReuseData>();
|
||||
}
|
||||
m_reuseData->callback = callback;
|
||||
m_reuseData->flags = flags;
|
||||
uv_close(GetRawHandle(), [](uv_handle_t* handle) {
|
||||
Tcp& h = *static_cast<Tcp*>(handle->data);
|
||||
if (!h.m_reuseData) return; // just in case
|
||||
if (!h.m_reuseData) {
|
||||
return; // just in case
|
||||
}
|
||||
auto data = std::move(h.m_reuseData);
|
||||
int err = uv_tcp_init_ex(h.GetLoopRef().GetRaw(), h.GetRaw(), data->flags);
|
||||
if (err < 0) {
|
||||
@@ -42,7 +48,9 @@ void Tcp::Reuse(std::function<void()> callback, unsigned int flags) {
|
||||
|
||||
std::shared_ptr<Tcp> Tcp::Accept() {
|
||||
auto client = Create(GetLoopRef());
|
||||
if (!client) return nullptr;
|
||||
if (!client) {
|
||||
return nullptr;
|
||||
}
|
||||
if (!Accept(client)) {
|
||||
client->Release();
|
||||
return nullptr;
|
||||
@@ -50,32 +58,37 @@ std::shared_ptr<Tcp> Tcp::Accept() {
|
||||
return client;
|
||||
}
|
||||
|
||||
Tcp* Tcp::DoAccept() { return Accept().get(); }
|
||||
Tcp* Tcp::DoAccept() {
|
||||
return Accept().get();
|
||||
}
|
||||
|
||||
void Tcp::Bind(const Twine& ip, unsigned int port, unsigned int flags) {
|
||||
sockaddr_in addr;
|
||||
int err = NameToAddr(ip, port, &addr);
|
||||
if (err < 0)
|
||||
if (err < 0) {
|
||||
ReportError(err);
|
||||
else
|
||||
} else {
|
||||
Bind(reinterpret_cast<const sockaddr&>(addr), flags);
|
||||
}
|
||||
}
|
||||
|
||||
void Tcp::Bind6(const Twine& ip, unsigned int port, unsigned int flags) {
|
||||
sockaddr_in6 addr;
|
||||
int err = NameToAddr(ip, port, &addr);
|
||||
if (err < 0)
|
||||
if (err < 0) {
|
||||
ReportError(err);
|
||||
else
|
||||
} else {
|
||||
Bind(reinterpret_cast<const sockaddr&>(addr), flags);
|
||||
}
|
||||
}
|
||||
|
||||
sockaddr_storage Tcp::GetSock() {
|
||||
sockaddr_storage name;
|
||||
int len = sizeof(name);
|
||||
if (!Invoke(&uv_tcp_getsockname, GetRaw(), reinterpret_cast<sockaddr*>(&name),
|
||||
&len))
|
||||
&len)) {
|
||||
std::memset(&name, 0, sizeof(name));
|
||||
}
|
||||
return name;
|
||||
}
|
||||
|
||||
@@ -83,8 +96,9 @@ sockaddr_storage Tcp::GetPeer() {
|
||||
sockaddr_storage name;
|
||||
int len = sizeof(name);
|
||||
if (!Invoke(&uv_tcp_getpeername, GetRaw(), reinterpret_cast<sockaddr*>(&name),
|
||||
&len))
|
||||
&len)) {
|
||||
std::memset(&name, 0, sizeof(name));
|
||||
}
|
||||
return name;
|
||||
}
|
||||
|
||||
@@ -93,13 +107,15 @@ void Tcp::Connect(const sockaddr& addr,
|
||||
if (Invoke(&uv_tcp_connect, req->GetRaw(), GetRaw(), &addr,
|
||||
[](uv_connect_t* req, int status) {
|
||||
auto& h = *static_cast<TcpConnectReq*>(req->data);
|
||||
if (status < 0)
|
||||
if (status < 0) {
|
||||
h.ReportError(status);
|
||||
else
|
||||
} else {
|
||||
h.connected();
|
||||
}
|
||||
h.Release(); // this is always a one-shot
|
||||
}))
|
||||
})) {
|
||||
req->Keep();
|
||||
}
|
||||
}
|
||||
|
||||
void Tcp::Connect(const sockaddr& addr, std::function<void()> callback) {
|
||||
@@ -112,40 +128,44 @@ void Tcp::Connect(const Twine& ip, unsigned int port,
|
||||
const std::shared_ptr<TcpConnectReq>& req) {
|
||||
sockaddr_in addr;
|
||||
int err = NameToAddr(ip, port, &addr);
|
||||
if (err < 0)
|
||||
if (err < 0) {
|
||||
ReportError(err);
|
||||
else
|
||||
} else {
|
||||
Connect(reinterpret_cast<const sockaddr&>(addr), req);
|
||||
}
|
||||
}
|
||||
|
||||
void Tcp::Connect(const Twine& ip, unsigned int port,
|
||||
std::function<void()> callback) {
|
||||
sockaddr_in addr;
|
||||
int err = NameToAddr(ip, port, &addr);
|
||||
if (err < 0)
|
||||
if (err < 0) {
|
||||
ReportError(err);
|
||||
else
|
||||
} else {
|
||||
Connect(reinterpret_cast<const sockaddr&>(addr), callback);
|
||||
}
|
||||
}
|
||||
|
||||
void Tcp::Connect6(const Twine& ip, unsigned int port,
|
||||
const std::shared_ptr<TcpConnectReq>& req) {
|
||||
sockaddr_in6 addr;
|
||||
int err = NameToAddr(ip, port, &addr);
|
||||
if (err < 0)
|
||||
if (err < 0) {
|
||||
ReportError(err);
|
||||
else
|
||||
} else {
|
||||
Connect(reinterpret_cast<const sockaddr&>(addr), req);
|
||||
}
|
||||
}
|
||||
|
||||
void Tcp::Connect6(const Twine& ip, unsigned int port,
|
||||
std::function<void()> callback) {
|
||||
sockaddr_in6 addr;
|
||||
int err = NameToAddr(ip, port, &addr);
|
||||
if (err < 0)
|
||||
if (err < 0) {
|
||||
ReportError(err);
|
||||
else
|
||||
} else {
|
||||
Connect(reinterpret_cast<const sockaddr&>(addr), callback);
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace uv
|
||||
|
||||
@@ -22,7 +22,9 @@ std::shared_ptr<Timer> Timer::Create(Loop& loop) {
|
||||
|
||||
void Timer::SingleShot(Loop& loop, Time timeout, std::function<void()> func) {
|
||||
auto h = Create(loop);
|
||||
if (!h) return;
|
||||
if (!h) {
|
||||
return;
|
||||
}
|
||||
h->timeout.connect([theTimer = h.get(), func]() {
|
||||
func();
|
||||
theTimer->Close();
|
||||
|
||||
@@ -51,45 +51,50 @@ std::shared_ptr<Udp> Udp::Create(Loop& loop, unsigned int flags) {
|
||||
void Udp::Bind(const Twine& ip, unsigned int port, unsigned int flags) {
|
||||
sockaddr_in addr;
|
||||
int err = NameToAddr(ip, port, &addr);
|
||||
if (err < 0)
|
||||
if (err < 0) {
|
||||
ReportError(err);
|
||||
else
|
||||
} else {
|
||||
Bind(reinterpret_cast<const sockaddr&>(addr), flags);
|
||||
}
|
||||
}
|
||||
|
||||
void Udp::Bind6(const Twine& ip, unsigned int port, unsigned int flags) {
|
||||
sockaddr_in6 addr;
|
||||
int err = NameToAddr(ip, port, &addr);
|
||||
if (err < 0)
|
||||
if (err < 0) {
|
||||
ReportError(err);
|
||||
else
|
||||
} else {
|
||||
Bind(reinterpret_cast<const sockaddr&>(addr), flags);
|
||||
}
|
||||
}
|
||||
|
||||
void Udp::Connect(const Twine& ip, unsigned int port) {
|
||||
sockaddr_in addr;
|
||||
int err = NameToAddr(ip, port, &addr);
|
||||
if (err < 0)
|
||||
if (err < 0) {
|
||||
ReportError(err);
|
||||
else
|
||||
} else {
|
||||
Connect(reinterpret_cast<const sockaddr&>(addr));
|
||||
}
|
||||
}
|
||||
|
||||
void Udp::Connect6(const Twine& ip, unsigned int port) {
|
||||
sockaddr_in6 addr;
|
||||
int err = NameToAddr(ip, port, &addr);
|
||||
if (err < 0)
|
||||
if (err < 0) {
|
||||
ReportError(err);
|
||||
else
|
||||
} else {
|
||||
Connect(reinterpret_cast<const sockaddr&>(addr));
|
||||
}
|
||||
}
|
||||
|
||||
sockaddr_storage Udp::GetPeer() {
|
||||
sockaddr_storage name;
|
||||
int len = sizeof(name);
|
||||
if (!Invoke(&uv_udp_getpeername, GetRaw(), reinterpret_cast<sockaddr*>(&name),
|
||||
&len))
|
||||
&len)) {
|
||||
std::memset(&name, 0, sizeof(name));
|
||||
}
|
||||
return name;
|
||||
}
|
||||
|
||||
@@ -97,8 +102,9 @@ sockaddr_storage Udp::GetSock() {
|
||||
sockaddr_storage name;
|
||||
int len = sizeof(name);
|
||||
if (!Invoke(&uv_udp_getsockname, GetRaw(), reinterpret_cast<sockaddr*>(&name),
|
||||
&len))
|
||||
&len)) {
|
||||
std::memset(&name, 0, sizeof(name));
|
||||
}
|
||||
return name;
|
||||
}
|
||||
|
||||
@@ -123,11 +129,14 @@ void Udp::Send(const sockaddr& addr, ArrayRef<Buffer> bufs,
|
||||
if (Invoke(&uv_udp_send, req->GetRaw(), GetRaw(), bufs.data(), bufs.size(),
|
||||
&addr, [](uv_udp_send_t* r, int status) {
|
||||
auto& h = *static_cast<UdpSendReq*>(r->data);
|
||||
if (status < 0) h.ReportError(status);
|
||||
if (status < 0) {
|
||||
h.ReportError(status);
|
||||
}
|
||||
h.complete(Error(status));
|
||||
h.Release(); // this is always a one-shot
|
||||
}))
|
||||
})) {
|
||||
req->Keep();
|
||||
}
|
||||
}
|
||||
|
||||
void Udp::Send(const sockaddr& addr, ArrayRef<Buffer> bufs,
|
||||
@@ -139,11 +148,14 @@ void Udp::Send(ArrayRef<Buffer> bufs, const std::shared_ptr<UdpSendReq>& req) {
|
||||
if (Invoke(&uv_udp_send, req->GetRaw(), GetRaw(), bufs.data(), bufs.size(),
|
||||
nullptr, [](uv_udp_send_t* r, int status) {
|
||||
auto& h = *static_cast<UdpSendReq*>(r->data);
|
||||
if (status < 0) h.ReportError(status);
|
||||
if (status < 0) {
|
||||
h.ReportError(status);
|
||||
}
|
||||
h.complete(Error(status));
|
||||
h.Release(); // this is always a one-shot
|
||||
}))
|
||||
})) {
|
||||
req->Keep();
|
||||
}
|
||||
}
|
||||
|
||||
void Udp::Send(ArrayRef<Buffer> bufs,
|
||||
@@ -159,10 +171,11 @@ void Udp::StartRecv() {
|
||||
Buffer data = *buf;
|
||||
|
||||
// nread=0 is simply ignored
|
||||
if (nread > 0)
|
||||
if (nread > 0) {
|
||||
h.received(data, static_cast<size_t>(nread), *addr, flags);
|
||||
else if (nread < 0)
|
||||
} else if (nread < 0) {
|
||||
h.ReportError(nread);
|
||||
}
|
||||
|
||||
// free the buffer
|
||||
h.FreeBuf(data);
|
||||
|
||||
@@ -22,23 +22,29 @@ void QueueWork(Loop& loop, const std::shared_ptr<WorkReq>& req) {
|
||||
},
|
||||
[](uv_work_t* req, int status) {
|
||||
auto& h = *static_cast<WorkReq*>(req->data);
|
||||
if (status < 0)
|
||||
if (status < 0) {
|
||||
h.ReportError(status);
|
||||
else
|
||||
} else {
|
||||
h.afterWork();
|
||||
}
|
||||
h.Release(); // this is always a one-shot
|
||||
});
|
||||
if (err < 0)
|
||||
if (err < 0) {
|
||||
loop.ReportError(err);
|
||||
else
|
||||
} else {
|
||||
req->Keep();
|
||||
}
|
||||
}
|
||||
|
||||
void QueueWork(Loop& loop, std::function<void()> work,
|
||||
std::function<void()> afterWork) {
|
||||
auto req = std::make_shared<WorkReq>();
|
||||
if (work) req->work.connect(work);
|
||||
if (afterWork) req->afterWork.connect(afterWork);
|
||||
if (work) {
|
||||
req->work.connect(work);
|
||||
}
|
||||
if (afterWork) {
|
||||
req->afterWork.connect(afterWork);
|
||||
}
|
||||
QueueWork(loop, req);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user