diff --git a/CHANGELOG.md b/CHANGELOG.md index 7d398fe8..bd8f22f9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,14 @@ All notable changes to bssh will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +### Added +- Internal fork of `russh-sftp` as `crates/bssh-russh-sftp` with a `serde_bytes` performance fix for `SSH_FXP_WRITE` and `SSH_FXP_DATA` packets. The upstream serde derive routes `Vec` through `deserialize_seq` (byte-by-byte), accounting for ~42% of server CPU during 1 GiB SFTP uploads in `perf` profiling. Annotating the `data` fields with `#[serde(with = "serde_bytes")]` and implementing wire-compatible `serialize_bytes` on the SFTP `Serializer` routes through the existing bulk `deserialize_byte_buf`/`try_get_bytes` path. Measured impact on a CPU-bound host (Xeon Silver 4214): 1 GiB SFTP upload throughput improves from 74.8 MiB/s to 96.4 MiB/s (+29%), closing the gap to OpenSSH `sftp-server` from ~26% to ~5%. + +### Changed +- Switched the top-level `russh-sftp` dependency from crates.io `russh-sftp = "2.1.1"` to `russh-sftp = { package = "bssh-russh-sftp", version = "2.1.1", path = "crates/bssh-russh-sftp" }`. All existing `use russh_sftp::...` imports continue to work unchanged. + ## [2.1.1] - 2026-04-17 ### Fixed diff --git a/Cargo.lock b/Cargo.lock index ecc9490a..379f3101 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -446,6 +446,7 @@ dependencies = [ "atty", "bcrypt", "bssh-russh", + "bssh-russh-sftp", "bytes", "chrono", "clap", @@ -475,7 +476,6 @@ dependencies = [ "ratatui", "regex", "rpassword", - "russh-sftp", "rustls-native-certs", "rustyline", "secrecy", @@ -572,6 +572,23 @@ dependencies = [ "zeroize", ] +[[package]] +name = "bssh-russh-sftp" +version = "2.1.1" +dependencies = [ + "async-trait", + "bitflags 2.11.0", + "bytes", + "chrono", + "flurry", + "log", + "serde", + "serde_bytes", + "thiserror 2.0.18", + "tokio", + "tokio-util", +] + [[package]] name = "bumpalo" version = "3.20.2" @@ -3975,23 +3992,6 @@ dependencies = [ "windows-sys 0.61.2", ] -[[package]] -name = "russh-sftp" -version = "2.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3bb94393cafad0530145b8f626d8687f1ee1dedb93d7ba7740d6ae81868b13b5" -dependencies = [ - "bitflags 2.11.0", - "bytes", - "chrono", - "flurry", - "log", - "serde", - "thiserror 2.0.18", - "tokio", - "tokio-util", -] - [[package]] name = "russh-util" version = "0.52.0" @@ -4272,6 +4272,16 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde_bytes" +version = "0.11.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5d440709e79d88e51ac01c4b72fc6cb7314017bb7da9eeff678aa94c10e3ea8" +dependencies = [ + "serde", + "serde_core", +] + [[package]] name = "serde_core" version = "1.0.228" diff --git a/Cargo.toml b/Cargo.toml index 5e98e720..4d35f32c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,7 @@ members = [ ".", "crates/bssh-russh", + "crates/bssh-russh-sftp", ] [package] @@ -23,7 +24,8 @@ tokio = { version = "1.51.1", features = ["full"] } # - Development: uses local path (crates/bssh-russh) # - Publishing: uses crates.io version (path ignored) russh = { package = "bssh-russh", version = "0.60.1", path = "crates/bssh-russh" } -russh-sftp = "2.1.1" +# Use our internal russh-sftp fork with a serde_bytes perf fix +russh-sftp = { package = "bssh-russh-sftp", version = "2.1.1", path = "crates/bssh-russh-sftp" } clap = { version = "4.6.0", features = ["derive", "env"] } anyhow = "1.0.102" thiserror = "2.0.18" diff --git a/crates/bssh-russh-sftp/Cargo.toml b/crates/bssh-russh-sftp/Cargo.toml new file mode 100644 index 00000000..a69960c7 --- /dev/null +++ b/crates/bssh-russh-sftp/Cargo.toml @@ -0,0 +1,35 @@ +[package] +name = "bssh-russh-sftp" +version = "2.1.1" +authors = ["Jeongkyu Shin "] +description = "Temporary fork of russh-sftp with a serde_bytes performance fix for SFTP Write/Data packets" +documentation = "https://docs.rs/bssh-russh-sftp" +edition = "2021" +homepage = "https://github.com/lablup/bssh" +keywords = ["russh", "sftp", "ssh2", "server", "client"] +license = "Apache-2.0" +readme = "README.md" +repository = "https://github.com/lablup/bssh" + +[dependencies] +tokio = { version = "1", default-features = false, features = [ + "io-util", + "rt", + "sync", + "time", + "macros", +] } +tokio-util = "0.7" +serde = { version = "1.0", features = ["derive"] } +serde_bytes = "0.11" +bitflags = { version = "2.9", features = ["serde"] } +async-trait = { version = "0.1", optional = true } + +thiserror = "2.0" +chrono = "0.4" +bytes = "1.10" +log = "0.4" +flurry = "0.5" + +[features] +async-trait = ["dep:async-trait"] diff --git a/crates/bssh-russh-sftp/create-patch.sh b/crates/bssh-russh-sftp/create-patch.sh new file mode 100755 index 00000000..9011ae52 --- /dev/null +++ b/crates/bssh-russh-sftp/create-patch.sh @@ -0,0 +1,48 @@ +#!/bin/bash +# create-patch.sh +# Creates a patch file from the current bssh-russh-sftp changes compared to upstream russh-sftp. +# +# Usage: ./create-patch.sh + +set -e + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +BSSH_ROOT="$SCRIPT_DIR/../.." +UPSTREAM_DIR="$BSSH_ROOT/references/russh-sftp/src" +CURRENT_DIR="$SCRIPT_DIR/src" +PATCH_DIR="$SCRIPT_DIR/patches" +PATCH_FILE="$PATCH_DIR/sftp-serde-bytes-perf.patch" + +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +NC='\033[0m' + +log_info() { echo -e "${GREEN}[INFO]${NC} $1"; } +log_warn() { echo -e "${YELLOW}[WARN]${NC} $1"; } + +if [ ! -d "$UPSTREAM_DIR" ]; then + echo "Error: Upstream russh-sftp not found at $UPSTREAM_DIR" + echo "Please ensure references/russh-sftp exists with the upstream source." + exit 1 +fi + +mkdir -p "$PATCH_DIR" + +log_info "Creating patch from differences..." + +/usr/bin/diff -urN "$UPSTREAM_DIR" "$CURRENT_DIR" \ + | sed "s|$UPSTREAM_DIR|a/src|g" \ + | sed "s|$CURRENT_DIR|b/src|g" \ + > "$PATCH_FILE" || true + +if [ -s "$PATCH_FILE" ]; then + LINES=$(wc -l < "$PATCH_FILE" | tr -d ' ') + log_info "Patch created: $PATCH_FILE ($LINES lines)" + + echo "" + echo "Patch summary:" + echo "==============" + grep -E "^@@|^\+\+\+|^---" "$PATCH_FILE" | head -20 +else + log_warn "No differences found - patch file is empty" +fi diff --git a/crates/bssh-russh-sftp/patches/sftp-serde-bytes-perf.patch b/crates/bssh-russh-sftp/patches/sftp-serde-bytes-perf.patch new file mode 100644 index 00000000..4f8d4da6 --- /dev/null +++ b/crates/bssh-russh-sftp/patches/sftp-serde-bytes-perf.patch @@ -0,0 +1,62 @@ +diff -urN a/src/lib.rs src/lib.rs +--- a/src/lib.rs 2026-04-21 17:00:59 ++++ b/src/lib.rs 2026-04-21 17:05:30 +@@ -1,3 +1,6 @@ ++// Lints tripped by vendored upstream source that we do not want to diverge from. ++#![allow(clippy::io_other_error)] ++ + //! SFTP subsystem with client and server support for Russh and more! + //! + //! Crate can provide compatibility with anything that can provide the raw data +diff -urN a/src/protocol/data.rs src/protocol/data.rs +--- a/src/protocol/data.rs 2026-04-21 17:00:59 ++++ b/src/protocol/data.rs 2026-04-21 17:00:36 +@@ -4,6 +4,7 @@ + #[derive(Debug, Serialize, Deserialize)] + pub struct Data { + pub id: u32, ++ #[serde(with = "serde_bytes")] + pub data: Vec, + } + +diff -urN a/src/protocol/write.rs src/protocol/write.rs +--- a/src/protocol/write.rs 2026-04-21 17:00:59 ++++ b/src/protocol/write.rs 2026-04-21 17:00:36 +@@ -6,6 +6,7 @@ + pub id: u32, + pub handle: String, + pub offset: u64, ++ #[serde(with = "serde_bytes")] + pub data: Vec, + } + +diff -urN a/src/ser.rs src/ser.rs +--- a/src/ser.rs 2026-04-21 17:00:59 ++++ b/src/ser.rs 2026-04-21 17:00:36 +@@ -103,8 +103,10 @@ + Ok(()) + } + +- fn serialize_bytes(self, _v: &[u8]) -> Result { +- Err(Error::BadMessage("bytes not supported".to_owned())) ++ fn serialize_bytes(self, v: &[u8]) -> Result { ++ self.output.put_u32(v.len() as u32); ++ self.output.put_slice(v); ++ Ok(()) + } + + fn serialize_none(self) -> Result { +diff -urN a/src/utils.rs src/utils.rs +--- a/src/utils.rs 2026-04-21 17:00:59 ++++ b/src/utils.rs 2026-04-21 17:04:11 +@@ -9,9 +9,7 @@ + DateTime::::from(time).timestamp() as u32 + } + +-pub async fn read_packet( +- stream: &mut S, +-) -> Result { ++pub async fn read_packet(stream: &mut S) -> Result { + let length = stream.read_u32().await?; + + let mut buf = vec![0; length as usize]; diff --git a/crates/bssh-russh-sftp/src/buf.rs b/crates/bssh-russh-sftp/src/buf.rs new file mode 100644 index 00000000..3006f979 --- /dev/null +++ b/crates/bssh-russh-sftp/src/buf.rs @@ -0,0 +1,27 @@ +use bytes::Buf; + +use crate::error::Error; + +pub trait TryBuf: Buf { + fn try_get_bytes(&mut self) -> Result, Error>; + fn try_get_string(&mut self) -> Result; +} + +impl TryBuf for T { + fn try_get_bytes(&mut self) -> Result, Error> { + let len = self + .try_get_u32() + .map_err(|e| Error::UnexpectedBehavior(e.to_string()))? as usize; + if self.remaining() < len { + return Err(Error::BadMessage("no remaining for vec".to_owned())); + } + + Ok(self.copy_to_bytes(len).to_vec()) + } + + fn try_get_string(&mut self) -> Result { + let bytes = self.try_get_bytes()?; + //String::from_utf8(bytes).map_err(|_| Error::BadMessage("unable to parse str".to_owned())) + Ok(String::from_utf8_lossy(&bytes).into()) + } +} diff --git a/crates/bssh-russh-sftp/src/client/error.rs b/crates/bssh-russh-sftp/src/client/error.rs new file mode 100644 index 00000000..09e1c10a --- /dev/null +++ b/crates/bssh-russh-sftp/src/client/error.rs @@ -0,0 +1,67 @@ +use std::io; +use thiserror::Error; +use tokio::sync::mpsc::error::SendError as MpscSendError; +use tokio::sync::oneshot::error::RecvError as OneshotRecvError; +use tokio::time::error::Elapsed as TimeElapsed; + +use crate::error; +use crate::protocol::Status; + +/// Enum for client errors +#[derive(Debug, Clone, Error)] +pub enum Error { + /// Contains an error status packet + #[error("{}: {}", .0.status_code, .0.error_message)] + Status(Status), + /// Any errors related to I/O + #[error("I/O: {0}")] + IO(String), + /// Time limit for receiving response packet exceeded + #[error("Timeout")] + Timeout, + /// Occurs due to exceeding the limits set by the `limits@openssh.com` extension + #[error("Limit exceeded: {0}")] + Limited(String), + /// Occurs when an unexpected packet is sent + #[error("Unexpected packet")] + UnexpectedPacket, + /// Occurs when unexpected server behavior differs from the protocol specifition + #[error("{0}")] + UnexpectedBehavior(String), +} + +impl From for Error { + fn from(status: Status) -> Self { + Self::Status(status) + } +} + +impl From for Error { + fn from(error: io::Error) -> Self { + Self::IO(error.to_string()) + } +} + +impl From> for Error { + fn from(err: MpscSendError) -> Self { + Self::UnexpectedBehavior(format!("SendError: {}", err)) + } +} + +impl From for Error { + fn from(err: OneshotRecvError) -> Self { + Self::UnexpectedBehavior(format!("RecvError: {}", err)) + } +} + +impl From for Error { + fn from(_: TimeElapsed) -> Self { + Self::Timeout + } +} + +impl From for Error { + fn from(error: error::Error) -> Self { + Self::UnexpectedBehavior(error.to_string()) + } +} diff --git a/crates/bssh-russh-sftp/src/client/fs/dir.rs b/crates/bssh-russh-sftp/src/client/fs/dir.rs new file mode 100644 index 00000000..7d71de1c --- /dev/null +++ b/crates/bssh-russh-sftp/src/client/fs/dir.rs @@ -0,0 +1,48 @@ +use std::collections::VecDeque; + +use super::Metadata; +use crate::protocol::FileType; + +/// Entries returned by the [`ReadDir`] iterator. +#[derive(Debug)] +pub struct DirEntry { + file: String, + metadata: Metadata, +} + +impl DirEntry { + /// Returns the file name for the file that this entry points at. + pub fn file_name(&self) -> String { + self.file.to_owned() + } + + /// Returns the file type for the file that this entry points at. + pub fn file_type(&self) -> FileType { + self.metadata.file_type() + } + + /// Returns the metadata for the file that this entry points at. + pub fn metadata(&self) -> Metadata { + self.metadata.to_owned() + } +} + +/// Iterator over the entries in a remote directory. +pub struct ReadDir { + pub(crate) entries: VecDeque<(String, Metadata)>, +} + +impl Iterator for ReadDir { + type Item = DirEntry; + + fn next(&mut self) -> Option { + match self.entries.pop_front() { + None => None, + Some(entry) if entry.0 == "." || entry.0 == ".." => self.next(), + Some(entry) => Some(DirEntry { + file: entry.0, + metadata: entry.1, + }), + } + } +} diff --git a/crates/bssh-russh-sftp/src/client/fs/file.rs b/crates/bssh-russh-sftp/src/client/fs/file.rs new file mode 100644 index 00000000..4b1cdcd2 --- /dev/null +++ b/crates/bssh-russh-sftp/src/client/fs/file.rs @@ -0,0 +1,339 @@ +use std::{ + future::Future, + io::{self, SeekFrom}, + pin::Pin, + sync::Arc, + task::{ready, Context, Poll}, +}; +use tokio::{ + io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf}, + runtime::Handle, +}; + +use super::Metadata; +use crate::{ + client::{error::Error, rawsession::SftpResult, session::Extensions, RawSftpSession}, + protocol::StatusCode, +}; + +type StateFn = Option> + Send + Sync + 'static>>>; + +const MAX_READ_LENGTH: u64 = 261120; +const MAX_WRITE_LENGTH: u64 = 261120; + +struct FileState { + f_read: StateFn>>, + f_seek: StateFn, + f_write: StateFn, + f_flush: StateFn<()>, + f_shutdown: StateFn<()>, +} + +/// Provides high-level methods for interaction with a remote file. +/// +/// In order to properly close the handle, [`shutdown`] on a file should be called. +/// Also implement [`AsyncSeek`] and other async i/o implementations. +/// +/// # Weakness +/// Using [`SeekFrom::End`] is costly and time-consuming because we need to +/// request the actual file size from the remote server. +pub struct File { + session: Arc, + handle: String, + state: FileState, + pos: u64, + closed: bool, + extensions: Arc, +} + +impl File { + pub(crate) fn new( + session: Arc, + handle: String, + extensions: Arc, + ) -> Self { + Self { + session, + handle, + state: FileState { + f_read: None, + f_seek: None, + f_write: None, + f_flush: None, + f_shutdown: None, + }, + pos: 0, + closed: false, + extensions, + } + } + + /// Queries metadata about the remote file. + pub async fn metadata(&self) -> SftpResult { + Ok(self.session.fstat(self.handle.as_str()).await?.attrs) + } + + /// Sets metadata for a remote file. + pub async fn set_metadata(&self, metadata: Metadata) -> SftpResult<()> { + self.session + .fsetstat(self.handle.as_str(), metadata) + .await + .map(|_| ()) + } + + /// Attempts to sync all data. + /// + /// If the server does not support `fsync@openssh.com` sending the request will + /// be omitted, but will still pseudo-successfully + pub async fn sync_all(&self) -> SftpResult<()> { + if !self.extensions.fsync { + return Ok(()); + } + + self.session.fsync(self.handle.as_str()).await.map(|_| ()) + } +} + +impl Drop for File { + fn drop(&mut self) { + if self.closed { + return; + } + + if let Ok(handle) = Handle::try_current() { + let session = self.session.clone(); + let file_handle = self.handle.clone(); + + handle.spawn(async move { + let _ = session.close(file_handle).await; + }); + } + } +} + +impl AsyncRead for File { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let poll = Pin::new(match self.state.f_read.as_mut() { + Some(f) => f, + None => { + let session = self.session.clone(); + let max_read_len = self + .extensions + .limits + .as_ref() + .and_then(|l| l.read_len) + .unwrap_or(MAX_READ_LENGTH) as usize; + + let file_handle = self.handle.clone(); + + let offset = self.pos; + let len = if buf.remaining() > max_read_len { + max_read_len + } else { + buf.remaining() + }; + + self.state.f_read.get_or_insert(Box::pin(async move { + let result = session.read(file_handle, offset, len as u32).await; + + match result { + Ok(data) => Ok(Some(data.data)), + Err(Error::Status(status)) if status.status_code == StatusCode::Eof => { + Ok(None) + } + Err(e) => Err(io::Error::new(io::ErrorKind::Other, e.to_string())), + } + })) + } + }) + .poll(cx); + + if poll.is_ready() { + self.state.f_read = None; + } + + match poll { + Poll::Pending => Poll::Pending, + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + Poll::Ready(Ok(None)) => Poll::Ready(Ok(())), + Poll::Ready(Ok(Some(data))) => { + self.pos += data.len() as u64; + buf.put_slice(&data[..]); + Poll::Ready(Ok(())) + } + } + } +} + +impl AsyncSeek for File { + fn start_seek(mut self: Pin<&mut Self>, position: io::SeekFrom) -> io::Result<()> { + match self.state.f_seek { + Some(_) => Err(io::Error::new( + io::ErrorKind::Other, + "other file operation is pending, call poll_complete before start_seek", + )), + None => { + let session = self.session.clone(); + let file_handle = self.handle.clone(); + let cur_pos = self.pos as i64; + + self.state.f_seek = Some(Box::pin(async move { + let new_pos = match position { + SeekFrom::Start(pos) => pos as i64, + SeekFrom::Current(pos) => cur_pos + pos, + SeekFrom::End(pos) => { + let result = session + .fstat(file_handle) + .await + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; + + match result.attrs.size { + Some(size) => size as i64 + pos, + None => { + return Err(io::Error::new( + io::ErrorKind::Other, + "file size unknown", + )) + } + } + } + }; + + if new_pos < 0 { + return Err(io::Error::new( + io::ErrorKind::Other, + "cannot move file pointer before the beginning", + )); + } + + Ok(new_pos as u64) + })); + + Ok(()) + } + } + } + + fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.state.f_seek.as_mut() { + None => Poll::Ready(Ok(self.pos)), + Some(f) => { + self.pos = ready!(Pin::new(f).poll(cx))?; + self.state.f_seek = None; + Poll::Ready(Ok(self.pos)) + } + } + } +} + +impl AsyncWrite for File { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let poll = Pin::new(match self.state.f_write.as_mut() { + Some(f) => f, + None => { + let session = self.session.clone(); + let max_write_len = self + .extensions + .limits + .as_ref() + .and_then(|l| l.write_len) + .unwrap_or(MAX_WRITE_LENGTH) as usize; + + let file_handle = self.handle.clone(); + let data = buf.to_vec(); + + let offset = self.pos; + let len = if data.len() > max_write_len { + max_write_len + } else { + data.len() + }; + + self.state.f_write.get_or_insert(Box::pin(async move { + session + .write(file_handle, offset, data[..len].to_vec()) + .await + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; + Ok(len) + })) + } + }) + .poll(cx); + + if poll.is_ready() { + self.state.f_write = None; + } + + if let Poll::Ready(Ok(len)) = poll { + self.pos += len as u64; + } + + poll + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if !self.extensions.fsync { + return Poll::Ready(Ok(())); + } + + let poll = Pin::new(match self.state.f_flush.as_mut() { + Some(f) => f, + None => { + let session = self.session.clone(); + let file_handle = self.handle.clone(); + + self.state.f_flush.get_or_insert(Box::pin(async move { + session + .fsync(file_handle) + .await + .map(|_| ()) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string())) + })) + } + }) + .poll(cx); + + if poll.is_ready() { + self.state.f_flush = None; + } + + poll + } + + fn poll_shutdown( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let poll = Pin::new(match self.state.f_shutdown.as_mut() { + Some(f) => f, + None => { + let session = self.session.clone(); + let file_handle = self.handle.clone(); + + self.state.f_shutdown.get_or_insert(Box::pin(async move { + session + .close(file_handle) + .await + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; + Ok(()) + })) + } + }) + .poll(cx); + + if poll.is_ready() { + self.state.f_shutdown = None; + self.closed = true; + } + + poll + } +} diff --git a/crates/bssh-russh-sftp/src/client/fs/mod.rs b/crates/bssh-russh-sftp/src/client/fs/mod.rs new file mode 100644 index 00000000..73ff87ab --- /dev/null +++ b/crates/bssh-russh-sftp/src/client/fs/mod.rs @@ -0,0 +1,13 @@ +//! Filesystem manipulation operations. +//! +//! This module contains methods for interacting with remote entities on high-level. +//! The architecture is quite simple because it is built as an analogue of [`std::fs`] + +mod dir; +mod file; + +use crate::protocol::FileAttributes; + +pub use dir::{DirEntry, ReadDir}; +pub use file::File; +pub type Metadata = FileAttributes; diff --git a/crates/bssh-russh-sftp/src/client/handler.rs b/crates/bssh-russh-sftp/src/client/handler.rs new file mode 100644 index 00000000..ca7cdcee --- /dev/null +++ b/crates/bssh-russh-sftp/src/client/handler.rs @@ -0,0 +1,58 @@ +use std::future::Future; + +use super::error::Error; +use crate::protocol::{Attrs, Data, ExtendedReply, Handle, Name, Status, Version}; + +/// Client stream handler. This is `async_trait` +#[cfg_attr(feature = "async-trait", async_trait::async_trait)] +pub trait Handler: Sized { + type Error: Into; + + /// Called on SSH_FXP_VERSION. + #[allow(unused_variables)] + fn version( + &mut self, + version: Version, + ) -> impl Future> + Send { + async { Ok(()) } + } + + /// Called on SSH_FXP_STATUS. + #[allow(unused_variables)] + fn status(&mut self, status: Status) -> impl Future> + Send { + async { Ok(()) } + } + + /// Called on SSH_FXP_HANDLE. + #[allow(unused_variables)] + fn handle(&mut self, handle: Handle) -> impl Future> + Send { + async { Ok(()) } + } + + /// Called on SSH_FXP_DATA. + #[allow(unused_variables)] + fn data(&mut self, data: Data) -> impl Future> + Send { + async { Ok(()) } + } + + /// Called on SSH_FXP_NAME. + #[allow(unused_variables)] + fn name(&mut self, name: Name) -> impl Future> + Send { + async { Ok(()) } + } + + /// Called on SSH_FXP_ATTRS. + #[allow(unused_variables)] + fn attrs(&mut self, attrs: Attrs) -> impl Future> + Send { + async { Ok(()) } + } + + /// Called on SSH_EXTENDED_REPLY. + #[allow(unused_variables)] + fn extended_reply( + &mut self, + reply: ExtendedReply, + ) -> impl Future> + Send { + async { Ok(()) } + } +} diff --git a/crates/bssh-russh-sftp/src/client/mod.rs b/crates/bssh-russh-sftp/src/client/mod.rs new file mode 100644 index 00000000..6b4421a3 --- /dev/null +++ b/crates/bssh-russh-sftp/src/client/mod.rs @@ -0,0 +1,109 @@ +pub mod error; +pub mod fs; +mod handler; +pub mod rawsession; +mod session; + +pub use handler::Handler; +pub use rawsession::RawSftpSession; +pub use session::SftpSession; + +use bytes::Bytes; +use tokio::{ + io::{self, AsyncRead, AsyncWrite, AsyncWriteExt}, + select, + sync::mpsc, +}; +use tokio_util::sync::CancellationToken; + +use crate::{error::Error, protocol::Packet, utils::read_packet}; + +macro_rules! into_wrap { + ($handler:expr) => { + match $handler.await { + Err(error) => Err(error.into()), + Ok(()) => Ok(()), + } + }; +} + +async fn execute_handler(bytes: &mut Bytes, handler: &mut H) -> Result<(), error::Error> +where + H: Handler + Send, +{ + match Packet::try_from(bytes)? { + Packet::Version(p) => into_wrap!(handler.version(p)), + Packet::Status(p) => into_wrap!(handler.status(p)), + Packet::Handle(p) => into_wrap!(handler.handle(p)), + Packet::Data(p) => into_wrap!(handler.data(p)), + Packet::Name(p) => into_wrap!(handler.name(p)), + Packet::Attrs(p) => into_wrap!(handler.attrs(p)), + Packet::ExtendedReply(p) => into_wrap!(handler.extended_reply(p)), + _ => Err(error::Error::UnexpectedBehavior( + "A packet was received that could not be processed.".to_owned(), + )), + } +} + +async fn process_handler(stream: &mut S, handler: &mut H) -> Result<(), Error> +where + S: AsyncRead + Unpin, + H: Handler + Send, +{ + let mut bytes = read_packet(stream).await?; + Ok(execute_handler(&mut bytes, handler).await?) +} + +/// Run processing stream as SFTP client. Is a simple handler of incoming +/// and outgoing packets. Can be used for non-standard implementations +pub fn run(stream: S, mut handler: H) -> mpsc::UnboundedSender +where + S: AsyncRead + AsyncWrite + Unpin + Send + 'static, + H: Handler + Send + 'static, +{ + let (tx, mut rx) = mpsc::unbounded_channel::(); + let (mut rd, mut wr) = io::split(stream); + + let rc = CancellationToken::new(); + let wc = rc.clone(); + { + tokio::spawn(async move { + loop { + select! { + result = process_handler(&mut rd, &mut handler) => { + match result { + Err(Error::UnexpectedEof) => break, + Err(err) => warn!("{}", err), + Ok(_) => (), + } + }, + _ = rc.cancelled() => break, + } + } + + rc.cancel(); + debug!("read half of sftp stream ended"); + }); + } + + tokio::spawn(async move { + loop { + select! { + Some(data) = rx.recv() => { + if data.is_empty() { + let _ = wr.shutdown().await; + break; + } + + let _ = wr.write_all(&data[..]).await; + }, + _ = wc.cancelled() => break, + } + } + + wc.cancel(); + debug!("write half of sftp stream ended"); + }); + + tx +} diff --git a/crates/bssh-russh-sftp/src/client/rawsession.rs b/crates/bssh-russh-sftp/src/client/rawsession.rs new file mode 100644 index 00000000..7457f76a --- /dev/null +++ b/crates/bssh-russh-sftp/src/client/rawsession.rs @@ -0,0 +1,723 @@ +use bytes::Bytes; +use flurry::HashMap; +use std::{ + sync::{ + atomic::{AtomicU32, AtomicU64, Ordering}, + Arc, + }, + time::Duration, +}; +use tokio::{ + io::{AsyncRead, AsyncWrite}, + sync::{mpsc, RwLock}, + time, +}; + +use super::{error::Error, run, Handler}; +use crate::{ + de, + extensions::{ + self, FsyncExtension, HardlinkExtension, LimitsExtension, Statvfs, StatvfsExtension, + }, + protocol::{ + Attrs, Close, Data, Extended, ExtendedReply, FSetStat, FileAttributes, Fstat, Handle, Init, + Lstat, MkDir, Name, Open, OpenDir, OpenFlags, Packet, Read, ReadDir, ReadLink, RealPath, + Remove, Rename, RmDir, SetStat, Stat, Status, StatusCode, Symlink, Version, Write, + }, +}; + +pub type SftpResult = Result; +type SharedRequests = HashMap, mpsc::Sender>>; + +pub(crate) struct SessionInner { + version: Option, + requests: Arc, +} + +impl SessionInner { + pub async fn reply(&mut self, id: Option, packet: Packet) -> SftpResult<()> { + if let Some(sender) = self.requests.pin().remove(&id) { + let validate = if id.is_some() && self.version.is_none() { + Err(Error::UnexpectedPacket) + } else if id.is_none() && self.version.is_some() { + Err(Error::UnexpectedBehavior("Duplicate version".to_owned())) + } else { + Ok(()) + }; + + sender + .try_send(validate.clone().map(|_| packet)) + .map_err(|e| Error::UnexpectedBehavior(e.to_string()))?; + + return validate; + } + + Err(Error::UnexpectedBehavior(format!( + "Packet {:?} for unknown recipient", + id + ))) + } +} + +#[cfg_attr(feature = "async-trait", async_trait::async_trait)] +impl Handler for SessionInner { + type Error = Error; + + async fn version(&mut self, packet: Version) -> Result<(), Self::Error> { + let version = packet.version; + self.reply(None, packet.into()).await?; + self.version = Some(version); + Ok(()) + } + + async fn name(&mut self, name: Name) -> Result<(), Self::Error> { + self.reply(Some(name.id), name.into()).await + } + + async fn status(&mut self, status: Status) -> Result<(), Self::Error> { + self.reply(Some(status.id), status.into()).await + } + + async fn handle(&mut self, handle: Handle) -> Result<(), Self::Error> { + self.reply(Some(handle.id), handle.into()).await + } + + async fn data(&mut self, data: Data) -> Result<(), Self::Error> { + self.reply(Some(data.id), data.into()).await + } + + async fn attrs(&mut self, attrs: Attrs) -> Result<(), Self::Error> { + self.reply(Some(attrs.id), attrs.into()).await + } + + async fn extended_reply(&mut self, reply: ExtendedReply) -> Result<(), Self::Error> { + self.reply(Some(reply.id), reply.into()).await + } +} + +#[derive(Debug, Clone, Copy, Default)] +pub struct Limits { + // todo: implement + //pub packet_len: Option, + pub read_len: Option, + pub write_len: Option, + pub open_handles: Option, +} + +impl From for Limits { + fn from(limits: LimitsExtension) -> Self { + Self { + read_len: if limits.max_read_len > 0 { + Some(limits.max_read_len) + } else { + None + }, + write_len: if limits.max_write_len > 0 { + Some(limits.max_write_len) + } else { + None + }, + open_handles: if limits.max_open_handles > 0 { + Some(limits.max_open_handles) + } else { + None + }, + } + } +} + +pub(crate) struct Options { + timeout: RwLock, + limits: Arc, +} + +/// Implements raw work with the protocol in request-response format. +/// If the server returns a `Status` packet and it has the code Ok +/// then the packet is returned as Ok in other error cases +/// the packet is stored as Err. +pub struct RawSftpSession { + tx: mpsc::UnboundedSender, + requests: Arc, + next_req_id: AtomicU32, + handles: AtomicU64, + options: Options, +} + +macro_rules! into_with_status { + ($result:ident, $packet:ident) => { + match $result { + Packet::$packet(p) => Ok(p), + Packet::Status(p) => Err(p.into()), + _ => Err(Error::UnexpectedPacket), + } + }; +} + +macro_rules! into_status { + ($result:ident) => { + match $result { + Packet::Status(status) if status.status_code == StatusCode::Ok => Ok(status), + Packet::Status(status) => Err(status.into()), + _ => Err(Error::UnexpectedPacket), + } + }; +} + +impl RawSftpSession { + pub fn new(stream: S) -> Self + where + S: AsyncRead + AsyncWrite + Unpin + Send + 'static, + { + let req_map = Arc::new(HashMap::new()); + let inner = SessionInner { + version: None, + requests: req_map.clone(), + }; + + Self { + tx: run(stream, inner), + requests: req_map, + next_req_id: AtomicU32::new(1), + handles: AtomicU64::new(0), + options: Options { + timeout: RwLock::new(10), + limits: Arc::new(Limits::default()), + }, + } + } + + /// Set the maximum response time in seconds. + /// Default: 10 seconds + pub async fn set_timeout(&self, secs: u64) { + *self.options.timeout.write().await = secs; + } + + /// Setting limits. For the `limits@openssh.com` extension + pub fn set_limits(&mut self, limits: Arc) { + self.options.limits = limits; + } + + async fn send(&self, id: Option, packet: Packet) -> SftpResult { + if self.tx.is_closed() { + return Err(Error::UnexpectedBehavior("session closed".into())); + } + + let (tx, mut rx) = mpsc::channel(1); + + self.requests.pin().insert(id, tx); + self.tx.send(Bytes::try_from(packet)?)?; + + let timeout = *self.options.timeout.read().await; + + match time::timeout(Duration::from_secs(timeout), rx.recv()).await { + Ok(Some(result)) => result, + Ok(None) => { + self.requests.pin().remove(&id); + Err(Error::UnexpectedBehavior("recv none message".into())) + } + Err(error) => { + self.requests.pin().remove(&id); + Err(error.into()) + } + } + } + + fn use_next_id(&self) -> u32 { + self.next_req_id.fetch_add(1, Ordering::SeqCst) + } + + /// Closes the inner channel stream. Called by [`Drop`] + pub fn close_session(&self) -> SftpResult<()> { + if self.tx.is_closed() { + return Ok(()); + } + + Ok(self.tx.send(Bytes::new())?) + } + + pub async fn init(&self) -> SftpResult { + let result = self.send(None, Init::default().into()).await?; + if let Packet::Version(version) = result { + Ok(version) + } else { + Err(Error::UnexpectedPacket) + } + } + + pub async fn open>( + &self, + filename: T, + flags: OpenFlags, + attrs: FileAttributes, + ) -> SftpResult { + if self + .options + .limits + .open_handles + .is_some_and(|h| self.handles.load(Ordering::SeqCst) >= h) + { + return Err(Error::Limited("handle limit reached".to_owned())); + } + + let id = self.use_next_id(); + let result = self + .send( + Some(id), + Open { + id, + filename: filename.into(), + pflags: flags, + attrs, + } + .into(), + ) + .await?; + + if let Packet::Handle(_) = result { + self.handles.fetch_add(1, Ordering::SeqCst); + } + + into_with_status!(result, Handle) + } + + pub async fn close>(&self, handle: H) -> SftpResult { + let id = self.use_next_id(); + let result = self + .send( + Some(id), + Close { + id, + handle: handle.into(), + } + .into(), + ) + .await?; + + if let Packet::Status(status) = &result { + if status.status_code == StatusCode::Ok + && self + .handles + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |h| { + if h > 0 { + Some(h - 1) + } else { + None + } + }) + .is_err() + { + warn!("attempt to close more handles than exist"); + } + } + + into_status!(result) + } + + pub async fn read>( + &self, + handle: H, + offset: u64, + len: u32, + ) -> SftpResult { + if self.options.limits.read_len.is_some_and(|r| len as u64 > r) { + return Err(Error::Limited("read limit reached".to_owned())); + } + + let id = self.use_next_id(); + let result = self + .send( + Some(id), + Read { + id, + handle: handle.into(), + offset, + len, + } + .into(), + ) + .await?; + + into_with_status!(result, Data) + } + + pub async fn write>( + &self, + handle: H, + offset: u64, + data: Vec, + ) -> SftpResult { + if self + .options + .limits + .write_len + .is_some_and(|w| data.len() as u64 > w) + { + return Err(Error::Limited("write limit reached".to_owned())); + } + + let id = self.use_next_id(); + let result = self + .send( + Some(id), + Write { + id, + handle: handle.into(), + offset, + data, + } + .into(), + ) + .await?; + + into_status!(result) + } + + pub async fn lstat>(&self, path: P) -> SftpResult { + let id = self.use_next_id(); + let result = self + .send( + Some(id), + Lstat { + id, + path: path.into(), + } + .into(), + ) + .await?; + + into_with_status!(result, Attrs) + } + + pub async fn fstat>(&self, handle: H) -> SftpResult { + let id = self.use_next_id(); + let result = self + .send( + Some(id), + Fstat { + id, + handle: handle.into(), + } + .into(), + ) + .await?; + + into_with_status!(result, Attrs) + } + + pub async fn setstat>( + &self, + path: P, + attrs: FileAttributes, + ) -> SftpResult { + let id = self.use_next_id(); + let result = self + .send( + Some(id), + SetStat { + id, + path: path.into(), + attrs, + } + .into(), + ) + .await?; + + into_status!(result) + } + + pub async fn fsetstat>( + &self, + handle: H, + attrs: FileAttributes, + ) -> SftpResult { + let id = self.use_next_id(); + let result = self + .send( + Some(id), + FSetStat { + id, + handle: handle.into(), + attrs, + } + .into(), + ) + .await?; + + into_status!(result) + } + + pub async fn opendir>(&self, path: P) -> SftpResult { + if self + .options + .limits + .open_handles + .is_some_and(|h| self.handles.load(Ordering::SeqCst) >= h) + { + return Err(Error::Limited("Handle limit reached".to_owned())); + } + + let id = self.use_next_id(); + let result = self + .send( + Some(id), + OpenDir { + id, + path: path.into(), + } + .into(), + ) + .await?; + + if let Packet::Handle(_) = result { + self.handles.fetch_add(1, Ordering::SeqCst); + } + + into_with_status!(result, Handle) + } + + pub async fn readdir>(&self, handle: H) -> SftpResult { + let id = self.use_next_id(); + let result = self + .send( + Some(id), + ReadDir { + id, + handle: handle.into(), + } + .into(), + ) + .await?; + + into_with_status!(result, Name) + } + + pub async fn remove>(&self, filename: T) -> SftpResult { + let id = self.use_next_id(); + let result = self + .send( + Some(id), + Remove { + id, + filename: filename.into(), + } + .into(), + ) + .await?; + + into_status!(result) + } + + pub async fn mkdir>( + &self, + path: P, + attrs: FileAttributes, + ) -> SftpResult { + let id = self.use_next_id(); + let result = self + .send( + Some(id), + MkDir { + id, + path: path.into(), + attrs, + } + .into(), + ) + .await?; + + into_status!(result) + } + + pub async fn rmdir>(&self, path: P) -> SftpResult { + let id = self.use_next_id(); + let result = self + .send( + Some(id), + RmDir { + id, + path: path.into(), + } + .into(), + ) + .await?; + + into_status!(result) + } + + pub async fn realpath>(&self, path: P) -> SftpResult { + let id = self.use_next_id(); + let result = self + .send( + Some(id), + RealPath { + id, + path: path.into(), + } + .into(), + ) + .await?; + + into_with_status!(result, Name) + } + + pub async fn stat>(&self, path: P) -> SftpResult { + let id = self.use_next_id(); + let result = self + .send( + Some(id), + Stat { + id, + path: path.into(), + } + .into(), + ) + .await?; + + into_with_status!(result, Attrs) + } + + pub async fn rename(&self, oldpath: O, newpath: N) -> SftpResult + where + O: Into, + N: Into, + { + let id = self.use_next_id(); + let result = self + .send( + Some(id), + Rename { + id, + oldpath: oldpath.into(), + newpath: newpath.into(), + } + .into(), + ) + .await?; + + into_status!(result) + } + + pub async fn readlink>(&self, path: P) -> SftpResult { + let id = self.use_next_id(); + let result = self + .send( + Some(id), + ReadLink { + id, + path: path.into(), + } + .into(), + ) + .await?; + + into_with_status!(result, Name) + } + + pub async fn symlink(&self, path: P, target: T) -> SftpResult + where + P: Into, + T: Into, + { + let id = self.use_next_id(); + let result = self + .send( + Some(id), + Symlink { + id, + linkpath: path.into(), + targetpath: target.into(), + } + .into(), + ) + .await?; + + into_status!(result) + } + + /// Equivalent to `SSH_FXP_EXTENDED`. Allows protocol expansion. + /// The extension can return any packet, so it's not specific + pub async fn extended>(&self, request: R, data: Vec) -> SftpResult { + let id = self.use_next_id(); + self.send( + Some(id), + Extended { + id, + request: request.into(), + data, + } + .into(), + ) + .await + } + + pub async fn limits(&self) -> SftpResult { + match self.extended(extensions::LIMITS, vec![]).await? { + Packet::ExtendedReply(reply) => { + Ok(de::from_bytes::(&mut reply.data.into())?) + } + Packet::Status(status) if status.status_code != StatusCode::Ok => { + Err(Error::Status(status)) + } + _ => Err(Error::UnexpectedPacket), + } + } + + pub async fn hardlink(&self, oldpath: O, newpath: N) -> SftpResult + where + O: Into, + N: Into, + { + let result = self + .extended( + extensions::HARDLINK, + HardlinkExtension { + oldpath: oldpath.into(), + newpath: newpath.into(), + } + .try_into()?, + ) + .await?; + + into_status!(result) + } + + pub async fn fsync>(&self, handle: H) -> SftpResult { + let result = self + .extended( + extensions::FSYNC, + FsyncExtension { + handle: handle.into(), + } + .try_into()?, + ) + .await?; + + into_status!(result) + } + + pub async fn statvfs

