From 70837c180c988ed96543ef71ced2abd1e0778b95 Mon Sep 17 00:00:00 2001 From: Niels Sascha Reedijk Date: Tue, 5 Apr 2022 21:39:54 +0100 Subject: [PATCH] NetServices: implement writing in the data thread. Change-Id: Ideb63f9f96bf62ea7d9ce63df22805473273855d --- .../network/libnetservices2/HttpSession.cpp | 258 +++++++++++++++++- 1 file changed, 255 insertions(+), 3 deletions(-) diff --git a/src/kits/network/libnetservices2/HttpSession.cpp b/src/kits/network/libnetservices2/HttpSession.cpp index 84efbfc1b3..4bc1a21ee2 100644 --- a/src/kits/network/libnetservices2/HttpSession.cpp +++ b/src/kits/network/libnetservices2/HttpSession.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -49,7 +50,7 @@ public: ContentReceived, TrailingHeadersReceived }; - RequestState State() const { return fRequestStatus; } + RequestState State() const noexcept { return fRequestStatus; } // Result Helpers std::shared_ptr @@ -59,8 +60,14 @@ public: // Operational methods void ResolveHostName(); void OpenConnection(); + void TransferRequest(); + bool ReceiveResult(); + void Disconnect() noexcept; + + // Object information + int Socket() const noexcept { return fSocket->Socket(); } + int32 Id() const noexcept { return fResult->id; } - // private: BHttpRequest fRequest; @@ -75,6 +82,10 @@ private: BNetworkAddress fRemoteAddress; std::unique_ptr fSocket; + // Transfer state + std::unique_ptr + fDataStream; + // Receive state /* bool receiveEnd = false; bool parseEnd = false; @@ -279,10 +290,216 @@ BHttpSession::Impl::ControlThreadFunc(void* arg) } +static constexpr uint16 EVENT_CANCELLED = 0x4000; + + /*static*/ status_t BHttpSession::Impl::DataThreadFunc(void* arg) { - // BHttpSession::Impl* data = static_cast(arg); + BHttpSession::Impl* data = static_cast(arg); + + // initial initialization of wait list + data->objectList.push_back(object_wait_info{data->fDataQueueSem, + B_OBJECT_TYPE_SEMAPHORE, B_EVENT_ACQUIRE_SEMAPHORE}); + + while (true) { + if (auto status = wait_for_objects(data->objectList.data(), data->objectList.size()); + status < B_OK) + { + // Something went inexplicably wrong + throw BSystemError("wait_for_objects()", status); + } + + // First check if the change is in acquiring the sem, meaning that + // there are new requests to be scheduled + if (data->objectList[0].events == B_EVENT_ACQUIRE_SEMAPHORE) { + if (auto status = acquire_sem(data->fDataQueueSem); status == B_INTERRUPTED) + continue; + else if (status != B_OK) { + // Most likely B_BAD_SEM_ID indicating that the sem was deleted + break; + } + + // Process the cancelList and dataQueue. Note that there might + // be a situation where a request is cancelled and added in the + // same iteration, but that is taken care by this algorithm. + data->fLock.Lock(); + while (!data->fDataQueue.empty()) { + auto request = std::move(data->fDataQueue.front()); + data->fDataQueue.pop_front(); + auto socket = request.Socket(); + + data->connectionMap.insert(std::make_pair(socket, std::move(request))); + + // Add to objectList + data->objectList.push_back(object_wait_info{socket, + B_OBJECT_TYPE_FD, B_EVENT_WRITE + }); + } + + for (auto id: data->fCancelList) { + // To cancel, we set a special event status on the + // object_wait_info list so that we can handle it below. + // Also: the first item in the waitlist is always the semaphore + // so the fun starts at offset 1. + size_t offset = 0; + for (auto it = data->connectionMap.cbegin(); it != data->connectionMap.cend(); it++) { + offset++; + if (it->second.Id() == id) { + data->objectList[offset].events = EVENT_CANCELLED; + break; + } + } + } + data->fCancelList.clear(); + data->fLock.Unlock(); + } else if ((data->objectList[0].events & B_EVENT_INVALID) == B_EVENT_INVALID) { + // The semaphore has been deleted. Start the cleanup + break; + } + + // Process all objects that are ready + bool resizeObjectList = false; + for(auto& item: data->objectList) { + if (item.type != B_OBJECT_TYPE_FD) + continue; + if ((item.events & B_EVENT_WRITE) == B_EVENT_WRITE) { + auto& request = data->connectionMap.find(item.object)->second; + auto error = false; + try { + request.TransferRequest(); + } catch (...) { + request.SetError(std::current_exception()); + error = true; + } + + // End failed writes + if (error) { + request.Disconnect(); + data->connectionMap.erase(item.object); + resizeObjectList = true; + } + } else if ((item.events & B_EVENT_READ) == B_EVENT_READ) { + auto& request = data->connectionMap.find(item.object)->second; + auto finished = false; + // TODO: replace the 5 lines below + try { + throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::Canceled); + } catch (...) { + request.SetError(std::current_exception()); + finished = true; + } + +/* TODO with a properly implement read + auto& request = data->connectionMap.find(item.object)->second; + auto success = false; + try { + finished = _RequestRead(request); + if (finished) + request.result->SetBody(); + success = true; + } catch (...) { + request.result->SetError(std::current_exception()); + finished = true; + } +*/ + +/* if (request.result->CanCancel()) { + // This could be done earlier, but this seems cleaner for the flow + request.Disconnect(); + data->connectionMap.erase(item.object); + resizeObjectList = true; + } else */ if (finished) { + request.Disconnect(); + /* TODO: implement this somewhere else + if (request.observer.IsValid()) { + BMessage msg(UrlEvent::RequestCompleted); + msg.AddInt32(UrlEventData::Id, request.result->id); + msg.AddBool(UrlEventData::Success, success); + request.observer.SendMessage(&msg); + } + */ + data->connectionMap.erase(item.object); + resizeObjectList = true; + } + } else if ((item.events & B_EVENT_DISCONNECTED) == B_EVENT_DISCONNECTED) { + auto& request = data->connectionMap.find(item.object)->second; + try { + throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::NetworkError); + } catch (...) { + request.SetError(std::current_exception()); + } +/* TODO: move to BHttpSession::Request::SetError??? + if (request.observer.IsValid()) { + BMessage msg(UrlEvent::RequestCompleted); + msg.AddInt32(UrlEventData::Id, request.result->id); + msg.AddBool(UrlEventData::Success, false); + request.observer.SendMessage(&msg); + } */ + data->connectionMap.erase(item.object); + resizeObjectList = true; + } else if ((item.events & EVENT_CANCELLED) == EVENT_CANCELLED) { + auto& request = data->connectionMap.find(item.object)->second; + request.Disconnect(); + try { + throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::Canceled); + } catch (...) { + request.SetError(std::current_exception()); + } + /* TODO: move to SetError()? + if (request.observer.IsValid()) { + BMessage msg(UrlEvent::RequestCompleted); + msg.AddInt32(UrlEventData::Id, request.result->id); + msg.AddBool(UrlEventData::Success, false); + request.observer.SendMessage(&msg); + }*/ + data->connectionMap.erase(item.object); + resizeObjectList = true; + } else { + // Likely to be B_EVENT_INVALID. This should not happen + throw BRuntimeError(__PRETTY_FUNCTION__, "Socket was deleted at an unexpected time"); + } + } + + // Reset objectList + data->objectList[0].events = B_EVENT_ACQUIRE_SEMAPHORE; + if (resizeObjectList) { + data->objectList.resize(data->connectionMap.size() + 1); + } + auto i = 1; + for (auto it = data->connectionMap.cbegin(); it != data->connectionMap.cend(); it++) { + data->objectList[i].object = it->first; + if (it->second.State() == Request::InitialState) + throw BRuntimeError(__PRETTY_FUNCTION__, "Invalid state of request"); + else if (it->second.State() == Request::Connected) + data->objectList[i].events = B_EVENT_WRITE | B_EVENT_DISCONNECTED; + else + data->objectList[i].events = B_EVENT_READ | B_EVENT_DISCONNECTED; + i++; + } + } + // Clean up and make sure we are quitting + if (atomic_get(&data->fQuitting) == 1) { + // Cancel all requests + for (auto it = data->connectionMap.begin(); it != data->connectionMap.end(); it++) { + try { + throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::Canceled); + } catch (...) { + it->second.SetError(std::current_exception()); + } + /* TODO: should be part of SetError() + if (it->second.observer.IsValid()) { + BMessage msg(UrlEvent::RequestCompleted); + msg.AddInt32(UrlEventData::Id, it->second.result->id); + msg.AddBool(UrlEventData::Success, false); + it->second.observer.SendMessage(&msg); + }*/ + } + } else { + throw BRuntimeError(__PRETTY_FUNCTION__, + "Unknown reason that the dataQueueSem is deleted"); + } + return B_OK; } @@ -381,3 +598,38 @@ BHttpSession::Request::OpenConnection() fRequestStatus = Connected; } + + +/*! + \brief Transfer data from the request to the socket. + + \returns \c true if the request is complete, or false if there is more. +*/ +void +BHttpSession::Request::TransferRequest() +{ + // Assert that we are in the right state + if (fRequestStatus != Connected) + throw BRuntimeError(__PRETTY_FUNCTION__, + "Write request for object that is not in the Connected state"); + + if (!fDataStream) + fDataStream = std::make_unique(fRequest); + + auto [currentBytesWritten, totalBytesWritten, totalSize, complete] + = fDataStream->Transfer(fSocket.get()); + + // TODO: notification +} + + +/*! + \brief Disconnect the socket. Does not validate if it actually succeeded. +*/ +void +BHttpSession::Request::Disconnect() noexcept +{ + fSocket->Disconnect(); + + // TODO: inform listeners that the request has ended +}