NetServices: implement writing in the data thread.

Change-Id: Ideb63f9f96bf62ea7d9ce63df22805473273855d
This commit is contained in:
Niels Sascha Reedijk 2022-04-05 21:39:54 +01:00
parent 1d2070c739
commit 70837c180c

View File

@ -17,6 +17,7 @@
#include <HttpRequest.h>
#include <HttpResult.h>
#include <HttpSession.h>
#include <HttpStream.h>
#include <Locker.h>
#include <Messenger.h>
#include <NetBuffer.h>
@ -49,7 +50,7 @@ public:
ContentReceived,
TrailingHeadersReceived
};
RequestState State() const { return fRequestStatus; }
RequestState State() const noexcept { return fRequestStatus; }
// Result Helpers
std::shared_ptr<HttpResultPrivate>
@ -59,8 +60,14 @@ public:
// Operational methods
void ResolveHostName();
void OpenConnection();
void TransferRequest();
bool ReceiveResult();
void Disconnect() noexcept;
// Object information
int Socket() const noexcept { return fSocket->Socket(); }
int32 Id() const noexcept { return fResult->id; }
//
private:
BHttpRequest fRequest;
@ -75,6 +82,10 @@ private:
BNetworkAddress fRemoteAddress;
std::unique_ptr<BSocket> fSocket;
// Transfer state
std::unique_ptr<BAbstractDataStream>
fDataStream;
// Receive state
/* bool receiveEnd = false;
bool parseEnd = false;
@ -279,10 +290,216 @@ BHttpSession::Impl::ControlThreadFunc(void* arg)
}
static constexpr uint16 EVENT_CANCELLED = 0x4000;
/*static*/ status_t
BHttpSession::Impl::DataThreadFunc(void* arg)
{
// BHttpSession::Impl* data = static_cast<BHttpSession::Impl*>(arg);
BHttpSession::Impl* data = static_cast<BHttpSession::Impl*>(arg);
// initial initialization of wait list
data->objectList.push_back(object_wait_info{data->fDataQueueSem,
B_OBJECT_TYPE_SEMAPHORE, B_EVENT_ACQUIRE_SEMAPHORE});
while (true) {
if (auto status = wait_for_objects(data->objectList.data(), data->objectList.size());
status < B_OK)
{
// Something went inexplicably wrong
throw BSystemError("wait_for_objects()", status);
}
// First check if the change is in acquiring the sem, meaning that
// there are new requests to be scheduled
if (data->objectList[0].events == B_EVENT_ACQUIRE_SEMAPHORE) {
if (auto status = acquire_sem(data->fDataQueueSem); status == B_INTERRUPTED)
continue;
else if (status != B_OK) {
// Most likely B_BAD_SEM_ID indicating that the sem was deleted
break;
}
// Process the cancelList and dataQueue. Note that there might
// be a situation where a request is cancelled and added in the
// same iteration, but that is taken care by this algorithm.
data->fLock.Lock();
while (!data->fDataQueue.empty()) {
auto request = std::move(data->fDataQueue.front());
data->fDataQueue.pop_front();
auto socket = request.Socket();
data->connectionMap.insert(std::make_pair(socket, std::move(request)));
// Add to objectList
data->objectList.push_back(object_wait_info{socket,
B_OBJECT_TYPE_FD, B_EVENT_WRITE
});
}
for (auto id: data->fCancelList) {
// To cancel, we set a special event status on the
// object_wait_info list so that we can handle it below.
// Also: the first item in the waitlist is always the semaphore
// so the fun starts at offset 1.
size_t offset = 0;
for (auto it = data->connectionMap.cbegin(); it != data->connectionMap.cend(); it++) {
offset++;
if (it->second.Id() == id) {
data->objectList[offset].events = EVENT_CANCELLED;
break;
}
}
}
data->fCancelList.clear();
data->fLock.Unlock();
} else if ((data->objectList[0].events & B_EVENT_INVALID) == B_EVENT_INVALID) {
// The semaphore has been deleted. Start the cleanup
break;
}
// Process all objects that are ready
bool resizeObjectList = false;
for(auto& item: data->objectList) {
if (item.type != B_OBJECT_TYPE_FD)
continue;
if ((item.events & B_EVENT_WRITE) == B_EVENT_WRITE) {
auto& request = data->connectionMap.find(item.object)->second;
auto error = false;
try {
request.TransferRequest();
} catch (...) {
request.SetError(std::current_exception());
error = true;
}
// End failed writes
if (error) {
request.Disconnect();
data->connectionMap.erase(item.object);
resizeObjectList = true;
}
} else if ((item.events & B_EVENT_READ) == B_EVENT_READ) {
auto& request = data->connectionMap.find(item.object)->second;
auto finished = false;
// TODO: replace the 5 lines below
try {
throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::Canceled);
} catch (...) {
request.SetError(std::current_exception());
finished = true;
}
/* TODO with a properly implement read
auto& request = data->connectionMap.find(item.object)->second;
auto success = false;
try {
finished = _RequestRead(request);
if (finished)
request.result->SetBody();
success = true;
} catch (...) {
request.result->SetError(std::current_exception());
finished = true;
}
*/
/* if (request.result->CanCancel()) {
// This could be done earlier, but this seems cleaner for the flow
request.Disconnect();
data->connectionMap.erase(item.object);
resizeObjectList = true;
} else */ if (finished) {
request.Disconnect();
/* TODO: implement this somewhere else
if (request.observer.IsValid()) {
BMessage msg(UrlEvent::RequestCompleted);
msg.AddInt32(UrlEventData::Id, request.result->id);
msg.AddBool(UrlEventData::Success, success);
request.observer.SendMessage(&msg);
}
*/
data->connectionMap.erase(item.object);
resizeObjectList = true;
}
} else if ((item.events & B_EVENT_DISCONNECTED) == B_EVENT_DISCONNECTED) {
auto& request = data->connectionMap.find(item.object)->second;
try {
throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::NetworkError);
} catch (...) {
request.SetError(std::current_exception());
}
/* TODO: move to BHttpSession::Request::SetError???
if (request.observer.IsValid()) {
BMessage msg(UrlEvent::RequestCompleted);
msg.AddInt32(UrlEventData::Id, request.result->id);
msg.AddBool(UrlEventData::Success, false);
request.observer.SendMessage(&msg);
} */
data->connectionMap.erase(item.object);
resizeObjectList = true;
} else if ((item.events & EVENT_CANCELLED) == EVENT_CANCELLED) {
auto& request = data->connectionMap.find(item.object)->second;
request.Disconnect();
try {
throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::Canceled);
} catch (...) {
request.SetError(std::current_exception());
}
/* TODO: move to SetError()?
if (request.observer.IsValid()) {
BMessage msg(UrlEvent::RequestCompleted);
msg.AddInt32(UrlEventData::Id, request.result->id);
msg.AddBool(UrlEventData::Success, false);
request.observer.SendMessage(&msg);
}*/
data->connectionMap.erase(item.object);
resizeObjectList = true;
} else {
// Likely to be B_EVENT_INVALID. This should not happen
throw BRuntimeError(__PRETTY_FUNCTION__, "Socket was deleted at an unexpected time");
}
}
// Reset objectList
data->objectList[0].events = B_EVENT_ACQUIRE_SEMAPHORE;
if (resizeObjectList) {
data->objectList.resize(data->connectionMap.size() + 1);
}
auto i = 1;
for (auto it = data->connectionMap.cbegin(); it != data->connectionMap.cend(); it++) {
data->objectList[i].object = it->first;
if (it->second.State() == Request::InitialState)
throw BRuntimeError(__PRETTY_FUNCTION__, "Invalid state of request");
else if (it->second.State() == Request::Connected)
data->objectList[i].events = B_EVENT_WRITE | B_EVENT_DISCONNECTED;
else
data->objectList[i].events = B_EVENT_READ | B_EVENT_DISCONNECTED;
i++;
}
}
// Clean up and make sure we are quitting
if (atomic_get(&data->fQuitting) == 1) {
// Cancel all requests
for (auto it = data->connectionMap.begin(); it != data->connectionMap.end(); it++) {
try {
throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::Canceled);
} catch (...) {
it->second.SetError(std::current_exception());
}
/* TODO: should be part of SetError()
if (it->second.observer.IsValid()) {
BMessage msg(UrlEvent::RequestCompleted);
msg.AddInt32(UrlEventData::Id, it->second.result->id);
msg.AddBool(UrlEventData::Success, false);
it->second.observer.SendMessage(&msg);
}*/
}
} else {
throw BRuntimeError(__PRETTY_FUNCTION__,
"Unknown reason that the dataQueueSem is deleted");
}
return B_OK;
}
@ -381,3 +598,38 @@ BHttpSession::Request::OpenConnection()
fRequestStatus = Connected;
}
/*!
\brief Transfer data from the request to the socket.
\returns \c true if the request is complete, or false if there is more.
*/
void
BHttpSession::Request::TransferRequest()
{
// Assert that we are in the right state
if (fRequestStatus != Connected)
throw BRuntimeError(__PRETTY_FUNCTION__,
"Write request for object that is not in the Connected state");
if (!fDataStream)
fDataStream = std::make_unique<BHttpRequestStream>(fRequest);
auto [currentBytesWritten, totalBytesWritten, totalSize, complete]
= fDataStream->Transfer(fSocket.get());
// TODO: notification
}
/*!
\brief Disconnect the socket. Does not validate if it actually succeeded.
*/
void
BHttpSession::Request::Disconnect() noexcept
{
fSocket->Disconnect();
// TODO: inform listeners that the request has ended
}