(&self, path: P) -> SftpResult + where + P: Into, + { + let result = self + .extended( + extensions::STATVFS, + StatvfsExtension { path: path.into() }.try_into()?, + ) + .await?; + + match result { + Packet::ExtendedReply(reply) => Ok(de::from_bytes::(&mut reply.data.into())?), + Packet::Status(status) if status.status_code != StatusCode::Ok => { + Err(Error::Status(status)) + } + _ => Err(Error::UnexpectedPacket), + } + } +} + +impl Drop for RawSftpSession { + fn drop(&mut self) { + let _ = self.close_session(); + } +} diff --git a/crates/bssh-russh-sftp/src/client/session.rs b/crates/bssh-russh-sftp/src/client/session.rs new file mode 100644 index 00000000..6d195b08 --- /dev/null +++ b/crates/bssh-russh-sftp/src/client/session.rs @@ -0,0 +1,284 @@ +use std::sync::Arc; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; + +use super::{ + error::Error, + fs::{File, Metadata, ReadDir}, + rawsession::{Limits, SftpResult}, + RawSftpSession, +}; +use crate::{ + extensions::{self, Statvfs}, + protocol::{FileAttributes, OpenFlags, StatusCode}, +}; + +#[derive(Debug, Default)] +pub(crate) struct Extensions { + pub hardlink: bool, + pub fsync: bool, + pub statvfs: bool, + pub limits: Option>, +} + +/// High-level SFTP implementation for easy interaction with a remote file system. +/// Contains most methods similar to the native [filesystem](std::fs) +pub struct SftpSession { + session: Arc, + extensions: Arc, +} + +impl SftpSession { + /// Creates a new session by initializing the protocol and extensions + pub async fn new(stream: S) -> SftpResult + where + S: AsyncRead + AsyncWrite + Unpin + Send + 'static, + { + Self::new_opts(stream, None).await + } + + /// Creates a new session with timeout opt before the first request + pub async fn new_opts(stream: S, timeout: Option) -> SftpResult + where + S: AsyncRead + AsyncWrite + Unpin + Send + 'static, + { + let mut session = RawSftpSession::new(stream); + + // todo: for new options we need builder + if let Some(timeout) = timeout { + session.set_timeout(timeout).await; + } + + let version = session.init().await?; + let mut extensions = Extensions { + hardlink: version + .extensions + .get(extensions::HARDLINK) + .is_some_and(|e| e == "1"), + fsync: version + .extensions + .get(extensions::FSYNC) + .is_some_and(|e| e == "1"), + statvfs: version + .extensions + .get(extensions::STATVFS) + .is_some_and(|e| e == "2"), + limits: None, + }; + + if version + .extensions + .get(extensions::LIMITS) + .is_some_and(|e| e == "1") + { + let limits = session.limits().await?; + let limits = Arc::new(Limits::from(limits)); + + session.set_limits(limits.clone()); + extensions.limits = Some(limits); + } + + Ok(Self { + session: Arc::new(session), + extensions: Arc::new(extensions), + }) + } + + /// Set the maximum response time in seconds. + /// Default: 10 seconds + pub async fn set_timeout(&self, secs: u64) { + self.session.set_timeout(secs).await; + } + + /// Closes the inner channel stream. + pub async fn close(&self) -> SftpResult<()> { + self.session.close_session() + } + + /// Attempts to open a file in read-only mode. + pub async fn open>(&self, filename: T) -> SftpResult { + self.open_with_flags(filename, OpenFlags::READ).await + } + + /// Opens a file in write-only mode. + /// + /// This function will create a file if it does not exist, and will truncate it if it does. + pub async fn create>(&self, filename: T) -> SftpResult { + self.open_with_flags( + filename, + OpenFlags::CREATE | OpenFlags::TRUNCATE | OpenFlags::WRITE, + ) + .await + } + + /// Attempts to open or create the file in the specified mode + pub async fn open_with_flags>( + &self, + filename: T, + flags: OpenFlags, + ) -> SftpResult { + self.open_with_flags_and_attributes(filename, flags, FileAttributes::empty()) + .await + } + + /// Attempts to open or create the file in the specified mode and with specified file attributes + pub async fn open_with_flags_and_attributes>( + &self, + filename: T, + flags: OpenFlags, + attributes: FileAttributes, + ) -> SftpResult { + let handle = self.session.open(filename, flags, attributes).await?.handle; + Ok(File::new( + self.session.clone(), + handle, + self.extensions.clone(), + )) + } + + /// Requests the remote party for the absolute from the relative path. + pub async fn canonicalize>(&self, path: T) -> SftpResult { + let name = self.session.realpath(path).await?; + match name.files.first() { + Some(file) => Ok(file.filename.to_owned()), + None => Err(Error::UnexpectedBehavior("no file".to_owned())), + } + } + + /// Creates a new empty directory. + pub async fn create_dir>(&self, path: T) -> SftpResult<()> { + self.session + .mkdir(path, FileAttributes::empty()) + .await + .map(|_| ()) + } + + /// Reads the contents of a file located at the specified path to the end. + pub async fn read>(&self, path: P) -> SftpResult> { + let mut file = self.open(path).await?; + let mut buffer = Vec::new(); + + file.read_to_end(&mut buffer).await?; + + Ok(buffer) + } + + /// Writes the contents to a file whose path is specified. + pub async fn write>(&self, path: P, data: &[u8]) -> SftpResult<()> { + let mut file = self.open_with_flags(path, OpenFlags::WRITE).await?; + file.write_all(data).await?; + Ok(()) + } + + /// Checks a file or folder exists at the specified path + pub async fn try_exists>(&self, path: P) -> SftpResult { + match self.metadata(path).await { + Ok(_) => Ok(true), + Err(Error::Status(status)) if status.status_code == StatusCode::NoSuchFile => Ok(false), + Err(error) => Err(error), + } + } + + /// Returns an iterator over the entries within a directory. + pub async fn read_dir>(&self, path: P) -> SftpResult { + let mut files = vec![]; + let handle = self.session.opendir(path).await?.handle; + + loop { + match self.session.readdir(handle.as_str()).await { + Ok(name) => { + files = name + .files + .into_iter() + .map(|f| (f.filename, f.attrs)) + .chain(files.into_iter()) + .collect(); + } + Err(Error::Status(status)) if status.status_code == StatusCode::Eof => break, + Err(err) => return Err(err), + } + } + + self.session.close(handle).await?; + + Ok(ReadDir { + entries: files.into(), + }) + } + + /// Reads a symbolic link, returning the file that the link points to. + pub async fn read_link>(&self, path: P) -> SftpResult { + let name = self.session.readlink(path).await?; + match name.files.first() { + Some(file) => Ok(file.filename.to_owned()), + None => Err(Error::UnexpectedBehavior("no file".to_owned())), + } + } + + /// Removes the specified folder. + pub async fn remove_dir>(&self, path: P) -> SftpResult<()> { + self.session.rmdir(path).await.map(|_| ()) + } + + /// Removes the specified file. + pub async fn remove_file>(&self, filename: T) -> SftpResult<()> { + self.session.remove(filename).await.map(|_| ()) + } + + /// Rename a file or directory to a new name. + pub async fn rename(&self, oldpath: O, newpath: N) -> SftpResult<()> + where + O: Into, + N: Into, + { + self.session.rename(oldpath, newpath).await.map(|_| ()) + } + + /// Creates a symlink of the specified target. + pub async fn symlink(&self, path: P, target: T) -> SftpResult<()> + where + P: Into, + T: Into, + { + self.session.symlink(path, target).await.map(|_| ()) + } + + /// Queries metadata about the remote file. + pub async fn metadata>(&self, path: P) -> SftpResult { + Ok(self.session.stat(path).await?.attrs) + } + + /// Sets metadata for a remote file. + pub async fn set_metadata>( + &self, + path: P, + metadata: Metadata, + ) -> Result<(), Error> { + self.session.setstat(path, metadata).await.map(|_| ()) + } + + pub async fn symlink_metadata>(&self, path: P) -> SftpResult { + Ok(self.session.lstat(path).await?.attrs) + } + + pub async fn hardlink(&self, oldpath: O, newpath: N) -> SftpResult + where + O: Into, + N: Into, + { + if !self.extensions.hardlink { + return Ok(false); + } + + self.session.hardlink(oldpath, newpath).await.map(|_| true) + } + + /// Performs a statvfs on the remote file system path. + /// Returns [`Ok(None)`] if the remote SFTP server does not support `statvfs@openssh.com` extension v2. + pub async fn fs_info>(&self, path: P) -> SftpResult> { + if !self.extensions.statvfs { + return Ok(None); + } + + self.session.statvfs(path).await.map(Some) + } +} diff --git a/crates/bssh-russh-sftp/src/de.rs b/crates/bssh-russh-sftp/src/de.rs new file mode 100644 index 00000000..75b505f7 --- /dev/null +++ b/crates/bssh-russh-sftp/src/de.rs @@ -0,0 +1,395 @@ +use bytes::{Buf, BufMut, Bytes}; +use serde::de::{EnumAccess, IntoDeserializer, MapAccess, SeqAccess, VariantAccess, Visitor}; +use std::fmt; + +use crate::{buf::TryBuf, error::Error}; + +pub struct Deserializer<'a> { + input: &'a mut Bytes, +} + +/// Converting bytes to protocol-compliant type +pub fn from_bytes<'a, T>(bytes: &'a mut Bytes) -> Result +where + T: serde::Deserialize<'a>, +{ + let mut deserializer = Deserializer { input: bytes }; + T::deserialize(&mut deserializer) +} + +/// Deserilization of a [`Vec`] without length. Usually reads until the end byte +/// or end of the packet because the size is unknown. +pub fn data_deserialize<'de, D>(deserializer: D) -> Result, D::Error> +where + D: serde::Deserializer<'de>, +{ + struct DataVisitor; + + impl<'de> Visitor<'de> for DataVisitor { + type Value = Vec; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("data") + } + + fn visit_seq(self, mut seq: A) -> Result + where + A: serde::de::SeqAccess<'de>, + { + let mut data = Vec::new(); + while let Some(byte) = seq.next_element::()? { + data.put_u8(byte); + } + Ok(data) + } + } + + deserializer.deserialize_any(DataVisitor) +} + +impl<'de> serde::Deserializer<'de> for &mut Deserializer<'de> { + type Error = Error; + + fn deserialize_any(self, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + let len = self.input.len(); + visitor.visit_seq(SeqDeserializer { + de: self, + len: Some(len), + }) + } + + fn deserialize_bool(self, _visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + Err(Error::BadMessage("bool not supported".to_owned())) + } + + fn deserialize_i8(self, _visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + Err(Error::BadMessage("i8 not supported".to_owned())) + } + + fn deserialize_i16(self, _visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + Err(Error::BadMessage("i16 not supported".to_owned())) + } + + fn deserialize_i32(self, _visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + Err(Error::BadMessage("i32 not supported".to_owned())) + } + + fn deserialize_i64(self, _visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + Err(Error::BadMessage("i64 not supported".to_owned())) + } + + fn deserialize_u8(self, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + visitor.visit_u8(self.input.try_get_u8()?) + } + + fn deserialize_u16(self, _visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + Err(Error::BadMessage("u16 not supported".to_owned())) + } + + fn deserialize_u32(self, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + visitor.visit_u32(self.input.try_get_u32()?) + } + + fn deserialize_u64(self, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + visitor.visit_u64(self.input.try_get_u64()?) + } + + fn deserialize_f32(self, _visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + Err(Error::BadMessage("f32 not supported".to_owned())) + } + + fn deserialize_f64(self, _visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + Err(Error::BadMessage("f64 not supported".to_owned())) + } + + fn deserialize_char(self, _visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + Err(Error::BadMessage("char not supported".to_owned())) + } + + fn deserialize_str(self, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + visitor.visit_str(&self.input.try_get_string()?) + } + + fn deserialize_string(self, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + self.deserialize_str(visitor) + } + + fn deserialize_bytes(self, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + visitor.visit_bytes(&self.input.try_get_bytes()?) + } + + fn deserialize_byte_buf(self, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + self.deserialize_bytes(visitor) + } + + fn deserialize_option(self, _visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + Err(Error::BadMessage("option not supported".to_owned())) + } + + fn deserialize_unit(self, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + visitor.visit_unit() + } + + fn deserialize_unit_struct( + self, + _name: &'static str, + visitor: V, + ) -> Result + where + V: serde::de::Visitor<'de>, + { + visitor.visit_unit() + } + + fn deserialize_newtype_struct( + self, + _name: &'static str, + visitor: V, + ) -> Result + where + V: serde::de::Visitor<'de>, + { + visitor.visit_newtype_struct(self) + } + + fn deserialize_seq(self, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + let len = self.input.try_get_u32()? as usize; + visitor.visit_seq(SeqDeserializer { + de: self, + len: Some(len), + }) + } + + fn deserialize_tuple(self, len: usize, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + visitor.visit_seq(SeqDeserializer { + de: self, + len: Some(len), + }) + } + + fn deserialize_tuple_struct( + self, + _name: &'static str, + len: usize, + visitor: V, + ) -> Result + where + V: serde::de::Visitor<'de>, + { + self.deserialize_tuple(len, visitor) + } + + fn deserialize_map(self, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + visitor.visit_map(MapDeserializer { de: self }) + } + + fn deserialize_struct( + self, + _name: &'static str, + fields: &'static [&'static str], + visitor: V, + ) -> Result + where + V: serde::de::Visitor<'de>, + { + self.deserialize_tuple(fields.len(), visitor) + } + + fn deserialize_enum( + self, + _name: &'static str, + _variants: &'static [&'static str], + visitor: V, + ) -> Result + where + V: serde::de::Visitor<'de>, + { + visitor.visit_enum(self) + } + + fn deserialize_identifier(self, _visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + Err(Error::BadMessage("identifier not supported".to_owned())) + } + + fn deserialize_ignored_any(self, _visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + Err(Error::BadMessage("ignored any not supported".to_owned())) + } + + fn is_human_readable(&self) -> bool { + false + } +} + +struct SeqDeserializer<'a, 'de: 'a> { + de: &'a mut Deserializer<'de>, + len: Option, +} + +impl<'de> SeqAccess<'de> for SeqDeserializer<'_, 'de> { + type Error = Error; + + fn next_element_seed(&mut self, seed: T) -> Result, Self::Error> + where + T: serde::de::DeserializeSeed<'de>, + { + if self.len == Some(0) { + return Ok(None); + } + + if let Some(len) = self.len.as_mut() { + *len -= 1; + } + + seed.deserialize(&mut *self.de).map(Some) + } + + fn size_hint(&self) -> Option { + self.len + } +} + +struct MapDeserializer<'a, 'de: 'a> { + de: &'a mut Deserializer<'de>, +} + +impl<'de> MapAccess<'de> for MapDeserializer<'_, 'de> { + type Error = Error; + + fn next_key_seed(&mut self, seed: K) -> Result, Self::Error> + where + K: serde::de::DeserializeSeed<'de>, + { + if self.de.input.remaining() == 0 { + return Ok(None); + } + + seed.deserialize(&mut *self.de).map(Some) + } + + fn next_value_seed(&mut self, seed: V) -> Result + where + V: serde::de::DeserializeSeed<'de>, + { + seed.deserialize(&mut *self.de) + } +} + +impl<'de> VariantAccess<'de> for &mut Deserializer<'de> { + type Error = Error; + + fn unit_variant(self) -> Result<(), Self::Error> { + Ok(()) + } + + fn newtype_variant_seed(self, seed: T) -> Result + where + T: serde::de::DeserializeSeed<'de>, + { + seed.deserialize(self) + } + + fn tuple_variant(self, len: usize, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + use serde::Deserializer; + self.deserialize_tuple(len, visitor) + } + + fn struct_variant( + self, + fields: &'static [&'static str], + visitor: V, + ) -> Result + where + V: serde::de::Visitor<'de>, + { + use crate::serde::Deserializer; + self.deserialize_tuple(fields.len(), visitor) + } +} + +impl<'de> EnumAccess<'de> for &mut Deserializer<'de> { + type Error = Error; + type Variant = Self; + + fn variant_seed(self, seed: V) -> Result<(V::Value, Self::Variant), Self::Error> + where + V: serde::de::DeserializeSeed<'de>, + { + let v = IntoDeserializer::::into_deserializer(self.input.try_get_u32()?); + Ok((seed.deserialize(v)?, self)) + } +} diff --git a/crates/bssh-russh-sftp/src/error.rs b/crates/bssh-russh-sftp/src/error.rs new file mode 100644 index 00000000..2ed8dd3a --- /dev/null +++ b/crates/bssh-russh-sftp/src/error.rs @@ -0,0 +1,64 @@ +use bytes::TryGetError; +use std::{fmt, io}; +use thiserror::Error; + +use crate::client; + +#[derive(Debug, Clone, Error)] +pub enum Error { + #[error("I/O: {0}")] + IO(String), + #[error("Unexpected EOF on stream")] + UnexpectedEof, + #[error("Bad message: {0}")] + BadMessage(String), + #[error("Client error. ({0})")] + Client(String), + #[error("Unexpected behavior: {0}")] + UnexpectedBehavior(String), +} + +impl From for Error { + fn from(error: client::error::Error) -> Self { + Self::Client(error.to_string()) + } +} + +impl From for Error { + fn from(err: io::Error) -> Self { + let kind = err.kind(); + let msg = err.into_inner().map_or("".to_string(), |m| format!("{m}")); + match kind { + io::ErrorKind::UnexpectedEof => Self::UnexpectedEof, + io::ErrorKind::Other if msg == "EOF" => Self::UnexpectedEof, + e => Self::IO(e.to_string()), + } + } +} + +impl From for Error { + fn from(err: TryGetError) -> Self { + Self::BadMessage(format!( + "only {} bytes remaining, but {} requested", + err.available, err.requested + )) + } +} + +impl serde::ser::Error for Error { + fn custom(msg: T) -> Self + where + T: fmt::Display, + { + Self::BadMessage(msg.to_string()) + } +} + +impl serde::de::Error for Error { + fn custom(msg: T) -> Self + where + T: fmt::Display, + { + Self::BadMessage(msg.to_string()) + } +} diff --git a/crates/bssh-russh-sftp/src/extensions.rs b/crates/bssh-russh-sftp/src/extensions.rs new file mode 100644 index 00000000..76db05c9 --- /dev/null +++ b/crates/bssh-russh-sftp/src/extensions.rs @@ -0,0 +1,76 @@ +use crate::{error::Error, ser}; + +pub const LIMITS: &str = "limits@openssh.com"; +pub const HARDLINK: &str = "hardlink@openssh.com"; +pub const FSYNC: &str = "fsync@openssh.com"; +pub const STATVFS: &str = "statvfs@openssh.com"; + +macro_rules! impl_try_into_bytes { + ($struct:ty) => { + impl TryInto> for $struct { + type Error = Error; + + fn try_into(self) -> Result, Self::Error> { + ser::to_bytes(&self).map(|b| b.to_vec()) + } + } + }; +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct LimitsExtension { + pub max_packet_len: u64, + pub max_read_len: u64, + pub max_write_len: u64, + pub max_open_handles: u64, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct HardlinkExtension { + pub oldpath: String, + pub newpath: String, +} + +impl_try_into_bytes!(HardlinkExtension); + +#[derive(Debug, Serialize, Deserialize)] +pub struct FsyncExtension { + pub handle: String, +} + +impl_try_into_bytes!(FsyncExtension); + +#[derive(Debug, Serialize, Deserialize)] +pub struct StatvfsExtension { + pub path: String, +} + +impl_try_into_bytes!(StatvfsExtension); + +#[derive(Debug, Serialize, Deserialize)] +pub struct Statvfs { + /// The file system block size + pub block_size: u64, + /// The fundamental file system block size + pub fragment_size: u64, + /// The number of blocks. + /// + /// Units are in units of `fragment_size` + pub blocks: u64, + /// The number of free blocks in the file system + pub blocks_free: u64, + /// The number of free blocks for unprivileged users + pub blocks_avail: u64, + /// The total number of file inodes + pub inodes: u64, + /// The number of free file inodes + pub inodes_free: u64, + /// The number of free file inodes for unprivileged users + pub inodes_avail: u64, + /// The file system id + pub fs_id: u64, + /// The mount flags + pub flags: u64, + /// The maximum filename length + pub name_max: u64, +} diff --git a/crates/bssh-russh-sftp/src/lib.rs b/crates/bssh-russh-sftp/src/lib.rs new file mode 100644 index 00000000..240bb300 --- /dev/null +++ b/crates/bssh-russh-sftp/src/lib.rs @@ -0,0 +1,38 @@ +// Lints tripped by vendored upstream source that we do not want to diverge from. +#![allow(clippy::io_other_error)] + +//! SFTP subsystem with client and server support for Russh and more! +//! +//! Crate can provide compatibility with anything that can provide the raw data +//! stream in and out of the subsystem channel. +//! +//! The client implementation contains: +//! +//! * Standard communication via [RawSftpSession](crate::client::RawSftpSession) which provides methods +//! for sending and receiving a packet in place. +//! * [High level](crate::client::SftpSession) is similar to [`std::fs`] and has almost all the same +//! implementations. Implements Async I/O for interaction with files. The main idea is to abstract +//! from all the nuances and flaws of the SFTP protocol. This also takes into account the extension +//! provided by the server provided by the server such as `limits@openssh.com` and `fsync@openssh.com`. +//! +//! You can find more examples in the repository. + +#[macro_use] +extern crate log; +#[macro_use] +extern crate bitflags; +#[macro_use] +extern crate serde; + +mod buf; +/// Client side +pub mod client; +pub mod de; +mod error; +pub mod extensions; +/// Protocol implementation +pub mod protocol; +pub mod ser; +/// Server side +pub mod server; +mod utils; diff --git a/crates/bssh-russh-sftp/src/protocol/attrs.rs b/crates/bssh-russh-sftp/src/protocol/attrs.rs new file mode 100644 index 00000000..e0a1c2dd --- /dev/null +++ b/crates/bssh-russh-sftp/src/protocol/attrs.rs @@ -0,0 +1,11 @@ +use super::{impl_packet_for, impl_request_id, FileAttributes, Packet, RequestId}; + +/// Implementation for `SSH_FXP_ATTRS` +#[derive(Debug, Serialize, Deserialize)] +pub struct Attrs { + pub id: u32, + pub attrs: FileAttributes, +} + +impl_request_id!(Attrs); +impl_packet_for!(Attrs); diff --git a/crates/bssh-russh-sftp/src/protocol/close.rs b/crates/bssh-russh-sftp/src/protocol/close.rs new file mode 100644 index 00000000..2e06f359 --- /dev/null +++ b/crates/bssh-russh-sftp/src/protocol/close.rs @@ -0,0 +1,11 @@ +use super::{impl_packet_for, impl_request_id, Packet, RequestId}; + +/// Implementation for `SSH_FXP_CLOSE` +#[derive(Debug, Serialize, Deserialize)] +pub struct Close { + pub id: u32, + pub handle: String, +} + +impl_request_id!(Close); +impl_packet_for!(Close); diff --git a/crates/bssh-russh-sftp/src/protocol/data.rs b/crates/bssh-russh-sftp/src/protocol/data.rs new file mode 100644 index 00000000..e6cbe1c5 --- /dev/null +++ b/crates/bssh-russh-sftp/src/protocol/data.rs @@ -0,0 +1,12 @@ +use super::{impl_packet_for, impl_request_id, Packet, RequestId}; + +/// Implementation for `SSH_FXP_DATA` +#[derive(Debug, Serialize, Deserialize)] +pub struct Data { + pub id: u32, + #[serde(with = "serde_bytes")] + pub data: Vec, +} + +impl_request_id!(Data); +impl_packet_for!(Data); diff --git a/crates/bssh-russh-sftp/src/protocol/extended.rs b/crates/bssh-russh-sftp/src/protocol/extended.rs new file mode 100644 index 00000000..2ba7a9c0 --- /dev/null +++ b/crates/bssh-russh-sftp/src/protocol/extended.rs @@ -0,0 +1,27 @@ +use super::{impl_packet_for, impl_request_id, Packet, RequestId}; +use crate::{de::data_deserialize, ser::data_serialize}; + +/// Implementation for `SSH_FXP_EXTENDED` +#[derive(Debug, Serialize, Deserialize)] +pub struct Extended { + pub id: u32, + pub request: String, + #[serde(serialize_with = "data_serialize")] + #[serde(deserialize_with = "data_deserialize")] + pub data: Vec, +} + +impl_request_id!(Extended); +impl_packet_for!(Extended); + +/// Implementation for `SSH_FXP_EXTENDED_REPLY` +#[derive(Debug, Serialize, Deserialize)] +pub struct ExtendedReply { + pub id: u32, + #[serde(serialize_with = "data_serialize")] + #[serde(deserialize_with = "data_deserialize")] + pub data: Vec, +} + +impl_request_id!(ExtendedReply); +impl_packet_for!(ExtendedReply); diff --git a/crates/bssh-russh-sftp/src/protocol/file.rs b/crates/bssh-russh-sftp/src/protocol/file.rs new file mode 100644 index 00000000..5f2eccd5 --- /dev/null +++ b/crates/bssh-russh-sftp/src/protocol/file.rs @@ -0,0 +1,60 @@ +use chrono::{DateTime, Utc}; +use std::time::{Duration, UNIX_EPOCH}; + +use super::FileAttributes; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct File { + pub filename: String, + pub longname: String, + pub attrs: FileAttributes, +} + +impl File { + /// Omits `longname` and set dummy `attributes`. This is mainly used for [`crate::server::Handler::realpath`] as per the standard + pub fn dummy>(filename: S) -> Self { + Self { + filename: filename.into(), + longname: "".to_string(), + attrs: FileAttributes::default(), + } + } + + /// Implies the use of longname + pub fn new>(filename: S, attrs: FileAttributes) -> Self { + let mut file = Self { + filename: filename.into(), + longname: "".to_string(), + attrs, + }; + file.longname = file.longname(); + file + } + + /// Get formed longname + pub fn longname(&self) -> String { + let directory = if self.attrs.is_dir() { "d" } else { "-" }; + let permissions = self.attrs.permissions().to_string(); + + let size = self.attrs.size.unwrap_or(0); + let mtime = self.attrs.mtime.unwrap_or(0); + + let datetime = DateTime::::from(UNIX_EPOCH + Duration::from_secs(mtime as u64)); + let delayed = datetime.format("%b %d %Y %H:%M"); + + format!( + "{directory}{permissions} 0 {} {} {size} {delayed} {}", + if let Some(user) = &self.attrs.user { + user.to_string() + } else { + self.attrs.uid.unwrap_or(0).to_string() + }, + if let Some(group) = &self.attrs.group { + group.to_string() + } else { + self.attrs.gid.unwrap_or(0).to_string() + }, + self.filename + ) + } +} diff --git a/crates/bssh-russh-sftp/src/protocol/file_attrs.rs b/crates/bssh-russh-sftp/src/protocol/file_attrs.rs new file mode 100644 index 00000000..660ba3a4 --- /dev/null +++ b/crates/bssh-russh-sftp/src/protocol/file_attrs.rs @@ -0,0 +1,456 @@ +use serde::{de::Visitor, ser::SerializeStruct, Deserialize, Deserializer, Serialize}; +#[cfg(unix)] +use std::os::unix::fs::MetadataExt; +use std::{ + fmt, + fs::Metadata, + io::ErrorKind, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; + +use crate::utils; + +/// Attributes flags according to the specification +#[derive(Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub struct FileAttr(u32); + +/// Type according to mode unix +#[derive(Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub struct FileMode(u32); + +/// Type describing permission flags +#[derive(Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub struct FilePermissionFlags(u32); + +bitflags! { + impl FileAttr: u32 { + const SIZE = 0x00000001; + const UIDGID = 0x00000002; + const PERMISSIONS = 0x00000004; + const ACMODTIME = 0x00000008; + const EXTENDED = 0x80000000; + } + + impl FileMode: u32 { + const FIFO = 0x1000; + const CHR = 0x2000; + const DIR = 0x4000; + const NAM = 0x5000; + const BLK = 0x6000; + const REG = 0x8000; + const LNK = 0xA000; + const SOCK = 0xC000; + } + + impl FilePermissionFlags: u32 { + const OTHER_READ = 0o4; + const OTHER_WRITE = 0o2; + const OTHER_EXEC = 0o1; + const GROUP_READ = 0o40; + const GROUP_WRITE = 0o20; + const GROUP_EXEC = 0o10; + const OWNER_READ = 0o400; + const OWNER_WRITE = 0o200; + const OWNER_EXEC = 0o100; + } +} + +/// Represents a simplified version of the [`FileMode`] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum FileType { + Dir, + File, + Symlink, + Other, +} + +impl FileType { + /// Returns `true` if the file is a directory + pub fn is_dir(&self) -> bool { + matches!(self, Self::Dir) + } + + /// Returns `true` if the file is a file + pub fn is_file(&self) -> bool { + matches!(self, Self::File) + } + + /// Returns `true` if the file is a symlink + pub fn is_symlink(&self) -> bool { + matches!(self, Self::Symlink) + } + + /// Returns `true` if the file has a distinctive type + pub fn is_other(&self) -> bool { + matches!(self, Self::Other) + } +} + +impl From for FileType { + fn from(mode: FileMode) -> Self { + if mode == FileMode::DIR { + FileType::Dir + } else if mode == FileMode::LNK { + FileType::Symlink + } else if mode == FileMode::REG { + FileType::File + } else { + FileType::Other + } + } +} + +impl From for FileType { + fn from(mode: u32) -> Self { + FileMode::from_bits_truncate(mode).into() + } +} + +#[derive(Default, Clone, Copy, PartialEq, Eq)] +pub struct FilePermissions { + pub other_exec: bool, + pub other_read: bool, + pub other_write: bool, + pub group_exec: bool, + pub group_read: bool, + pub group_write: bool, + pub owner_exec: bool, + pub owner_read: bool, + pub owner_write: bool, +} + +impl FilePermissions { + /// Returns `true` if the file is read-only. + pub fn is_readonly(&self) -> bool { + !self.other_write && !self.group_write && !self.owner_write + } + + /// Clears all flags or sets them to a positive value. Works for unix. + pub fn set_readonly(&mut self, readonly: bool) { + self.other_exec = !readonly; + self.other_write = !readonly; + self.group_exec = !readonly; + self.group_write = !readonly; + self.owner_exec = !readonly; + self.owner_write = !readonly; + + if readonly { + self.other_read = true; + self.group_read = true; + self.owner_read = true; + } + } +} + +impl fmt::Display for FilePermissions { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "{}{}{}{}{}{}{}{}{}", + if self.owner_read { "r" } else { "-" }, + if self.owner_write { "w" } else { "-" }, + if self.owner_exec { "x" } else { "-" }, + if self.group_read { "r" } else { "-" }, + if self.group_write { "w" } else { "-" }, + if self.group_exec { "x" } else { "-" }, + if self.other_read { "r" } else { "-" }, + if self.other_write { "w" } else { "-" }, + if self.other_exec { "x" } else { "-" }, + ) + } +} + +impl From for FilePermissions { + fn from(flags: FilePermissionFlags) -> Self { + Self { + other_read: flags.contains(FilePermissionFlags::OTHER_READ), + other_write: flags.contains(FilePermissionFlags::OTHER_WRITE), + other_exec: flags.contains(FilePermissionFlags::OTHER_EXEC), + group_read: flags.contains(FilePermissionFlags::GROUP_READ), + group_write: flags.contains(FilePermissionFlags::GROUP_WRITE), + group_exec: flags.contains(FilePermissionFlags::GROUP_EXEC), + owner_read: flags.contains(FilePermissionFlags::OWNER_READ), + owner_write: flags.contains(FilePermissionFlags::OWNER_WRITE), + owner_exec: flags.contains(FilePermissionFlags::OWNER_EXEC), + } + } +} + +impl From for FilePermissions { + fn from(mode: u32) -> Self { + FilePermissionFlags::from_bits_truncate(mode).into() + } +} + +/// Used in the implementation of other packets. +/// Implements most [`Metadata`] methods +/// +/// The fields `user` and `group` are string names of users and groups for +/// clients that can be displayed in longname. Can be omitted. +/// +/// The `flags` field is omitted because it is set by itself depending on the fields +#[derive(Debug, Clone)] +pub struct FileAttributes { + pub size: Option, + pub uid: Option, + pub user: Option, + pub gid: Option, + pub group: Option, + pub permissions: Option, + pub atime: Option, + pub mtime: Option, +} + +macro_rules! impl_fn_type { + ($get_name:ident, $set_name:ident, $doc_name:expr, $flag:ident) => { + #[doc = "Returns `true` if is a "] + #[doc = $doc_name] + pub fn $get_name(&self) -> bool { + self.permissions.map_or(false, |b| { + FileMode::from_bits_truncate(b).contains(FileMode::$flag) + }) + } + + #[doc = "Set flag if is a "] + #[doc = $doc_name] + #[doc = " or not"] + pub fn $set_name(&mut self, $get_name: bool) { + match $get_name { + true => self.set_type(FileMode::$flag), + false => self.remove_type(FileMode::$flag), + } + } + }; +} + +impl FileAttributes { + impl_fn_type!(is_dir, set_dir, "dir", DIR); + impl_fn_type!(is_regular, set_regular, "regular", REG); + impl_fn_type!(is_symlink, set_symlink, "symlink", LNK); + impl_fn_type!(is_character, set_character, "character", CHR); + impl_fn_type!(is_block, set_block, "block", BLK); + impl_fn_type!(is_fifo, set_fifo, "fifo", FIFO); + + /// Set mode flag + pub fn set_type(&mut self, mode: FileMode) { + let perms = self.permissions.unwrap_or(0); + self.permissions = Some(perms | mode.bits()); + } + + /// Remove mode flag + pub fn remove_type(&mut self, mode: FileMode) { + let perms = self.permissions.unwrap_or(0); + self.permissions = Some(perms & !mode.bits()); + } + + /// Returns the file type + pub fn file_type(&self) -> FileType { + FileMode::from_bits_truncate(self.permissions.unwrap_or_default()).into() + } + + /// Returns `true` if is empty + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Returns the size of the file + pub fn len(&self) -> u64 { + self.size.unwrap_or(0) + } + + /// Returns the permissions of the file this metadata is for. + pub fn permissions(&self) -> FilePermissions { + FilePermissionFlags::from_bits_truncate(self.permissions.unwrap_or_default()).into() + } + + /// Returns the last access time + pub fn accessed(&self) -> std::io::Result { + match self.atime { + Some(time) => Ok(UNIX_EPOCH + Duration::from_secs(time as u64)), + None => Err(ErrorKind::InvalidData.into()), + } + } + + /// Returns the last modification time + pub fn modified(&self) -> std::io::Result { + match self.mtime { + Some(time) => Ok(UNIX_EPOCH + Duration::from_secs(time as u64)), + None => Err(ErrorKind::InvalidData.into()), + } + } + + /// Creates a structure with omitted attributes + pub fn empty() -> Self { + Self { + size: None, + uid: None, + user: None, + gid: None, + group: None, + permissions: None, + atime: None, + mtime: None, + } + } +} + +/// For packets which require dummy attributes +impl Default for FileAttributes { + fn default() -> Self { + Self { + size: Some(0), + uid: Some(0), + user: None, + gid: Some(0), + group: None, + permissions: Some(0o777 | FileMode::DIR.bits()), + atime: Some(0), + mtime: Some(0), + } + } +} + +/// For simple conversion of [`Metadata`] into [`FileAttributes`] +impl From<&Metadata> for FileAttributes { + fn from(metadata: &Metadata) -> Self { + let mut attrs = Self { + size: Some(metadata.len()), + #[cfg(unix)] + uid: Some(metadata.uid()), + #[cfg(unix)] + gid: Some(metadata.gid()), + #[cfg(windows)] + permissions: Some(if metadata.permissions().readonly() { + 0o555 + } else { + 0o777 + }), + #[cfg(unix)] + permissions: Some(metadata.mode()), + atime: Some(utils::unix(metadata.accessed().unwrap_or(UNIX_EPOCH))), + mtime: Some(utils::unix(metadata.modified().unwrap_or(UNIX_EPOCH))), + ..Default::default() + }; + + attrs.set_dir(metadata.is_dir()); + attrs.set_regular(!metadata.is_dir()); + + attrs + } +} + +impl Serialize for FileAttributes { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut attrs = FileAttr::default(); + let mut field_count = 1; + + if self.size.is_some() { + attrs |= FileAttr::SIZE; + field_count += 1; + } + + if self.uid.is_some() || self.gid.is_some() { + attrs |= FileAttr::UIDGID; + field_count += 2; + } + + if self.permissions.is_some() { + attrs |= FileAttr::PERMISSIONS; + field_count += 1; + } + + if self.atime.is_some() || self.mtime.is_some() { + attrs |= FileAttr::ACMODTIME; + field_count += 2; + } + + let mut s = serializer.serialize_struct("FileAttributes", field_count)?; + s.serialize_field("attrs", &attrs)?; + + if let Some(size) = self.size { + s.serialize_field("size", &size)?; + } + + if self.uid.is_some() || self.gid.is_some() { + s.serialize_field("uid", &self.uid.unwrap_or(0))?; + s.serialize_field("gid", &self.gid.unwrap_or(0))?; + } + + if let Some(permissions) = self.permissions { + s.serialize_field("permissions", &permissions)?; + } + + if self.atime.is_some() || self.mtime.is_some() { + s.serialize_field("atime", &self.atime.unwrap_or(0))?; + s.serialize_field("mtime", &self.mtime.unwrap_or(0))?; + } + + // todo: extended implementation + + s.end() + } +} + +impl<'de> Deserialize<'de> for FileAttributes { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + struct FileAttributesVisitor; + + impl<'de> Visitor<'de> for FileAttributesVisitor { + type Value = FileAttributes; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("file attributes") + } + + fn visit_seq(self, mut seq: A) -> Result + where + A: serde::de::SeqAccess<'de>, + { + let attrs = FileAttr::from_bits_truncate(seq.next_element::()?.unwrap_or(0)); + + Ok(FileAttributes { + size: if attrs.contains(FileAttr::SIZE) { + seq.next_element::()? + } else { + None + }, + uid: if attrs.contains(FileAttr::UIDGID) { + seq.next_element::()? + } else { + None + }, + user: None, + gid: if attrs.contains(FileAttr::UIDGID) { + seq.next_element::()? + } else { + None + }, + group: None, + permissions: if attrs.contains(FileAttr::PERMISSIONS) { + seq.next_element::()? + } else { + None + }, + atime: if attrs.contains(FileAttr::ACMODTIME) { + seq.next_element::()? + } else { + None + }, + mtime: if attrs.contains(FileAttr::ACMODTIME) { + seq.next_element::()? + } else { + None + }, + }) + } + } + + deserializer.deserialize_any(FileAttributesVisitor) + } +} diff --git a/crates/bssh-russh-sftp/src/protocol/fsetstat.rs b/crates/bssh-russh-sftp/src/protocol/fsetstat.rs new file mode 100644 index 00000000..28439835 --- /dev/null +++ b/crates/bssh-russh-sftp/src/protocol/fsetstat.rs @@ -0,0 +1,12 @@ +use super::{impl_packet_for, impl_request_id, FileAttributes, Packet, RequestId}; + +/// Implementation for `SSH_FXP_FSETSTAT` +#[derive(Debug, Serialize, Deserialize)] +pub struct FSetStat { + pub id: u32, + pub handle: String, + pub attrs: FileAttributes, +} + +impl_request_id!(FSetStat); +impl_packet_for!(FSetStat); diff --git a/crates/bssh-russh-sftp/src/protocol/fstat.rs b/crates/bssh-russh-sftp/src/protocol/fstat.rs new file mode 100644 index 00000000..cff6a0d6 --- /dev/null +++ b/crates/bssh-russh-sftp/src/protocol/fstat.rs @@ -0,0 +1,11 @@ +use super::{impl_packet_for, impl_request_id, Packet, RequestId}; + +/// Implementation for `SSH_FXP_FSTAT` +#[derive(Debug, Serialize, Deserialize)] +pub struct Fstat { + pub id: u32, + pub handle: String, +} + +impl_request_id!(Fstat); +impl_packet_for!(Fstat); diff --git a/crates/bssh-russh-sftp/src/protocol/handle.rs b/crates/bssh-russh-sftp/src/protocol/handle.rs new file mode 100644 index 00000000..8b60d653 --- /dev/null +++ b/crates/bssh-russh-sftp/src/protocol/handle.rs @@ -0,0 +1,11 @@ +use super::{impl_packet_for, impl_request_id, Packet, RequestId}; + +/// Implementation for `SSH_FXP_HANDLE` +#[derive(Debug, Serialize, Deserialize)] +pub struct Handle { + pub id: u32, + pub handle: String, +} + +impl_request_id!(Handle); +impl_packet_for!(Handle); diff --git a/crates/bssh-russh-sftp/src/protocol/init.rs b/crates/bssh-russh-sftp/src/protocol/init.rs new file mode 100644 index 00000000..0bb0f7b4 --- /dev/null +++ b/crates/bssh-russh-sftp/src/protocol/init.rs @@ -0,0 +1,27 @@ +use std::collections::HashMap; + +use super::{impl_packet_for, Packet, VERSION}; + +/// Implementation for `SSH_FXP_INIT` +#[derive(Debug, Serialize, Deserialize)] +pub struct Init { + pub version: u32, + pub extensions: HashMap, +} + +impl_packet_for!(Init); + +impl Init { + pub fn new() -> Self { + Self { + version: VERSION, + extensions: HashMap::new(), + } + } +} + +impl Default for Init { + fn default() -> Self { + Self::new() + } +} diff --git a/crates/bssh-russh-sftp/src/protocol/lstat.rs b/crates/bssh-russh-sftp/src/protocol/lstat.rs new file mode 100644 index 00000000..0cf27999 --- /dev/null +++ b/crates/bssh-russh-sftp/src/protocol/lstat.rs @@ -0,0 +1,11 @@ +use super::{impl_packet_for, impl_request_id, Packet, RequestId}; + +/// Implementation for `SSH_FXP_LSTAT` +#[derive(Debug, Serialize, Deserialize)] +pub struct Lstat { + pub id: u32, + pub path: String, +} + +impl_request_id!(Lstat); +impl_packet_for!(Lstat); diff --git a/crates/bssh-russh-sftp/src/protocol/mkdir.rs b/crates/bssh-russh-sftp/src/protocol/mkdir.rs new file mode 100644 index 00000000..75c1fbfe --- /dev/null +++ b/crates/bssh-russh-sftp/src/protocol/mkdir.rs @@ -0,0 +1,12 @@ +use super::{impl_packet_for, impl_request_id, FileAttributes, Packet, RequestId}; + +/// Implementation for `SSH_FXP_MKDIR` +#[derive(Debug, Serialize, Deserialize)] +pub struct MkDir { + pub id: u32, + pub path: String, + pub attrs: FileAttributes, +} + +impl_request_id!(MkDir); +impl_packet_for!(MkDir); diff --git a/crates/bssh-russh-sftp/src/protocol/mod.rs b/crates/bssh-russh-sftp/src/protocol/mod.rs new file mode 100644 index 00000000..50d58ab5 --- /dev/null +++ b/crates/bssh-russh-sftp/src/protocol/mod.rs @@ -0,0 +1,280 @@ +mod attrs; +mod close; +mod data; +mod extended; +mod file; +mod file_attrs; +mod fsetstat; +mod fstat; +mod handle; +mod init; +mod lstat; +mod mkdir; +mod name; +mod open; +mod opendir; +mod read; +mod readdir; +mod readlink; +mod realpath; +mod remove; +mod rename; +mod rmdir; +mod setstat; +mod stat; +mod status; +mod symlink; +mod version; +mod write; + +use bytes::{Buf, BufMut, Bytes, BytesMut}; + +use crate::{de, error::Error, ser}; + +pub use self::{ + attrs::Attrs, + close::Close, + data::Data, + extended::{Extended, ExtendedReply}, + file::File, + file_attrs::{ + FileAttr, FileAttributes, FileMode, FilePermissionFlags, FilePermissions, FileType, + }, + fsetstat::FSetStat, + fstat::Fstat, + handle::Handle, + init::Init, + lstat::Lstat, + mkdir::MkDir, + name::Name, + open::{Open, OpenFlags}, + opendir::OpenDir, + read::Read, + readdir::ReadDir, + readlink::ReadLink, + realpath::RealPath, + remove::Remove, + rename::Rename, + rmdir::RmDir, + setstat::SetStat, + stat::Stat, + status::{Status, StatusCode}, + symlink::Symlink, + version::Version, + write::Write, +}; + +pub const VERSION: u32 = 3; + +const SSH_FXP_INIT: u8 = 1; +const SSH_FXP_VERSION: u8 = 2; +const SSH_FXP_OPEN: u8 = 3; +const SSH_FXP_CLOSE: u8 = 4; +const SSH_FXP_READ: u8 = 5; +const SSH_FXP_WRITE: u8 = 6; +const SSH_FXP_LSTAT: u8 = 7; +const SSH_FXP_FSTAT: u8 = 8; +const SSH_FXP_SETSTAT: u8 = 9; +const SSH_FXP_FSETSTAT: u8 = 10; +const SSH_FXP_OPENDIR: u8 = 11; +const SSH_FXP_READDIR: u8 = 12; +const SSH_FXP_REMOVE: u8 = 13; +const SSH_FXP_MKDIR: u8 = 14; +const SSH_FXP_RMDIR: u8 = 15; +const SSH_FXP_REALPATH: u8 = 16; +const SSH_FXP_STAT: u8 = 17; +const SSH_FXP_RENAME: u8 = 18; +const SSH_FXP_READLINK: u8 = 19; +const SSH_FXP_SYMLINK: u8 = 20; + +const SSH_FXP_STATUS: u8 = 101; +const SSH_FXP_HANDLE: u8 = 102; +const SSH_FXP_DATA: u8 = 103; +const SSH_FXP_NAME: u8 = 104; +const SSH_FXP_ATTRS: u8 = 105; + +const SSH_FXP_EXTENDED: u8 = 200; +const SSH_FXP_EXTENDED_REPLY: u8 = 201; + +pub(crate) trait RequestId: Sized { + fn get_request_id(&self) -> u32; +} + +macro_rules! impl_request_id { + ($packet:ty) => { + impl RequestId for $packet { + fn get_request_id(&self) -> u32 { + self.id + } + } + }; +} + +macro_rules! impl_packet_for { + ($name:ident) => { + impl From<$name> for Packet { + fn from(input: $name) -> Self { + Self::$name(input) + } + } + }; +} + +pub(crate) use impl_packet_for; +pub(crate) use impl_request_id; + +#[derive(Debug)] +pub enum Packet { + Init(Init), + Version(Version), + Open(Open), + Close(Close), + Read(Read), + Write(Write), + Lstat(Lstat), + Fstat(Fstat), + SetStat(SetStat), + FSetStat(FSetStat), + OpenDir(OpenDir), + ReadDir(ReadDir), + Remove(Remove), + MkDir(MkDir), + RmDir(RmDir), + RealPath(RealPath), + Stat(Stat), + Rename(Rename), + ReadLink(ReadLink), + Symlink(Symlink), + Status(Status), + Handle(Handle), + Data(Data), + Name(Name), + Attrs(Attrs), + Extended(Extended), + ExtendedReply(ExtendedReply), +} + +impl Packet { + pub fn get_request_id(&self) -> u32 { + match self { + Self::Open(open) => open.get_request_id(), + Self::Close(close) => close.get_request_id(), + Self::Read(read) => read.get_request_id(), + Self::Write(write) => write.get_request_id(), + Self::Lstat(lstat) => lstat.get_request_id(), + Self::Fstat(fstat) => fstat.get_request_id(), + Self::SetStat(setstat) => setstat.get_request_id(), + Self::FSetStat(fsetstat) => fsetstat.get_request_id(), + Self::OpenDir(opendir) => opendir.get_request_id(), + Self::ReadDir(readdir) => readdir.get_request_id(), + Self::Remove(remove) => remove.get_request_id(), + Self::MkDir(mkdir) => mkdir.get_request_id(), + Self::RmDir(rmdir) => rmdir.get_request_id(), + Self::RealPath(realpath) => realpath.get_request_id(), + Self::Stat(stat) => stat.get_request_id(), + Self::Rename(rename) => rename.get_request_id(), + Self::ReadLink(readlink) => readlink.get_request_id(), + Self::Symlink(symlink) => symlink.get_request_id(), + Self::Extended(extended) => extended.get_request_id(), + _ => 0, + } + } + + pub fn status(id: u32, status_code: StatusCode, msg: &str, tag: &str) -> Self { + Packet::Status(Status { + id, + status_code, + error_message: msg.to_string(), + language_tag: tag.to_string(), + }) + } + + pub fn error(id: u32, status_code: StatusCode) -> Self { + Self::status(id, status_code, &status_code.to_string(), "en-US") + } +} + +impl TryFrom<&mut Bytes> for Packet { + type Error = Error; + + fn try_from(bytes: &mut Bytes) -> Result { + let r#type = bytes.try_get_u8()?; + debug!("packet type {}", r#type); + + let request = match r#type { + SSH_FXP_INIT => Self::Init(de::from_bytes(bytes)?), + SSH_FXP_VERSION => Self::Version(de::from_bytes(bytes)?), + SSH_FXP_OPEN => Self::Open(de::from_bytes(bytes)?), + SSH_FXP_CLOSE => Self::Close(de::from_bytes(bytes)?), + SSH_FXP_READ => Self::Read(de::from_bytes(bytes)?), + SSH_FXP_WRITE => Self::Write(de::from_bytes(bytes)?), + SSH_FXP_LSTAT => Self::Lstat(de::from_bytes(bytes)?), + SSH_FXP_FSTAT => Self::Fstat(de::from_bytes(bytes)?), + SSH_FXP_SETSTAT => Self::SetStat(de::from_bytes(bytes)?), + SSH_FXP_FSETSTAT => Self::FSetStat(de::from_bytes(bytes)?), + SSH_FXP_OPENDIR => Self::OpenDir(de::from_bytes(bytes)?), + SSH_FXP_READDIR => Self::ReadDir(de::from_bytes(bytes)?), + SSH_FXP_REMOVE => Self::Remove(de::from_bytes(bytes)?), + SSH_FXP_MKDIR => Self::MkDir(de::from_bytes(bytes)?), + SSH_FXP_RMDIR => Self::RmDir(de::from_bytes(bytes)?), + SSH_FXP_REALPATH => Self::RealPath(de::from_bytes(bytes)?), + SSH_FXP_STAT => Self::Stat(de::from_bytes(bytes)?), + SSH_FXP_RENAME => Self::Rename(de::from_bytes(bytes)?), + SSH_FXP_READLINK => Self::ReadLink(de::from_bytes(bytes)?), + SSH_FXP_SYMLINK => Self::Symlink(de::from_bytes(bytes)?), + SSH_FXP_STATUS => Self::Status(de::from_bytes(bytes)?), + SSH_FXP_HANDLE => Self::Handle(de::from_bytes(bytes)?), + SSH_FXP_DATA => Self::Data(de::from_bytes(bytes)?), + SSH_FXP_NAME => Self::Name(de::from_bytes(bytes)?), + SSH_FXP_ATTRS => Self::Attrs(de::from_bytes(bytes)?), + SSH_FXP_EXTENDED => Self::Extended(de::from_bytes(bytes)?), + SSH_FXP_EXTENDED_REPLY => Self::ExtendedReply(de::from_bytes(bytes)?), + _ => return Err(Error::BadMessage("unknown type".to_owned())), + }; + + Ok(request) + } +} + +impl TryFrom for Bytes { + type Error = Error; + + fn try_from(packet: Packet) -> Result { + let (r#type, payload): (u8, Bytes) = match packet { + Packet::Init(init) => (SSH_FXP_INIT, ser::to_bytes(&init)?), + Packet::Version(version) => (SSH_FXP_VERSION, ser::to_bytes(&version)?), + Packet::Open(open) => (SSH_FXP_OPEN, ser::to_bytes(&open)?), + Packet::Close(close) => (SSH_FXP_CLOSE, ser::to_bytes(&close)?), + Packet::Read(read) => (SSH_FXP_READ, ser::to_bytes(&read)?), + Packet::Write(write) => (SSH_FXP_WRITE, ser::to_bytes(&write)?), + Packet::Lstat(stat) => (SSH_FXP_LSTAT, ser::to_bytes(&stat)?), + Packet::Fstat(stat) => (SSH_FXP_FSTAT, ser::to_bytes(&stat)?), + Packet::SetStat(setstat) => (SSH_FXP_SETSTAT, ser::to_bytes(&setstat)?), + Packet::FSetStat(setstat) => (SSH_FXP_FSETSTAT, ser::to_bytes(&setstat)?), + Packet::OpenDir(opendir) => (SSH_FXP_OPENDIR, ser::to_bytes(&opendir)?), + Packet::ReadDir(readdir) => (SSH_FXP_READDIR, ser::to_bytes(&readdir)?), + Packet::Remove(remove) => (SSH_FXP_REMOVE, ser::to_bytes(&remove)?), + Packet::MkDir(mkdir) => (SSH_FXP_MKDIR, ser::to_bytes(&mkdir)?), + Packet::RmDir(rmdir) => (SSH_FXP_RMDIR, ser::to_bytes(&rmdir)?), + Packet::RealPath(realpath) => (SSH_FXP_REALPATH, ser::to_bytes(&realpath)?), + Packet::Stat(stat) => (SSH_FXP_STAT, ser::to_bytes(&stat)?), + Packet::Rename(rename) => (SSH_FXP_RENAME, ser::to_bytes(&rename)?), + Packet::ReadLink(readlink) => (SSH_FXP_READLINK, ser::to_bytes(&readlink)?), + Packet::Symlink(symlink) => (SSH_FXP_SYMLINK, ser::to_bytes(&symlink)?), + Packet::Status(status) => (SSH_FXP_STATUS, ser::to_bytes(&status)?), + Packet::Handle(handle) => (SSH_FXP_HANDLE, ser::to_bytes(&handle)?), + Packet::Data(data) => (SSH_FXP_DATA, ser::to_bytes(&data)?), + Packet::Name(name) => (SSH_FXP_NAME, ser::to_bytes(&name)?), + Packet::Attrs(attrs) => (SSH_FXP_ATTRS, ser::to_bytes(&attrs)?), + Packet::Extended(extended) => (SSH_FXP_EXTENDED, ser::to_bytes(&extended)?), + Packet::ExtendedReply(reply) => (SSH_FXP_EXTENDED_REPLY, ser::to_bytes(&reply)?), + }; + + let length = payload.len() as u32 + 1; + let mut bytes = BytesMut::new(); + bytes.put_u32(length); + bytes.put_u8(r#type); + bytes.put_slice(&payload); + Ok(bytes.freeze()) + } +} diff --git a/crates/bssh-russh-sftp/src/protocol/name.rs b/crates/bssh-russh-sftp/src/protocol/name.rs new file mode 100644 index 00000000..2fc90040 --- /dev/null +++ b/crates/bssh-russh-sftp/src/protocol/name.rs @@ -0,0 +1,13 @@ +use serde::{Deserialize, Serialize}; + +use super::{impl_packet_for, impl_request_id, File, Packet, RequestId}; + +/// Implementation for `SSH_FXP_NAME` +#[derive(Debug, Serialize, Deserialize)] +pub struct Name { + pub id: u32, + pub files: Vec, +} + +impl_request_id!(Name); +impl_packet_for!(Name); diff --git a/crates/bssh-russh-sftp/src/protocol/open.rs b/crates/bssh-russh-sftp/src/protocol/open.rs new file mode 100644 index 00000000..28a198f8 --- /dev/null +++ b/crates/bssh-russh-sftp/src/protocol/open.rs @@ -0,0 +1,63 @@ +use std::fs; + +use super::{impl_packet_for, impl_request_id, FileAttributes, Packet, RequestId}; + +/// Opening flags according to the specification +#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)] +pub struct OpenFlags(u32); + +bitflags! { + impl OpenFlags: u32 { + const READ = 0x00000001; + const WRITE = 0x00000002; + const APPEND = 0x00000004; + const CREATE = 0x00000008; + const TRUNCATE = 0x00000010; + const EXCLUDE = 0x00000020; + } +} + +impl From for fs::OpenOptions { + fn from(value: OpenFlags) -> Self { + let mut open_options = fs::OpenOptions::new(); + if value.contains(OpenFlags::READ) { + open_options.read(true); + } + if value.contains(OpenFlags::WRITE) { + open_options.write(true); + } + if value.contains(OpenFlags::APPEND) { + open_options.append(true); + } + if value.contains(OpenFlags::CREATE) { + // SFTPv3 spec requires the `CREATE` flag to be set if the `EXCLUDE` flag + // is set. Rusts `OpenOptions` has different semantics: it ignores + // whether `create` or `truncate` was set. + // SFTPv3 spec does not say anything about read/write flags, but + // they will be required to do anything else with the file. + // https://datatracker.ietf.org/doc/html/draft-ietf-secsh-filexfer-02#section-6.3 + if value.contains(OpenFlags::EXCLUDE) { + open_options.create_new(true); + } else { + open_options.create(true); + } + } + if value.contains(OpenFlags::TRUNCATE) { + open_options.truncate(true); + } + + open_options + } +} + +/// Implementation for `SSH_FXP_OPEN` +#[derive(Debug, Serialize, Deserialize)] +pub struct Open { + pub id: u32, + pub filename: String, + pub pflags: OpenFlags, + pub attrs: FileAttributes, +} + +impl_request_id!(Open); +impl_packet_for!(Open); diff --git a/crates/bssh-russh-sftp/src/protocol/opendir.rs b/crates/bssh-russh-sftp/src/protocol/opendir.rs new file mode 100644 index 00000000..174e9045 --- /dev/null +++ b/crates/bssh-russh-sftp/src/protocol/opendir.rs @@ -0,0 +1,11 @@ +use super::{impl_packet_for, impl_request_id, Packet, RequestId}; + +/// Implementation for `SSH_FXP_OPENDIR` +#[derive(Debug, Serialize, Deserialize)] +pub struct OpenDir { + pub id: u32, + pub path: String, +} + +impl_request_id!(OpenDir); +impl_packet_for!(OpenDir); diff --git a/crates/bssh-russh-sftp/src/protocol/read.rs b/crates/bssh-russh-sftp/src/protocol/read.rs new file mode 100644 index 00000000..927deb84 --- /dev/null +++ b/crates/bssh-russh-sftp/src/protocol/read.rs @@ -0,0 +1,13 @@ +use super::{impl_packet_for, impl_request_id, Packet, RequestId}; + +/// Implementation for `SSH_FXP_READ` +#[derive(Debug, Serialize, Deserialize)] +pub struct Read { + pub id: u32, + pub handle: String, + pub offset: u64, + pub len: u32, +} + +impl_request_id!(Read); +impl_packet_for!(Read); diff --git a/crates/bssh-russh-sftp/src/protocol/readdir.rs b/crates/bssh-russh-sftp/src/protocol/readdir.rs new file mode 100644 index 00000000..4955fc1b --- /dev/null +++ b/crates/bssh-russh-sftp/src/protocol/readdir.rs @@ -0,0 +1,11 @@ +use super::{impl_packet_for, impl_request_id, Packet, RequestId}; + +/// Implementation for `SSH_FXP_READDIR` +#[derive(Debug, Serialize, Deserialize)] +pub struct ReadDir { + pub id: u32, + pub handle: String, +} + +impl_request_id!(ReadDir); +impl_packet_for!(ReadDir); diff --git a/crates/bssh-russh-sftp/src/protocol/readlink.rs b/crates/bssh-russh-sftp/src/protocol/readlink.rs new file mode 100644 index 00000000..4dfdda68 --- /dev/null +++ b/crates/bssh-russh-sftp/src/protocol/readlink.rs @@ -0,0 +1,11 @@ +use super::{impl_packet_for, impl_request_id, Packet, RequestId}; + +/// Implementation for `SSH_FXP_READLINK` +#[derive(Debug, Serialize, Deserialize)] +pub struct ReadLink { + pub id: u32, + pub path: String, +} + +impl_request_id!(ReadLink); +impl_packet_for!(ReadLink); diff --git a/crates/bssh-russh-sftp/src/protocol/realpath.rs b/crates/bssh-russh-sftp/src/protocol/realpath.rs new file mode 100644 index 00000000..45782c6b --- /dev/null +++ b/crates/bssh-russh-sftp/src/protocol/realpath.rs @@ -0,0 +1,11 @@ +use super::{impl_packet_for, impl_request_id, Packet, RequestId}; + +/// Implementation for `SSH_FXP_REALPATH` +#[derive(Debug, Serialize, Deserialize)] +pub struct RealPath { + pub id: u32, + pub path: String, +} + +impl_request_id!(RealPath); +impl_packet_for!(RealPath); diff --git a/crates/bssh-russh-sftp/src/protocol/remove.rs b/crates/bssh-russh-sftp/src/protocol/remove.rs new file mode 100644 index 00000000..ca9a0879 --- /dev/null +++ b/crates/bssh-russh-sftp/src/protocol/remove.rs @@ -0,0 +1,11 @@ +use super::{impl_packet_for, impl_request_id, Packet, RequestId}; + +/// Implementation for `SSH_FXP_REMOVE` +#[derive(Debug, Serialize, Deserialize)] +pub struct Remove { + pub id: u32, + pub filename: String, +} + +impl_request_id!(Remove); +impl_packet_for!(Remove); diff --git a/crates/bssh-russh-sftp/src/protocol/rename.rs b/crates/bssh-russh-sftp/src/protocol/rename.rs new file mode 100644 index 00000000..95d6798d --- /dev/null +++ b/crates/bssh-russh-sftp/src/protocol/rename.rs @@ -0,0 +1,12 @@ +use super::{impl_packet_for, impl_request_id, Packet, RequestId}; + +/// Implementation for `SSH_FXP_RENAME` +#[derive(Debug, Serialize, Deserialize)] +pub struct Rename { + pub id: u32, + pub oldpath: String, + pub newpath: String, +} + +impl_request_id!(Rename); +impl_packet_for!(Rename); diff --git a/crates/bssh-russh-sftp/src/protocol/rmdir.rs b/crates/bssh-russh-sftp/src/protocol/rmdir.rs new file mode 100644 index 00000000..3239d0d8 --- /dev/null +++ b/crates/bssh-russh-sftp/src/protocol/rmdir.rs @@ -0,0 +1,11 @@ +use super::{impl_packet_for, impl_request_id, Packet, RequestId}; + +/// Implementation for `SSH_FXP_RMDIR` +#[derive(Debug, Serialize, Deserialize)] +pub struct RmDir { + pub id: u32, + pub path: String, +} + +impl_request_id!(RmDir); +impl_packet_for!(RmDir); diff --git a/crates/bssh-russh-sftp/src/protocol/setstat.rs b/crates/bssh-russh-sftp/src/protocol/setstat.rs new file mode 100644 index 00000000..a0314a94 --- /dev/null +++ b/crates/bssh-russh-sftp/src/protocol/setstat.rs @@ -0,0 +1,12 @@ +use super::{impl_packet_for, impl_request_id, FileAttributes, Packet, RequestId}; + +/// Implementation for `SSH_FXP_SETSTAT` and `MKDIR` +#[derive(Debug, Serialize, Deserialize)] +pub struct SetStat { + pub id: u32, + pub path: String, + pub attrs: FileAttributes, +} + +impl_request_id!(SetStat); +impl_packet_for!(SetStat); diff --git a/crates/bssh-russh-sftp/src/protocol/stat.rs b/crates/bssh-russh-sftp/src/protocol/stat.rs new file mode 100644 index 00000000..ccc22d9d --- /dev/null +++ b/crates/bssh-russh-sftp/src/protocol/stat.rs @@ -0,0 +1,11 @@ +use super::{impl_packet_for, impl_request_id, Packet, RequestId}; + +/// Implementation for `SSH_FXP_STAT` +#[derive(Debug, Serialize, Deserialize)] +pub struct Stat { + pub id: u32, + pub path: String, +} + +impl_request_id!(Stat); +impl_packet_for!(Stat); diff --git a/crates/bssh-russh-sftp/src/protocol/status.rs b/crates/bssh-russh-sftp/src/protocol/status.rs new file mode 100644 index 00000000..8b5b2d9c --- /dev/null +++ b/crates/bssh-russh-sftp/src/protocol/status.rs @@ -0,0 +1,54 @@ +use thiserror::Error; + +use super::{impl_packet_for, impl_request_id, Packet, RequestId}; + +/// Error Codes for SSH_FXP_STATUS +#[derive(Debug, Error, Clone, Copy, PartialEq, Serialize, Deserialize)] +pub enum StatusCode { + /// Indicates successful completion of the operation. + #[error("Ok")] + Ok = 0, + /// Indicates end-of-file condition; for SSH_FX_READ it means that no more data is available in the file, + /// and for SSH_FX_READDIR it indicates that no more files are contained in the directory. + #[error("Eof")] + Eof = 1, + /// A reference is made to a file which should exist but doesn't. + #[error("No such file")] + NoSuchFile = 2, + /// Authenticated user does not have sufficient permissions to perform the operation. + #[error("Permission denied")] + PermissionDenied = 3, + /// A generic catch-all error message; + /// it should be returned if an error occurs for which there is no more specific error code defined. + #[error("Failure")] + Failure = 4, + /// May be returned if a badly formatted packet or protocol incompatibility is detected. + #[error("Bad message")] + BadMessage = 5, + /// A pseudo-error which indicates that the client has no connection to the server + /// (it can only be generated locally by the client, and MUST NOT be returned by servers). + #[error("No connection")] + NoConnection = 6, + /// A pseudo-error which indicates that the connection to the server has been lost + /// (it can only be generated locally by the client, and MUST NOT be returned by servers). + #[error("Connection lost")] + ConnectionLost = 7, + /// Indicates that an attempt was made to perform an operation which is not supported for the server + /// (it may be generated locally by the client if e.g. the version number exchange indicates that a required feature is not supported by the server, + /// or it may be returned by the server if the server does not implement an operation). + #[error("Operation unsupported")] + OpUnsupported = 8, +} + +/// Implementation for SSH_FXP_STATUS as defined in the specification draft +/// +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Status { + pub id: u32, + pub status_code: StatusCode, + pub error_message: String, + pub language_tag: String, +} + +impl_request_id!(Status); +impl_packet_for!(Status); diff --git a/crates/bssh-russh-sftp/src/protocol/symlink.rs b/crates/bssh-russh-sftp/src/protocol/symlink.rs new file mode 100644 index 00000000..89546702 --- /dev/null +++ b/crates/bssh-russh-sftp/src/protocol/symlink.rs @@ -0,0 +1,12 @@ +use super::{impl_packet_for, impl_request_id, Packet, RequestId}; + +/// Implementation for `SSH_FXP_SYMLINK` +#[derive(Debug, Serialize, Deserialize)] +pub struct Symlink { + pub id: u32, + pub linkpath: String, + pub targetpath: String, +} + +impl_request_id!(Symlink); +impl_packet_for!(Symlink); diff --git a/crates/bssh-russh-sftp/src/protocol/version.rs b/crates/bssh-russh-sftp/src/protocol/version.rs new file mode 100644 index 00000000..41f7e4b7 --- /dev/null +++ b/crates/bssh-russh-sftp/src/protocol/version.rs @@ -0,0 +1,27 @@ +use std::collections::HashMap; + +use super::{impl_packet_for, Packet, VERSION}; + +/// Implementation for `SSH_FXP_VERSION` +#[derive(Debug, Serialize, Deserialize)] +pub struct Version { + pub version: u32, + pub extensions: HashMap, +} + +impl_packet_for!(Version); + +impl Version { + pub fn new() -> Self { + Self { + version: VERSION, + extensions: HashMap::new(), + } + } +} + +impl Default for Version { + fn default() -> Self { + Self::new() + } +} diff --git a/crates/bssh-russh-sftp/src/protocol/write.rs b/crates/bssh-russh-sftp/src/protocol/write.rs new file mode 100644 index 00000000..298e5045 --- /dev/null +++ b/crates/bssh-russh-sftp/src/protocol/write.rs @@ -0,0 +1,14 @@ +use super::{impl_packet_for, impl_request_id, Packet, RequestId}; + +/// Implementation for `SSH_FXP_WRITE` +#[derive(Debug, Serialize, Deserialize)] +pub struct Write { + pub id: u32, + pub handle: String, + pub offset: u64, + #[serde(with = "serde_bytes")] + pub data: Vec, +} + +impl_request_id!(Write); +impl_packet_for!(Write); diff --git a/crates/bssh-russh-sftp/src/ser.rs b/crates/bssh-russh-sftp/src/ser.rs new file mode 100644 index 00000000..8ffba16f --- /dev/null +++ b/crates/bssh-russh-sftp/src/ser.rs @@ -0,0 +1,338 @@ +use bytes::{BufMut, Bytes, BytesMut}; +use serde::ser::{ + SerializeMap, SerializeSeq, SerializeStruct, SerializeStructVariant, SerializeTuple, + SerializeTupleStruct, SerializeTupleVariant, +}; + +use crate::error::Error; + +pub struct Serializer { + output: BytesMut, +} + +/// Converting type to bytes according to protocol +pub fn to_bytes(value: &T) -> Result +where + T: serde::Serialize + ?Sized, +{ + let mut serializer = Serializer { + output: BytesMut::new(), + }; + value.serialize(&mut serializer)?; + Ok(serializer.output.freeze()) +} + +/// Serialization of a [`Vec`] without length. +pub fn data_serialize(data: &Vec, serializer: S) -> Result +where + S: serde::Serializer, +{ + let mut seq = serializer.serialize_seq(None)?; + for byte in data { + seq.serialize_element(byte)?; + } + seq.end() +} + +impl<'a> serde::Serializer for &'a mut Serializer { + type Ok = (); + type Error = Error; + type SerializeSeq = &'a mut Serializer; + type SerializeTuple = &'a mut Serializer; + type SerializeTupleStruct = &'a mut Serializer; + type SerializeTupleVariant = &'a mut Serializer; + type SerializeMap = &'a mut Serializer; + type SerializeStruct = &'a mut Serializer; + type SerializeStructVariant = &'a mut Serializer; + + fn serialize_bool(self, _v: bool) -> Result { + Err(Error::BadMessage("bool not supported".to_owned())) + } + + fn serialize_i8(self, _v: i8) -> Result { + Err(Error::BadMessage("i8 not supported".to_owned())) + } + + fn serialize_i16(self, _v: i16) -> Result { + Err(Error::BadMessage("i16 not supported".to_owned())) + } + + fn serialize_i32(self, _v: i32) -> Result { + Err(Error::BadMessage("i32 not supported".to_owned())) + } + + fn serialize_i64(self, _v: i64) -> Result { + Err(Error::BadMessage("i64 not supported".to_owned())) + } + + fn serialize_u8(self, v: u8) -> Result { + self.output.put_u8(v); + Ok(()) + } + + fn serialize_u16(self, _v: u16) -> Result { + Err(Error::BadMessage("u16 not supported".to_owned())) + } + + fn serialize_u32(self, v: u32) -> Result { + self.output.put_u32(v); + Ok(()) + } + + fn serialize_u64(self, v: u64) -> Result { + self.output.put_u64(v); + Ok(()) + } + + fn serialize_f32(self, _v: f32) -> Result { + Err(Error::BadMessage("f32 not supported".to_owned())) + } + + fn serialize_f64(self, _v: f64) -> Result { + Err(Error::BadMessage("f64 not supported".to_owned())) + } + + fn serialize_char(self, _v: char) -> Result { + Err(Error::BadMessage("char not supported".to_owned())) + } + + fn serialize_str(self, v: &str) -> Result { + let bytes = v.as_bytes(); + self.output.put_u32(bytes.len() as u32); + self.output.put_slice(bytes); + Ok(()) + } + + fn serialize_bytes(self, v: &[u8]) -> Result { + self.output.put_u32(v.len() as u32); + self.output.put_slice(v); + Ok(()) + } + + fn serialize_none(self) -> Result { + Ok(()) + } + + fn serialize_some(self, value: &T) -> Result + where + T: serde::Serialize + ?Sized, + { + value.serialize(self) + } + + fn serialize_unit(self) -> Result { + Err(Error::BadMessage("unit not supported".to_owned())) + } + + fn serialize_unit_struct(self, _name: &'static str) -> Result { + Err(Error::BadMessage("unit struct not supported".to_owned())) + } + + fn serialize_unit_variant( + self, + _name: &'static str, + variant_index: u32, + _variant: &'static str, + ) -> Result { + self.serialize_u32(variant_index) + } + + fn serialize_newtype_struct( + self, + _name: &'static str, + value: &T, + ) -> Result + where + T: serde::Serialize + ?Sized, + { + value.serialize(self) + } + + fn serialize_newtype_variant( + self, + _name: &'static str, + _variant_index: u32, + _variant: &'static str, + value: &T, + ) -> Result + where + T: serde::Serialize + ?Sized, + { + value.serialize(self) + } + + fn serialize_seq(self, len: Option) -> Result { + if let Some(len) = len { + self.output.put_u32(len as u32); + } + + Ok(self) + } + + fn serialize_tuple_variant( + self, + _name: &'static str, + _variant_index: u32, + _variant: &'static str, + _len: usize, + ) -> Result { + Ok(self) + } + + fn serialize_tuple(self, _len: usize) -> Result { + Ok(self) + } + + fn serialize_tuple_struct( + self, + _name: &'static str, + _len: usize, + ) -> Result { + Ok(self) + } + + fn serialize_map(self, _len: Option) -> Result { + Ok(self) + } + + fn serialize_struct( + self, + _name: &'static str, + _len: usize, + ) -> Result { + Ok(self) + } + + fn serialize_struct_variant( + self, + _name: &'static str, + _variant_index: u32, + _variant: &'static str, + _len: usize, + ) -> Result { + Err(Error::BadMessage("struct variant not supported".to_owned())) + } + + fn is_human_readable(&self) -> bool { + false + } +} + +impl SerializeSeq for &mut Serializer { + type Ok = (); + type Error = Error; + + fn serialize_element(&mut self, value: &T) -> Result<(), Self::Error> + where + T: serde::Serialize + ?Sized, + { + value.serialize(&mut **self) + } + + fn end(self) -> Result { + Ok(()) + } +} + +impl SerializeMap for &mut Serializer { + type Ok = (); + type Error = Error; + + fn serialize_key(&mut self, key: &T) -> Result<(), Self::Error> + where + T: serde::Serialize + ?Sized, + { + key.serialize(&mut **self) + } + + fn serialize_value(&mut self, value: &T) -> Result<(), Self::Error> + where + T: serde::Serialize + ?Sized, + { + value.serialize(&mut **self) + } + + fn end(self) -> Result { + Ok(()) + } +} + +impl SerializeTuple for &mut Serializer { + type Ok = (); + type Error = Error; + + fn serialize_element(&mut self, value: &T) -> Result<(), Self::Error> + where + T: serde::Serialize + ?Sized, + { + value.serialize(&mut **self) + } + + fn end(self) -> Result { + Ok(()) + } +} + +impl SerializeStruct for &mut Serializer { + type Ok = (); + type Error = Error; + + fn serialize_field(&mut self, _key: &'static str, value: &T) -> Result<(), Self::Error> + where + T: serde::Serialize + ?Sized, + { + value.serialize(&mut **self) + } + + fn end(self) -> Result { + Ok(()) + } +} + +impl SerializeStructVariant for &mut Serializer { + type Ok = (); + type Error = Error; + + fn serialize_field(&mut self, _key: &'static str, value: &T) -> Result<(), Self::Error> + where + T: serde::Serialize + ?Sized, + { + value.serialize(&mut **self) + } + + fn end(self) -> Result { + Ok(()) + } +} + +impl SerializeTupleStruct for &mut Serializer { + type Ok = (); + type Error = Error; + + fn serialize_field(&mut self, value: &T) -> Result<(), Self::Error> + where + T: serde::Serialize + ?Sized, + { + value.serialize(&mut **self) + } + + fn end(self) -> Result { + Ok(()) + } +} + +impl SerializeTupleVariant for &mut Serializer { + type Ok = (); + type Error = Error; + + fn serialize_field(&mut self, value: &T) -> Result<(), Self::Error> + where + T: serde::Serialize + ?Sized, + { + value.serialize(&mut **self) + } + + fn end(self) -> Result { + Ok(()) + } +} diff --git a/crates/bssh-russh-sftp/src/server/handler.rs b/crates/bssh-russh-sftp/src/server/handler.rs new file mode 100644 index 00000000..d8a7d483 --- /dev/null +++ b/crates/bssh-russh-sftp/src/server/handler.rs @@ -0,0 +1,259 @@ +use std::{collections::HashMap, future::Future}; + +use crate::protocol::{ + Attrs, Data, FileAttributes, Handle, Name, OpenFlags, Packet, Status, StatusCode, Version, +}; + +/// Server handler for each client. This is `async_trait` +#[cfg_attr(feature = "async-trait", async_trait::async_trait)] +pub trait Handler: Sized { + /// The type must have an `Into` + /// implementation because a response must be sent + /// to any request, even if completed by error. + type Error: Into + Send; + + /// Called by the handler when the packet is not implemented + fn unimplemented(&self) -> Self::Error; + + /// The default is to send an SSH_FXP_VERSION response with + /// the protocol version and ignore any extensions. + #[allow(unused_variables)] + fn init( + &mut self, + version: u32, + extensions: HashMap, + ) -> impl Future> + Send { + async { Ok(Version::new()) } + } + + /// Called on SSH_FXP_OPEN + #[allow(unused_variables)] + fn open( + &mut self, + id: u32, + filename: String, + pflags: OpenFlags, + attrs: FileAttributes, + ) -> impl Future> + Send { + let err = self.unimplemented(); + async { Err(err) } + } + + /// Called on SSH_FXP_CLOSE. + /// The status can be returned as Ok or as Err + #[allow(unused_variables)] + fn close( + &mut self, + id: u32, + handle: String, + ) -> impl Future> + Send { + let err = self.unimplemented(); + async { Err(err) } + } + + /// Called on SSH_FXP_READ + #[allow(unused_variables)] + fn read( + &mut self, + id: u32, + handle: String, + offset: u64, + len: u32, + ) -> impl Future> + Send { + let err = self.unimplemented(); + async { Err(err) } + } + + /// Called on SSH_FXP_WRITE + #[allow(unused_variables)] + fn write( + &mut self, + id: u32, + handle: String, + offset: u64, + data: Vec, + ) -> impl Future> + Send { + let err = self.unimplemented(); + async { Err(err) } + } + + /// Called on SSH_FXP_LSTAT + #[allow(unused_variables)] + fn lstat( + &mut self, + id: u32, + path: String, + ) -> impl Future> + Send { + let err = self.unimplemented(); + async { Err(err) } + } + + /// Called on SSH_FXP_FSTAT + #[allow(unused_variables)] + fn fstat( + &mut self, + id: u32, + handle: String, + ) -> impl Future> + Send { + let err = self.unimplemented(); + async { Err(err) } + } + + /// Called on SSH_FXP_SETSTAT + #[allow(unused_variables)] + fn setstat( + &mut self, + id: u32, + path: String, + attrs: FileAttributes, + ) -> impl Future> + Send { + let err = self.unimplemented(); + async { Err(err) } + } + + /// Called on SSH_FXP_FSETSTAT + #[allow(unused_variables)] + fn fsetstat( + &mut self, + id: u32, + handle: String, + attrs: FileAttributes, + ) -> impl Future> + Send { + let err = self.unimplemented(); + async { Err(err) } + } + + /// Called on SSH_FXP_OPENDIR + #[allow(unused_variables)] + fn opendir( + &mut self, + id: u32, + path: String, + ) -> impl Future> + Send { + let err = self.unimplemented(); + async { Err(err) } + } + + /// Called on SSH_FXP_READDIR. + /// EOF error should be returned at the end of reading the directory + #[allow(unused_variables)] + fn readdir( + &mut self, + id: u32, + handle: String, + ) -> impl Future> + Send { + let err = self.unimplemented(); + async { Err(err) } + } + + /// Called on SSH_FXP_REMOVE. + /// The status can be returned as Ok or as Err + #[allow(unused_variables)] + fn remove( + &mut self, + id: u32, + filename: String, + ) -> impl Future> + Send { + let err = self.unimplemented(); + async { Err(err) } + } + + /// Called on SSH_FXP_MKDIR + #[allow(unused_variables)] + fn mkdir( + &mut self, + id: u32, + path: String, + attrs: FileAttributes, + ) -> impl Future> + Send { + let err = self.unimplemented(); + async { Err(err) } + } + + /// Called on SSH_FXP_RMDIR. + /// The status can be returned as Ok or as Err + #[allow(unused_variables)] + fn rmdir( + &mut self, + id: u32, + path: String, + ) -> impl Future> + Send { + let err = self.unimplemented(); + async { Err(err) } + } + + /// Called on SSH_FXP_REALPATH. + /// Must contain only one name and a dummy attributes + #[allow(unused_variables)] + fn realpath( + &mut self, + id: u32, + path: String, + ) -> impl Future> + Send { + let err = self.unimplemented(); + async { Err(err) } + } + + /// Called on SSH_FXP_STAT + #[allow(unused_variables)] + fn stat( + &mut self, + id: u32, + path: String, + ) -> impl Future> + Send { + let err = self.unimplemented(); + async { Err(err) } + } + + /// Called on SSH_FXP_RENAME. + /// The status can be returned as Ok or as Err + #[allow(unused_variables)] + fn rename( + &mut self, + id: u32, + oldpath: String, + newpath: String, + ) -> impl Future> + Send { + let err = self.unimplemented(); + async { Err(err) } + } + + /// Called on SSH_FXP_READLINK + #[allow(unused_variables)] + fn readlink( + &mut self, + id: u32, + path: String, + ) -> impl Future> + Send { + let err = self.unimplemented(); + async { Err(err) } + } + + /// Called on SSH_FXP_SYMLINK. + /// The status can be returned as Ok or as Err + #[allow(unused_variables)] + fn symlink( + &mut self, + id: u32, + linkpath: String, + targetpath: String, + ) -> impl Future> + Send { + let err = self.unimplemented(); + async { Err(err) } + } + + /// Called on SSH_FXP_EXTENDED. + /// The extension can return any packet, so it's not specific. + /// If the server does not recognize the `request' name + /// the server must respond with an SSH_FX_OP_UNSUPPORTED error + #[allow(unused_variables)] + fn extended( + &mut self, + id: u32, + request: String, + data: Vec, + ) -> impl Future> + Send { + let err = self.unimplemented(); + async { Err(err) } + } +} diff --git a/crates/bssh-russh-sftp/src/server/mod.rs b/crates/bssh-russh-sftp/src/server/mod.rs new file mode 100644 index 00000000..6228ed44 --- /dev/null +++ b/crates/bssh-russh-sftp/src/server/mod.rs @@ -0,0 +1,90 @@ +mod handler; + +use bytes::Bytes; +use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; + +pub use self::handler::Handler; + +use crate::{ + error::Error, + protocol::{Packet, StatusCode}, + utils::read_packet, +}; + +macro_rules! into_wrap { + ($id:expr, $handler:expr, $var:ident; $($arg:ident),*) => { + match $handler.$var($($var.$arg),*).await { + Err(err) => Packet::error($id, err.into()), + Ok(packet) => packet.into(), + } + }; +} + +async fn process_request(packet: Packet, handler: &mut H) -> Packet +where + H: Handler + Send, +{ + let id = packet.get_request_id(); + + match packet { + Packet::Init(init) => into_wrap!(id, handler, init; version, extensions), + Packet::Open(open) => into_wrap!(id, handler, open; id, filename, pflags, attrs), + Packet::Close(close) => into_wrap!(id, handler, close; id, handle), + Packet::Read(read) => into_wrap!(id, handler, read; id, handle, offset, len), + Packet::Write(write) => into_wrap!(id, handler, write; id, handle, offset, data), + Packet::Lstat(lstat) => into_wrap!(id, handler, lstat; id, path), + Packet::Fstat(fstat) => into_wrap!(id, handler, fstat; id, handle), + Packet::SetStat(setstat) => into_wrap!(id, handler, setstat; id, path, attrs), + Packet::FSetStat(fsetstat) => into_wrap!(id, handler, fsetstat; id, handle, attrs), + Packet::OpenDir(opendir) => into_wrap!(id, handler, opendir; id, path), + Packet::ReadDir(readdir) => into_wrap!(id, handler, readdir; id, handle), + Packet::Remove(remove) => into_wrap!(id, handler, remove; id, filename), + Packet::MkDir(mkdir) => into_wrap!(id, handler, mkdir; id, path, attrs), + Packet::RmDir(rmdir) => into_wrap!(id, handler, rmdir; id, path), + Packet::RealPath(realpath) => into_wrap!(id, handler, realpath; id, path), + Packet::Stat(stat) => into_wrap!(id, handler, stat; id, path), + Packet::Rename(rename) => into_wrap!(id, handler, rename; id, oldpath, newpath), + Packet::ReadLink(readlink) => into_wrap!(id, handler, readlink; id, path), + Packet::Symlink(symlink) => into_wrap!(id, handler, symlink; id, linkpath, targetpath), + Packet::Extended(extended) => into_wrap!(id, handler, extended; id, request, data), + _ => Packet::error(0, StatusCode::BadMessage), + } +} + +async fn process_handler(stream: &mut S, handler: &mut H) -> Result<(), Error> +where + H: Handler + Send, + S: AsyncRead + AsyncWrite + Unpin, +{ + let mut bytes = read_packet(stream).await?; + + let response = match Packet::try_from(&mut bytes) { + Ok(request) => process_request(request, handler).await, + Err(_) => Packet::error(0, StatusCode::BadMessage), + }; + + let packet = Bytes::try_from(response)?; + stream.write_all(&packet).await?; + stream.flush().await?; + + Ok(()) +} + +/// Run processing stream as SFTP +pub async fn run(mut stream: S, mut handler: H) +where + S: AsyncRead + AsyncWrite + Unpin + Send + 'static, + H: Handler + Send + 'static, +{ + tokio::spawn(async move { + loop { + match process_handler(&mut stream, &mut handler).await { + Err(Error::UnexpectedEof) => break, + Err(err) => warn!("{}", err), + Ok(_) => (), + } + } + + debug!("sftp stream ended"); + }); +} diff --git a/crates/bssh-russh-sftp/src/utils.rs b/crates/bssh-russh-sftp/src/utils.rs new file mode 100644 index 00000000..783cdf14 --- /dev/null +++ b/crates/bssh-russh-sftp/src/utils.rs @@ -0,0 +1,19 @@ +use bytes::Bytes; +use chrono::{DateTime, Utc}; +use std::time::SystemTime; +use tokio::io::{AsyncRead, AsyncReadExt}; + +use crate::error::Error; + +pub fn unix(time: SystemTime) -> u32 { + DateTime::::from(time).timestamp() as u32 +} + +pub async fn read_packet(stream: &mut S) -> Result { + let length = stream.read_u32().await?; + + let mut buf = vec![0; length as usize]; + stream.read_exact(&mut buf).await?; + + Ok(Bytes::from(buf)) +} diff --git a/crates/bssh-russh-sftp/sync-upstream.sh b/crates/bssh-russh-sftp/sync-upstream.sh new file mode 100755 index 00000000..96741fc7 --- /dev/null +++ b/crates/bssh-russh-sftp/sync-upstream.sh @@ -0,0 +1,114 @@ +#!/bin/bash +# sync-upstream.sh +# Syncs bssh-russh-sftp with upstream russh-sftp and applies our patches. +# +# Usage: ./sync-upstream.sh [version] +# version: optional, e.g., "2.1.1" or "master" (default: latest tag) + +set -e + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +UPSTREAM_URL="https://github.com/AspectUnk/russh-sftp.git" +TEMP_DIR="/tmp/russh-sftp-sync-$$" +PATCH_FILE="$SCRIPT_DIR/patches/sftp-serde-bytes-perf.patch" + +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +NC='\033[0m' + +log_info() { echo -e "${GREEN}[INFO]${NC} $1"; } +log_warn() { echo -e "${YELLOW}[WARN]${NC} $1"; } +log_error() { echo -e "${RED}[ERROR]${NC} $1"; } + +cleanup() { + if [ -d "$TEMP_DIR" ]; then + rm -rf "$TEMP_DIR" + fi +} +trap cleanup EXIT + +VERSION="${1:-}" + +log_info "Syncing bssh-russh-sftp with upstream russh-sftp..." + +log_info "Cloning upstream russh-sftp..." +git clone "$UPSTREAM_URL" "$TEMP_DIR" + +cd "$TEMP_DIR" + +if [ -z "$VERSION" ]; then + VERSION=$(git describe --tags --abbrev=0 2>/dev/null || echo "master") + log_info "Using latest tag: $VERSION" +elif [ "$VERSION" != "master" ]; then + log_info "Using specified version: $VERSION" +fi + +if [ "$VERSION" != "master" ]; then + git checkout "v$VERSION" 2>/dev/null || git checkout "$VERSION" +fi + +COMMIT_HASH=$(git rev-parse --short HEAD) +log_info "Upstream commit: $COMMIT_HASH" + +log_info "Copying source files..." +cd "$SCRIPT_DIR" + +cp Cargo.toml Cargo.toml.bak +cp README.md README.md.bak 2>/dev/null || true + +find src -type f -name "*.rs" -delete 2>/dev/null || true + +cp -r "$TEMP_DIR/src/"* src/ + +mv Cargo.toml.bak Cargo.toml +mv README.md.bak README.md 2>/dev/null || true + +if [ "$VERSION" != "master" ]; then + CLEAN_VERSION="${VERSION#v}" + if [[ "$OSTYPE" == "darwin"* ]]; then + sed -i '' "s/^version = \".*\"/version = \"$CLEAN_VERSION\"/" Cargo.toml + else + sed -i "s/^version = \".*\"/version = \"$CLEAN_VERSION\"/" Cargo.toml + fi + log_info "Updated version to $CLEAN_VERSION" +fi + +log_info "Applying patches..." + +if [ -f "$PATCH_FILE" ]; then + if patch -p1 --dry-run < "$PATCH_FILE" > /dev/null 2>&1; then + patch -p1 < "$PATCH_FILE" + log_info "Applied sftp-serde-bytes-perf.patch" + else + log_warn "Patch may not apply cleanly, attempting with fuzz..." + if patch -p1 --fuzz=3 < "$PATCH_FILE"; then + log_warn "Patch applied with fuzz - please verify manually" + else + log_error "Failed to apply patch. Manual intervention required." + log_error "Patch file: $PATCH_FILE" + exit 1 + fi + fi +else + log_error "Patch file not found: $PATCH_FILE" + log_error "Please create the patch file first using: ./create-patch.sh" + exit 1 +fi + +log_info "Verifying build..." +cd "$SCRIPT_DIR/../.." +if cargo check -p bssh-russh-sftp 2>/dev/null; then + log_info "Build verification passed" +else + log_error "Build verification failed" + exit 1 +fi + +log_info "Sync complete!" +log_info "Upstream version: $VERSION ($COMMIT_HASH)" +log_info "" +log_info "Next steps:" +log_info " 1. Review changes: git diff crates/bssh-russh-sftp/" +log_info " 2. Test: cargo test -p bssh-russh-sftp" +log_info " 3. Commit: git add -A && git commit -m 'chore: sync bssh-russh-sftp with upstream $VERSION'"