Merge pull request #1748 from majestrate/liblokinet-udp-api-2021-09-19

liblokinet updates
dev
majestrate 1 year ago committed by GitHub
commit 838183e36e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -5,8 +5,11 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
# Has to be set before `project()`, and ignored on non-macos:
set(CMAKE_OSX_DEPLOYMENT_TARGET 10.12 CACHE STRING "macOS deployment target (Apple clang only)")
option(BUILD_DAEMON "build lokinet daemon and associated utils" ON)
set(LANGS C CXX)
if(APPLE)
if(APPLE AND BUILD_DAEMON)
set(LANGS ${LANGS} OBJC Swift)
endif()
@ -43,6 +46,12 @@ endif()
list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_LIST_DIR}/cmake")
set(DEFAULT_WITH_BOOTSTRAP ON)
if(APPLE)
set(DEFAULT_WITH_BOOTSTRAP OFF)
endif()
# Core options
option(USE_AVX2 "enable avx2 code" OFF)
option(USE_NETNS "enable networking namespace support. Linux only" OFF)
@ -60,6 +69,7 @@ option(TRACY_ROOT "include tracy profiler source" OFF)
option(WITH_TESTS "build unit tests" OFF)
option(WITH_HIVE "build simulation stubs" OFF)
option(BUILD_PACKAGE "builds extra components for making an installer (with 'make package')" OFF)
option(WITH_BOOTSTRAP "build lokinet-bootstrap tool" ${DEFAULT_WITH_BOOTSTRAP})
include(cmake/enable_lto.cmake)
@ -182,7 +192,7 @@ if(OXENMQ_FOUND)
message(STATUS "Found system liboxenmq ${OXENMQ_VERSION}")
else()
message(STATUS "using oxenmq submodule")
add_subdirectory(${CMAKE_SOURCE_DIR}/external/oxen-mq)
add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/external/oxen-mq)
endif()
@ -327,8 +337,9 @@ endif()
add_subdirectory(crypto)
add_subdirectory(llarp)
add_subdirectory(daemon)
if(BUILD_DAEMON)
add_subdirectory(daemon)
endif()
if(WITH_HIVE)
add_subdirectory(pybind)

@ -294,7 +294,6 @@ build_external(expat
)
add_static_target(expat expat_external libexpat.a)
build_external(unbound
DEPENDS openssl_external expat_external
CONFIGURE_COMMAND ./configure ${cross_host} ${cross_rc} --prefix=${DEPS_DESTDIR} --disable-shared
@ -352,6 +351,10 @@ set_target_properties(libzmq PROPERTIES
INTERFACE_LINK_LIBRARIES "${libzmq_link_libs}"
INTERFACE_COMPILE_DEFINITIONS "ZMQ_STATIC")
if(NOT WITH_BOOTSTRAP)
return()
endif()
set(curl_extra)
if(WIN32)
set(curl_ssl_opts --without-ssl --with-schannel)

@ -0,0 +1,10 @@
cmake_minimum_required(VERSION 3.10)
project(udptest LANGUAGES CXX)
set(CMAKE_CXX_STANDARD 17)
add_executable(udptest udptest.cpp)
include_directories(../../include)
target_link_libraries(udptest PUBLIC lokinet)

@ -0,0 +1,13 @@
# liblokinet examples
building:
$ mkdir -p build
$ cd build
$ cp /path/to/liblokinet.so .
$ cmake .. -DCMAKE_EXE_LINKER_FLAGS='-L.'
$ make
running:
$ ./udptest /path/to/bootstrap.signed

@ -0,0 +1,239 @@
#include <lokinet.h>
#include <signal.h>
#include <memory>
#include <stdexcept>
#include <iostream>
#include <fstream>
#include <sstream>
#include <string>
#include <vector>
#include <cstring>
#include <algorithm>
bool _run{true};
using Lokinet_ptr = std::shared_ptr<lokinet_context>;
[[nodiscard]] auto
MakeLokinet(const std::vector<char>& bootstrap)
{
auto ctx = std::shared_ptr<lokinet_context>(lokinet_context_new(), lokinet_context_free);
if (auto err = lokinet_add_bootstrap_rc(bootstrap.data(), bootstrap.size(), ctx.get()))
throw std::runtime_error{strerror(err)};
if (lokinet_context_start(ctx.get()))
throw std::runtime_error{"could not start context"};
return ctx;
}
void
WaitForReady(const Lokinet_ptr& ctx)
{
while (_run and lokinet_wait_for_ready(1000, ctx.get()))
{
std::cout << "waiting for context..." << std::endl;
}
}
class Flow
{
lokinet_udp_flowinfo const _info;
lokinet_context* const _ctx;
public:
explicit Flow(const lokinet_udp_flowinfo* info, lokinet_context* ctx) : _info{*info}, _ctx{ctx}
{}
lokinet_context*
Context() const
{
return _ctx;
}
std::string
String() const
{
std::stringstream ss;
ss << std::string{_info.remote_host} << ":" << std::to_string(_info.remote_port)
<< " on socket " << _info.socket_id;
return ss.str();
}
};
struct ConnectJob
{
lokinet_udp_flowinfo remote;
lokinet_context* ctx;
};
void
CreateOutboundFlow(void* user, void** flowdata, int* timeout)
{
auto* job = static_cast<ConnectJob*>(user);
Flow* flow = new Flow{&job->remote, job->ctx};
*flowdata = flow;
*timeout = 30;
std::cout << "made outbound flow: " << flow->String() << std::endl;
;
}
int
ProcessNewInboundFlow(void* user, const lokinet_udp_flowinfo* remote, void** flowdata, int* timeout)
{
auto* ctx = static_cast<lokinet_context*>(user);
Flow* flow = new Flow{remote, ctx};
std::cout << "new udp flow: " << flow->String() << std::endl;
*flowdata = flow;
*timeout = 30;
return 0;
}
void
DeleteFlow(const lokinet_udp_flowinfo* remote, void* flowdata)
{
auto* flow = static_cast<Flow*>(flowdata);
std::cout << "udp flow from " << flow->String() << " timed out" << std::endl;
delete flow;
}
void
HandleUDPPacket(const lokinet_udp_flowinfo* remote, const char* pkt, size_t len, void* flowdata)
{
auto* flow = static_cast<Flow*>(flowdata);
std::cout << "we got " << len << " bytes of udp from " << flow->String() << std::endl;
}
void
BounceUDPPacket(const lokinet_udp_flowinfo* remote, const char* pkt, size_t len, void* flowdata)
{
auto* flow = static_cast<Flow*>(flowdata);
std::cout << "bounce " << len << " bytes of udp from " << flow->String() << std::endl;
if (auto err = lokinet_udp_flow_send(remote, pkt, len, flow->Context()))
{
std::cout << "bounce failed: " << strerror(err) << std::endl;
}
}
Lokinet_ptr sender, recip;
void
signal_handler(int)
{
_run = false;
}
int
main(int argc, char* argv[])
{
if (argc == 1)
{
std::cout << "usage: " << argv[0] << " bootstrap.signed" << std::endl;
return 1;
}
/*
signal(SIGINT, signal_handler);
signal(SIGTERM, signal_handler);
*/
std::vector<char> bootstrap;
// load bootstrap.signed
{
std::ifstream inf{argv[1], std::ifstream::ate | std::ifstream::binary};
size_t len = inf.tellg();
inf.seekg(0);
bootstrap.resize(len);
inf.read(bootstrap.data(), bootstrap.size());
}
if (auto* loglevel = getenv("LOKINET_LOG"))
lokinet_log_level(loglevel);
else
lokinet_log_level("none");
std::cout << "starting up" << std::endl;
recip = MakeLokinet(bootstrap);
WaitForReady(recip);
lokinet_udp_bind_result recipBindResult{};
const auto port = 10000;
if (auto err = lokinet_udp_bind(
port,
ProcessNewInboundFlow,
BounceUDPPacket,
DeleteFlow,
recip.get(),
&recipBindResult,
recip.get()))
{
std::cout << "failed to bind recip udp socket " << strerror(err) << std::endl;
return 0;
}
std::cout << "bound recip udp" << std::endl;
sender = MakeLokinet(bootstrap);
WaitForReady(sender);
std::string recipaddr{lokinet_address(recip.get())};
std::cout << "recip ready at " << recipaddr << std::endl;
lokinet_udp_bind_result senderBindResult{};
if (auto err = lokinet_udp_bind(
port,
ProcessNewInboundFlow,
HandleUDPPacket,
DeleteFlow,
sender.get(),
&senderBindResult,
sender.get()))
{
std::cout << "failed to bind sender udp socket " << strerror(err) << std::endl;
return 0;
}
ConnectJob connect{};
connect.remote.socket_id = senderBindResult.socket_id;
connect.remote.remote_port = port;
std::copy_n(recipaddr.c_str(), recipaddr.size(), connect.remote.remote_host);
connect.ctx = sender.get();
std::cout << "bound sender udp" << std::endl;
do
{
std::cout << "try establish to " << connect.remote.remote_host << std::endl;
if (auto err =
lokinet_udp_establish(CreateOutboundFlow, &connect, &connect.remote, sender.get()))
{
std::cout << "failed to establish to recip: " << strerror(err) << std::endl;
usleep(100000);
}
else
break;
} while (true);
std::cout << "sender established" << std::endl;
const std::string buf{"liblokinet"};
const std::string senderAddr{lokinet_address(sender.get())};
do
{
std::cout << senderAddr << " send to remote: " << buf << std::endl;
if (auto err = lokinet_udp_flow_send(&connect.remote, buf.data(), buf.size(), sender.get()))
{
std::cout << "send failed: " << strerror(err) << std::endl;
}
usleep(100000);
} while (_run);
return 0;
}

@ -1,8 +1,3 @@
set(DEFAULT_WITH_BOOTSTRAP ON)
if(APPLE)
set(DEFAULT_WITH_BOOTSTRAP OFF)
endif()
option(WITH_BOOTSTRAP "build lokinet-bootstrap tool" ${DEFAULT_WITH_BOOTSTRAP})
add_executable(lokinet-vpn lokinet-vpn.cpp)
if(APPLE)
@ -11,11 +6,10 @@ if(APPLE)
else()
add_executable(lokinet lokinet.cpp)
enable_lto(lokinet lokinet-vpn)
if(WITH_BOOTSTRAP)
add_executable(lokinet-bootstrap lokinet-bootstrap.cpp)
enable_lto(lokinet-bootstrap)
endif()
endif()
if(WITH_BOOTSTRAP)
add_executable(lokinet-bootstrap lokinet-bootstrap.cpp)
enable_lto(lokinet-bootstrap)
endif()

@ -56,26 +56,27 @@ add_ngtcp2_lib()
# cpr configuration. Ideally we'd just do this via add_subdirectory, but cpr's cmake requires
# 3.15+, and we target lower than that (and this is fairly simple to build).
if(WITH_BOOTSTRAP)
if(NOT BUILD_STATIC_DEPS)
find_package(CURL REQUIRED COMPONENTS HTTP HTTPS SSL)
if(NOT BUILD_STATIC_DEPS)
find_package(CURL REQUIRED COMPONENTS HTTP HTTPS SSL)
# CURL::libcurl wasn't added to FindCURL until cmake 3.12, so add it if necessary
if (CMAKE_VERSION VERSION_LESS 3.12 AND NOT TARGET CURL::libcurl)
add_library(libcurl UNKNOWN IMPORTED GLOBAL)
set_target_properties(libcurl PROPERTIES
IMPORTED_LOCATION ${CURL_LIBRARIES}
INTERFACE_INCLUDE_DIRECTORIES "${CURL_INCLUDE_DIRS}")
add_library(CURL_libcurl INTERFACE)
target_link_libraries(CURL_libcurl INTERFACE libcurl)
add_library(CURL::libcurl ALIAS CURL_libcurl)
# CURL::libcurl wasn't added to FindCURL until cmake 3.12, so add it if necessary
if (CMAKE_VERSION VERSION_LESS 3.12 AND NOT TARGET CURL::libcurl)
add_library(libcurl UNKNOWN IMPORTED GLOBAL)
set_target_properties(libcurl PROPERTIES
IMPORTED_LOCATION ${CURL_LIBRARIES}
INTERFACE_INCLUDE_DIRECTORIES "${CURL_INCLUDE_DIRS}")
add_library(CURL_libcurl INTERFACE)
target_link_libraries(CURL_libcurl INTERFACE libcurl)
add_library(CURL::libcurl ALIAS CURL_libcurl)
endif()
endif()
endif()
file(GLOB cpr_sources ${conf_depends} cpr/cpr/*.cpp)
file(GLOB cpr_sources ${conf_depends} cpr/cpr/*.cpp)
add_library(cpr STATIC EXCLUDE_FROM_ALL ${cpr_sources})
target_link_libraries(cpr PUBLIC CURL::libcurl)
target_include_directories(cpr PUBLIC cpr/include)
target_compile_definitions(cpr PUBLIC CPR_CURL_NOSIGNAL)
add_library(cpr::cpr ALIAS cpr)
add_library(cpr STATIC EXCLUDE_FROM_ALL ${cpr_sources})
target_link_libraries(cpr PUBLIC CURL::libcurl)
target_include_directories(cpr PUBLIC cpr/include)
target_compile_definitions(cpr PUBLIC CPR_CURL_NOSIGNAL)
add_library(cpr::cpr ALIAS cpr)
endif()

@ -21,6 +21,17 @@ extern "C"
int EXPORT
lokinet_log_level(const char*);
typedef void (*lokinet_logger_func)(const char*, void*);
/// set a custom logger function
void EXPORT
lokinet_set_logger(lokinet_logger_func func, void* user);
/// @brief take in hex and turn it into base32z
/// @return value must be free()'d later
char* EXPORT
lokinet_hex_to_base32z(const char* hex);
#ifdef __cplusplus
}
#endif

@ -8,7 +8,6 @@ extern "C"
#endif
/// the result of a lokinet stream mapping attempt
#pragma pack(1)
struct lokinet_stream_result
{
/// set to zero on success otherwise the error that happened
@ -23,7 +22,6 @@ extern "C"
/// the id of the stream we created
int stream_id;
};
#pragma pack()
/// connect out to a remote endpoint
/// remoteAddr is in the form of "name:port"
@ -39,7 +37,7 @@ extern "C"
/// return 0 to accept
/// return -1 to explicitly reject
/// return -2 to silently drop
typedef int (*lokinet_stream_filter)(const char* remote, uint16_t port, void*);
typedef int (*lokinet_stream_filter)(const char* remote, uint16_t port, void* userdata);
/// set stream accepter filter
/// passes user parameter into stream filter as void *
@ -53,6 +51,9 @@ extern "C"
int EXPORT
lokinet_inbound_stream(uint16_t port, struct lokinet_context* context);
void EXPORT
lokinet_close_stream(int stream_id, struct lokinet_context* context);
#ifdef __cplusplus
}
#endif

@ -2,54 +2,22 @@
#include "lokinet_context.h"
#ifdef _WIN32
extern "C"
{
struct iovec
{
void* iov_base;
size_t iov_len;
};
}
#else
#include <sys/uio.h>
#endif
#ifdef __cplusplus
extern "C"
{
#endif
/// information about a udp flow
struct lokinet_udp_flow
struct lokinet_udp_flowinfo
{
/// the socket id for this flow used for i/o purposes and closing this socket
int socket_id;
/// remote endpoint's .loki or .snode address
char remote_addr[256];
/// local endpoint's ip address
char local_addr[64];
char remote_host[256];
/// remote endpont's port
int remote_port;
/// local endpoint's port
int local_port;
uint16_t remote_port;
/// the socket id for this flow used for i/o purposes and closing this socket
int socket_id;
};
/// establish an outbound udp flow
/// remoteHost is the remote .loki or .snode address conneting to
/// remotePort is either a string integer or an srv record name to lookup, e.g. thingservice in
/// which we do a srv lookup for _udp.thingservice.remotehost.tld and use the "best" port provided
/// localAddr is the local ip:port to bind our socket to, if localAddr is NULL then
/// lokinet_udp_sendmmsg MUST be used to send packets return 0 on success return nonzero on fail,
/// containing an errno value
int EXPORT
lokinet_udp_establish(
char* remoteHost,
char* remotePort,
char* localAddr,
struct lokinet_udp_flow* flow,
struct lokinet_context* ctx);
/// a result from a lokinet_udp_bind call
struct lokinet_udp_bind_result
{
@ -57,48 +25,97 @@ extern "C"
int socket_id;
};
/// flow acceptor hook, return 0 success, return nonzero with errno on failure
typedef int (*lokinet_udp_flow_filter)(
void* userdata,
const struct lokinet_udp_flowinfo* remote_address,
void** flow_userdata,
int* timeout_seconds);
/// callback to make a new outbound flow
typedef void(lokinet_udp_create_flow_func)(
void* userdata, void** flow_userdata, int* timeout_seconds);
/// hook function for handling packets
typedef void (*lokinet_udp_flow_recv_func)(
const struct lokinet_udp_flowinfo* remote_address,
const char* pkt_data,
size_t pkt_length,
void* flow_userdata);
/// hook function for flow timeout
typedef void (*lokinet_udp_flow_timeout_func)(
const struct lokinet_udp_flowinfo* remote_address, void* flow_userdata);
/// inbound listen udp socket
/// expose udp port exposePort to the void
/// if srv is not NULL add an srv record for this port, the format being "thingservice" in which
/// will add a srv record "_udp.thingservice.ouraddress.tld" that advertises this port provide
/// localAddr to forward inbound udp packets to "ip:port" if localAddr is NULL then the resulting
/// socket MUST be drained by lokinet_udp_recvmmsg
////
/// @param filter MUST be non null, pointing to a flow filter for accepting new udp flows, called
/// with user data
///
/// returns 0 on success
/// returns nonzero on error in which it is an errno value
/// @param recv MUST be non null, pointing to a packet handler function for each flow, called
/// with per flow user data provided by filter function if accepted
///
/// @param timeout MUST be non null,
/// pointing to a cleanup function to clean up a stale flow, staleness determined by the value
/// given by the filter function returns 0 on success
///
/// @returns nonzero on error in which it is an errno value
int EXPORT
lokinet_udp_bind(
int exposedPort,
char* srv,
char* localAddr,
struct lokinet_udp_listen_result* result,
uint16_t exposedPort,
lokinet_udp_flow_filter filter,
lokinet_udp_flow_recv_func recv,
lokinet_udp_flow_timeout_func timeout,
void* user,
struct lokinet_udp_bind_result* result,
struct lokinet_context* ctx);
/// poll many udp sockets for activity
/// returns 0 on sucess
/// @brief establish a udp flow to remote endpoint
///
/// @param create_flow the callback to create the new flow if we establish one
///
/// @param user passed to new_flow as user data
///
/// @param remote the remote address to establish to
///
/// returns non zero errno on error
/// @param ctx the lokinet context to use
///
/// @return 0 on success, non zero errno on fail
int EXPORT
lokinet_udp_poll(
const int* socket_ids,
size_t numsockets,
const struct timespec* timeout,
lokinet_udp_establish(
lokinet_udp_create_flow_func create_flow,
void* user,
const struct lokinet_udp_flowinfo* remote,
struct lokinet_context* ctx);
struct lokinet_udp_pkt
{
char remote_addr[256];
int remote_port;
struct iovec pkt;
};
/// @brief send on an established flow to remote endpoint
/// blocks until we have sent the packet
///
/// @param flowinfo remote flow to use for sending
///
/// @param ptr pointer to data to send
///
/// @param len the length of the data
///
/// @param ctx the lokinet context to use
///
/// @returns 0 on success and non zero errno on fail
int EXPORT
lokinet_udp_flow_send(
const struct lokinet_udp_flowinfo* remote,
const void* ptr,
size_t len,
struct lokinet_context* ctx);
/// analog to recvmmsg
ssize_t EXPORT
lokinet_udp_recvmmsg(
int socket_id,
struct lokinet_udp_pkt* events,
size_t max_events,
struct lokient_context* ctx);
/// @brief close a bound udp socket
/// closes all flows immediately
///
/// @param socket_id the bound udp socket's id
///
/// @param ctx lokinet context
void EXPORT
lokinet_udp_close(int socket_id, struct lokinet_context* ctx);
#ifdef __cplusplus
}

@ -54,6 +54,7 @@ add_library(lokinet-platform
net/net_int.cpp
net/sock_addr.cpp
vpn/packet_router.cpp
vpn/egres_packet_router.cpp
vpn/platform.cpp
)

@ -1,5 +1,7 @@
#include "bootstrap.hpp"
#include "util/bencode.hpp"
#include "util/logging/logger.hpp"
#include "util/logging/buffer.hpp"
namespace llarp
{
@ -16,9 +18,12 @@ namespace llarp
[&](llarp_buffer_t* b, bool more) -> bool {
if (more)
{
RouterContact rc;
RouterContact rc{};
if (not rc.BDecode(b))
{
LogError("invalid rc in bootstrap list: ", llarp::buffer_printer{*b});
return false;
}
emplace(std::move(rc));
}
return true;

@ -1,6 +1,7 @@
#pragma once
#include <chrono>
#include <llarp/bootstrap.hpp>
#include <llarp/crypto/types.hpp>
#include <llarp/router_contact.hpp>
#include <llarp/util/fs.hpp>
@ -196,7 +197,7 @@ namespace llarp
struct BootstrapConfig
{
std::vector<fs::path> files;
std::set<RouterContact> routers;
BootstrapList routers;
bool seednode;
void
defineConfigOptions(ConfigDefinition& conf, const ConfigGenParameters& params);

@ -5,87 +5,120 @@
#include <llarp/quic/tunnel.hpp>
#include <llarp/router/abstractrouter.hpp>
#include <llarp/ev/ev.hpp>
#include <llarp/vpn/egres_packet_router.hpp>
namespace llarp
namespace llarp::handlers
{
namespace handlers
struct NullEndpoint final : public llarp::service::Endpoint,
public std::enable_shared_from_this<NullEndpoint>
{
struct NullEndpoint final : public llarp::service::Endpoint,
public std::enable_shared_from_this<NullEndpoint>
NullEndpoint(AbstractRouter* r, llarp::service::Context* parent)
: llarp::service::Endpoint{r, parent}
, m_PacketRouter{new vpn::EgresPacketRouter{[](auto from, auto pkt) {
var::visit(
[&pkt](auto&& from) {
LogError("unhandled traffic from: ", from, " of ", pkt.sz, " bytes");
},
from);
}}}
{
NullEndpoint(AbstractRouter* r, llarp::service::Context* parent)
: llarp::service::Endpoint(r, parent)
r->loop()->add_ticker([this] { Pump(Now()); });
}
virtual bool
HandleInboundPacket(
const service::ConvoTag tag,
const llarp_buffer_t& buf,
service::ProtocolType t,
uint64_t) override
{
LogTrace("Inbound ", t, " packet (", buf.sz, "B) on convo ", tag);
if (t == service::ProtocolType::Control)
{
r->loop()->add_ticker([this] { Pump(Now()); });
return true;
}
virtual bool
HandleInboundPacket(
const service::ConvoTag tag,
const llarp_buffer_t& buf,
service::ProtocolType t,
uint64_t) override
if (t == service::ProtocolType::TrafficV4 or t == service::ProtocolType::TrafficV6)
{
LogTrace("Inbound ", t, " packet (", buf.sz, "B) on convo ", tag);
if (t == service::ProtocolType::Control)
if (auto from = GetEndpointWithConvoTag(tag))
{
net::IPPacket pkt{};
if (not pkt.Load(buf))
{
LogWarn("invalid ip packet from remote T=", tag);
return false;
}
m_PacketRouter->HandleIPPacketFrom(std::move(*from), std::move(pkt));
return true;
}
if (t != service::ProtocolType::QUIC)
return false;
auto* quic = GetQUICTunnel();
if (!quic)
{
LogWarn("incoming quic packet but this endpoint is not quic capable; dropping");
return false;
}
if (buf.sz < 4)
else
{
LogWarn("invalid incoming quic packet, dropping");
LogWarn("did not handle packet, no endpoint with convotag T=", tag);
return false;
}
quic->receive_packet(tag, buf);
return true;
}
if (t != service::ProtocolType::QUIC)
return false;
std::string
GetIfName() const override
auto* quic = GetQUICTunnel();
if (!quic)
{
return "";
LogWarn("incoming quic packet but this endpoint is not quic capable; dropping");
return false;
}
path::PathSet_ptr
GetSelf() override
if (buf.sz < 4)
{
return shared_from_this();
LogWarn("invalid incoming quic packet, dropping");
return false;
}
quic->receive_packet(tag, buf);
return true;
}
std::weak_ptr<path::PathSet>
GetWeak() override
{
return weak_from_this();
}
std::string
GetIfName() const override
{
return "";
}
bool
SupportsV6() const override
{
return false;
}
path::PathSet_ptr
GetSelf() override
{
return shared_from_this();
}
void
SendPacketToRemote(const llarp_buffer_t&, service::ProtocolType) override{};
std::weak_ptr<path::PathSet>
GetWeak() override
{
return weak_from_this();
}
huint128_t ObtainIPForAddr(std::variant<service::Address, RouterID>) override
{
return {0};
}
bool
SupportsV6() const override
{
return false;
}
std::optional<std::variant<service::Address, RouterID>> ObtainAddrForIP(
huint128_t) const override
{
return std::nullopt;
}
};
} // namespace handlers
} // namespace llarp
void
SendPacketToRemote(const llarp_buffer_t&, service::ProtocolType) override{};
huint128_t ObtainIPForAddr(std::variant<service::Address, RouterID>) override
{
return {0};
}
std::optional<std::variant<service::Address, RouterID>> ObtainAddrForIP(
huint128_t) const override
{
return std::nullopt;
}
vpn::EgresPacketRouter*
EgresPacketRouter() override
{
return m_PacketRouter.get();
}
private:
std::unique_ptr<vpn::EgresPacketRouter> m_PacketRouter;
};
} // namespace llarp::handlers

@ -15,7 +15,7 @@
#include <queue>
#include <type_traits>
#include <variant>
#include "service/protocol_type.hpp"
#include <llarp/service/protocol_type.hpp>
namespace llarp
{

@ -1,7 +1,5 @@
#include "lokinet.h"
#include "llarp.hpp"
#include <lokinet.h>
#include <llarp.hpp>
#include <llarp/config/config.hpp>
#include <llarp/crypto/crypto_libsodium.hpp>
@ -10,7 +8,13 @@
#include <llarp/quic/tunnel.hpp>
#include <llarp/nodedb.hpp>
#include <llarp/util/logging/buffer.hpp>
#include <oxenmq/base32z.h>
#include <mutex>
#include <memory>
#include <chrono>
#ifdef _WIN32
#define EHOSTDOWN ENETDOWN
@ -18,6 +22,34 @@
namespace
{
struct Logger : public llarp::ILogStream
{
lokinet_logger_func func;
void* user;
explicit Logger(lokinet_logger_func _func, void* _user) : func{_func}, user{_user}
{}
void
PreLog(std::stringstream&, llarp::LogLevel, std::string_view, int, const std::string&)
const override
{}
void
Print(llarp::LogLevel, std::string_view, const std::string& msg) override
{
func(msg.c_str(), user);
}
void
PostLog(std::stringstream&) const override{};
void
ImmediateFlush() override{};
void Tick(llarp_time_t) override{};
};
struct Context : public llarp::Context
{
using llarp::Context::Context;
@ -28,6 +60,162 @@ namespace
return std::make_shared<llarp::NodeDB>();
}
};
struct UDPFlow
{
using Clock_t = std::chrono::steady_clock;
void* m_FlowUserData;
std::chrono::seconds m_FlowTimeout;
std::chrono::time_point<Clock_t> m_ExpiresAt;
lokinet_udp_flowinfo m_FlowInfo;
lokinet_udp_flow_recv_func m_Recv;
/// call timeout hook for this flow
void
TimedOut(lokinet_udp_flow_timeout_func timeout)
{
timeout(&m_FlowInfo, m_FlowUserData);
}
/// mark this flow as active
/// updates the expires at timestamp
void
MarkActive()
{
m_ExpiresAt = Clock_t::now() + m_FlowTimeout;
}
/// returns true if we think this flow is expired
bool
IsExpired() const
{
return Clock_t::now() >= m_ExpiresAt;
}
void
HandlePacket(const llarp::net::IPPacket& pkt)
{
if (auto maybe = pkt.L4Data())
{
MarkActive();
m_Recv(&m_FlowInfo, maybe->first, maybe->second, m_FlowUserData);
}
}
};
struct UDPHandler
{
using AddressVariant_t = llarp::vpn::AddressVariant_t;
int m_SocketID;
llarp::nuint16_t m_LocalPort;
lokinet_udp_flow_filter m_Filter;
lokinet_udp_flow_recv_func m_Recv;
lokinet_udp_flow_timeout_func m_Timeout;
void* m_User;
std::weak_ptr<llarp::service::Endpoint> m_Endpoint;
std::unordered_map<AddressVariant_t, UDPFlow> m_Flows;
std::mutex m_Access;
explicit UDPHandler(
int socketid,
llarp::nuint16_t localport,
lokinet_udp_flow_filter filter,
lokinet_udp_flow_recv_func recv,
lokinet_udp_flow_timeout_func timeout,
void* user,
std::weak_ptr<llarp::service::Endpoint> ep)
: m_SocketID{socketid}
, m_LocalPort{localport}
, m_Filter{filter}
, m_Recv{recv}
, m_Timeout{timeout}
, m_User{user}
, m_Endpoint{ep}
{}
void
KillAllFlows()
{
std::unique_lock lock{m_Access};
for (auto& item : m_Flows)
{
item.second.TimedOut(m_Timeout);
}
m_Flows.clear();
}
void
AddFlow(
const AddressVariant_t& from,
const lokinet_udp_flowinfo& flow_addr,
void* flow_userdata,
int flow_timeoutseconds,
std::optional<llarp::net::IPPacket> firstPacket = std::nullopt)
{
std::unique_lock lock{m_Access};
auto& flow = m_Flows[from];
flow.m_FlowInfo = flow_addr;
flow.m_FlowTimeout = std::chrono::seconds{flow_timeoutseconds};
flow.m_FlowUserData = flow_userdata;
flow.m_Recv = m_Recv;
if (firstPacket)
flow.HandlePacket(*firstPacket);
}
void
ExpireOldFlows()
{
std::unique_lock lock{m_Access};
for (auto itr = m_Flows.begin(); itr != m_Flows.end();)
{
if (itr->second.IsExpired())
{
itr->second.TimedOut(m_Timeout);
itr = m_Flows.erase(itr);
}
else
++itr;
}
}
void
HandlePacketFrom(AddressVariant_t from, llarp::net::IPPacket pkt)
{
{
std::unique_lock lock{m_Access};
if (m_Flows.count(from))
{
m_Flows[from].HandlePacket(pkt);
return;
}
}
lokinet_udp_flowinfo flow_addr{};
// set flow remote address
std::string addrstr = var::visit([&flow_addr](auto&& from) { return from.ToString(); }, from);
std::copy_n(
addrstr.data(),
std::min(addrstr.size(), sizeof(flow_addr.remote_host)),
flow_addr.remote_host);
// set socket id
flow_addr.socket_id = m_SocketID;
// get source port
if (const auto srcport = pkt.SrcPort())
{
flow_addr.remote_port = ToHost(*srcport).h;
}
else
return; // invalid data so we bail
void* flow_userdata = nullptr;
int flow_timeoutseconds{};
// got a new flow, let's check if we want it
if (m_Filter(m_User, &flow_addr, &flow_userdata, &flow_timeoutseconds))
return;
AddFlow(from, flow_addr, flow_userdata, flow_timeoutseconds, pkt);
}
};
} // namespace
struct lokinet_context
@ -39,7 +227,10 @@ struct lokinet_context
std::unique_ptr<std::thread> runner;
lokinet_context() : impl{std::make_shared<Context>()}, config{llarp::Config::EmbeddedConfig()}
int _socket_id;
lokinet_context()
: impl{std::make_shared<Context>()}, config{llarp::Config::EmbeddedConfig()}, _socket_id{0}
{}
~lokinet_context()
@ -48,6 +239,91 @@ struct lokinet_context
runner->join();
}
int
next_socket_id()
{
int id = ++_socket_id;
// handle overflow
if (id < 0)
{
_socket_id = 0;
id = ++_socket_id;
}
return id;
}
/// make a udp handler and hold onto it
/// return its id
[[nodiscard]] std::optional<int>
make_udp_handler(
const std::shared_ptr<llarp::service::Endpoint>& ep,
llarp::huint16_t exposePort,
lokinet_udp_flow_filter filter,
lokinet_udp_flow_recv_func recv,
lokinet_udp_flow_timeout_func timeout,
void* user)
{
if (udp_sockets.empty())
{
// start udp flow expiration timer
impl->router->loop()->call_every(1s, std::make_shared<int>(0), [this]() {
std::unique_lock lock{m_access};
for (auto& item : udp_sockets)
{
item.second->ExpireOldFlows();
}
});
}
auto udp = std::make_shared<UDPHandler>(
next_socket_id(), llarp::ToNet(exposePort), filter, recv, timeout, user, std::weak_ptr{ep});
auto id = udp->m_SocketID;
std::promise<bool> result;
impl->router->loop()->call([ep, &result, udp, exposePort]() {
if (auto pkt = ep->EgresPacketRouter())
{
pkt->AddUDPHandler(exposePort, [udp](auto from, auto pkt) {
udp->HandlePacketFrom(std::move(from), std::move(pkt));
});
result.set_value(true);
}
else
result.set_value(false);
});
if (result.get_future().get())
{
udp_sockets[udp->m_SocketID] = std::move(udp);
return id;
}
return std::nullopt;
}
void
remove_udp_handler(int socket_id)
{
std::shared_ptr<UDPHandler> udp;
{
std::unique_lock lock{m_access};
if (auto itr = udp_sockets.find(socket_id); itr != udp_sockets.end())
{
udp = std::move(itr->second);
udp_sockets.erase(itr);
}
}
if (udp)
{
udp->KillAllFlows();
// remove packet handler
impl->router->loop()->call(
[ep = udp->m_Endpoint.lock(), localport = llarp::ToHost(udp->m_LocalPort)]() {
if (auto pkt = ep->EgresPacketRouter())
pkt->RemoveUDPHandler(localport);
});
}
}
/// acquire mutex for accessing this context
[[nodiscard]] auto
acquire()
@ -62,6 +338,7 @@ struct lokinet_context
}
std::unordered_map<int, bool> streams;
std::unordered_map<int, std::shared_ptr<UDPHandler>> udp_sockets;
void
inbound_stream(int id)
@ -78,8 +355,6 @@ struct lokinet_context
namespace
{
std::unique_ptr<lokinet_context> g_context;
void
stream_error(lokinet_stream_result* result, int err)
{
@ -230,18 +505,39 @@ extern "C"
int EXPORT
lokinet_add_bootstrap_rc(const char* data, size_t datalen, struct lokinet_context* ctx)
{
if (data == nullptr or datalen == 0)
return -3;
llarp_buffer_t buf{data, datalen};
llarp::RouterContact rc{};
if (ctx == nullptr)
return -3;
auto lock = ctx->acquire();
// add a temp cryptography implementation here so rc.Verify works
llarp::CryptoManager instance{new llarp::sodium::CryptoLibSodium{}};
if (not rc.BDecode(&buf))
return -1;
if (not rc.Verify(llarp::time_now_ms()))
return -2;
ctx->config->bootstrap.routers.insert(std::move(rc));
if (data[0] == 'l')
{
if (not ctx->config->bootstrap.routers.BDecode(&buf))
{
llarp::LogError("Cannot decode bootstrap list: ", llarp::buffer_printer{buf});
return -1;
}
for (const auto& rc : ctx->config->bootstrap.routers)
{
if (not rc.Verify(llarp::time_now_ms()))
return -2;
}
}
else
{
llarp::RouterContact rc{};
if (not rc.BDecode(&buf))
{
llarp::LogError("failed to decode signle RC: ", llarp::buffer_printer{buf});
return -1;
}
if (not rc.Verify(llarp::time_now_ms()))
return -2;
ctx->config->bootstrap.routers.insert(std::move(rc));
}
return 0;
}
@ -334,11 +630,11 @@ extern "C"
return;
auto lock = ctx->acquire();
if (not ctx->impl->IsStopping())
{
ctx->impl->CloseAsync();
ctx->impl->Wait();
}
if (ctx->impl->IsStopping())
return;
ctx->impl->CloseAsync();
ctx->impl->Wait();
if (ctx->runner)
ctx->runner->join();
@ -523,6 +819,27 @@ extern "C"
return id;
}
char* EXPORT
lokinet_hex_to_base32z(const char* hex)
{
std::string_view hexview{hex};
if (not oxenmq::is_hex(hexview))
return nullptr;
const size_t byte_len = hexview.size() / 2;
const size_t b32z_len = (byte_len * 8 + 4) / 5; // = ⌈N×8÷5⌉ because 5 bits per 32z char
auto buf = std::make_unique<char[]>(b32z_len + 1);
char* end = buf.get() + b32z_len;
*end = 0; // null terminate
// Write the bytes into the *end* of the buffer so that when we rewrite the final b32z chars
// into the buffer we won't overwrite any byte values until after we've consumed them.
char* bytepos = end - byte_len;
oxenmq::from_hex(hexview.begin(), hexview.end(), bytepos);
// In-place conversion into the buffer
oxenmq::to_base32z(bytepos, end, buf.get());
return buf.release(); // leak the buffer to the caller
}
void EXPORT
lokinet_close_stream(int stream_id, struct lokinet_context* ctx)
{
@ -594,4 +911,168 @@ extern "C"
delete result->internal;
result->internal = nullptr;
}
int EXPORT
lokinet_udp_bind(
uint16_t exposedPort,
lokinet_udp_flow_filter filter,
lokinet_udp_flow_recv_func recv,
lokinet_udp_flow_timeout_func timeout,
void* user,
struct lokinet_udp_bind_result* result,
struct lokinet_context* ctx)
{
if (filter == nullptr or recv == nullptr or timeout == nullptr or result == nullptr
or ctx == nullptr)
return EINVAL;
auto lock = ctx->acquire();
if (auto ep = ctx->endpoint())
{
if (auto maybe =
ctx->make_udp_handler(ep, llarp::huint16_t{exposedPort}, filter, recv, timeout, user))
{
result->socket_id = *maybe;
return 0;
}
}
return EINVAL;
}
void EXPORT
lokinet_udp_close(int socket_id, struct lokinet_context* ctx)
{
if (ctx)
{
ctx->remove_udp_handler(socket_id);
}
}
int EXPORT
lokinet_udp_flow_send(
const struct lokinet_udp_flowinfo* remote,
const void* ptr,
size_t len,
struct lokinet_context* ctx)
{
if (remote == nullptr or remote->remote_port == 0 or ptr == nullptr or len == 0
or ctx == nullptr)
return EINVAL;
std::shared_ptr<llarp::EndpointBase> ep;
llarp::nuint16_t srcport{0};
llarp::nuint16_t dstport{llarp::ToNet(llarp::huint16_t{remote->remote_port})};
{
auto lock = ctx->acquire();
if (auto itr = ctx->udp_sockets.find(remote->socket_id); itr != ctx->udp_sockets.end())
{
ep = itr->second->m_Endpoint.lock();
srcport = itr->second->m_LocalPort;
}
else
return EHOSTUNREACH;
}
if (auto maybe =