Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
298 changes: 159 additions & 139 deletions Cargo.lock

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@ edition = "2024"

[dependencies]
bytes = "1.11.1"
tokio = { version = "1.51.1", features = ["full"] }
tokio = { version = "1.52.1", features = ["full"] }
# Use our internal russh fork with session loop fixes
# - Development: uses local path (crates/bssh-russh)
# - Publishing: uses crates.io version (path ignored)
russh = { package = "bssh-russh", version = "0.60.1", path = "crates/bssh-russh" }
russh-sftp = "2.1.1"
clap = { version = "4.6.0", features = ["derive", "env"] }
clap = { version = "4.6.1", features = ["derive", "env"] }
anyhow = "1.0.102"
thiserror = "2.0.18"
tracing = "0.1.43"
tracing = "0.1.44"
tracing-subscriber = { version = "0.3.23", features = ["env-filter"] }
serde = { version = "1.0.228", features = ["derive"] }
serde_yaml = "0.9"
Expand Down Expand Up @@ -57,10 +57,10 @@ nix = { version = "0.31", features = ["fs", "poll", "process", "signal", "term"]
atty = "0.2.14"
arrayvec = "0.7.6"
smallvec = "1.15.1"
lru = "0.16.2"
uuid = { version = "1.23.0", features = ["v4"] }
lru = "0.17.0"
uuid = { version = "1.23.1", features = ["v4"] }
fastrand = "2.4.1"
tokio-util = "0.7.17"
tokio-util = "0.7.18"
socket2 = "0.6"
shell-words = "1.1.1"
libc = "0.2"
Expand Down
18 changes: 10 additions & 8 deletions crates/bssh-russh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ serde = ["ssh-key/serde"]
[dependencies]
aes = "0.8"
async-trait = { version = "0.1.50", optional = true }
aws-lc-rs = { version = "1.16.2", optional = true }
aws-lc-rs = { version = "1.16.3", optional = true }
bitflags = "2.0"
block-padding = { version = "0.3", features = ["std"] }
byteorder = "1.4"
Expand All @@ -41,9 +41,9 @@ delegate = "0.13"
digest = "0.10"
der = "0.8"
des = { version = "0.8.1", optional = true }
ecdsa = "0.17.0-rc.16"
ecdsa = "0.17.0-rc.17"
ed25519-dalek = { version = "3.0.0-pre.6", features = ["alloc", "rand_core", "pkcs8"] }
elliptic-curve = { version = "0.14.0-rc.30", features = ["ecdh"] }
elliptic-curve = { version = "0.14.0-rc.31", features = ["ecdh"] }
enum_dispatch = "0.3.13"
flate2 = { version = "1.0.15", optional = true }
futures = "0.3"
Expand All @@ -59,12 +59,14 @@ module-lattice = "0.2"
# num-bigint 0.4.x only supports rand 0.8; upstream russh ships a fork that
# adds rand 0.10 support via the `rand_0_10` feature flag.
num-bigint = { package = "internal-russh-num-bigint", version = "=0.5.0", features = ["rand_0_10"] }
p256 = { version = "0.14.0-rc.8", features = ["ecdh"] }
p384 = { version = "0.14.0-rc.8", features = ["ecdh"] }
p521 = { version = "0.14.0-rc.8", features = ["ecdh"] }
p256 = { version = "0.14.0-rc.9", features = ["ecdh"] }
p384 = { version = "0.14.0-rc.9", features = ["ecdh"] }
p521 = { version = "0.14.0-rc.9", features = ["ecdh"] }
pbkdf2 = "0.12"
pkcs1 = { version = "0.8.0-rc.4", optional = true }
pkcs5 = "0.8.0-rc.13"
# Pinned: pkcs8 0.11.0-rc.11 still calls the rc.13-era `Parameters::recommended` API.
# Stable 0.8.0 renamed it to `generate_recommended` and breaks pkcs8.
pkcs5 = "=0.8.0-rc.13"
pkcs8 = { version = "0.11.0-rc.11", features = ["encryption", "std"] }
polyval = "0.7.1"
rand_core = { version = "0.10.0" }
Expand All @@ -79,7 +81,7 @@ spki = "0.8.0"
ssh-encoding = { version = "0.2", features = ["bytes"] }
subtle = "2.4"
thiserror = "2.0.18"
tokio = { version = "1.51.1", features = ["io-util", "sync", "time", "rt-multi-thread", "net"] }
tokio = { version = "1.52.1", features = ["io-util", "sync", "time", "rt-multi-thread", "net"] }
typenum = "1.17"
universal-hash = "0.6.1"
yasna = { version = "0.5.0", features = ["bit-vec", "num-bigint"], optional = true }
Expand Down
88 changes: 88 additions & 0 deletions crates/bssh-russh/patches/channel-write-ordering.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
--- a/src/session.rs
+++ b/src/session.rs
@@ -449,7 +449,7 @@
let buf0 = buf0.into();
if let Some(channel) = self.channels.get_mut(&channel) {
assert!(channel.confirmed);
- if !channel.pending_data.is_empty() && is_rekeying {
+ if !channel.pending_data.is_empty() || is_rekeying {
channel.pending_data.push_back((buf0, None, 0));
return Ok(());
}
@@ -473,7 +473,7 @@
let buf0 = buf0.into();
if let Some(channel) = self.channels.get_mut(&channel) {
assert!(channel.confirmed);
- if !channel.pending_data.is_empty() && is_rekeying {
+ if !channel.pending_data.is_empty() || is_rekeying {
channel.pending_data.push_back((buf0, Some(ext), 0));
return Ok(());
}
@@ -836,4 +836,67 @@
1
);
}
+
+ #[test]
+ fn data_queues_behind_existing_pending_data_when_not_rekeying() {
+ let channel_id = ChannelId(5);
+ let mut encrypted = test_encrypted();
+ encrypted
+ .channels
+ .insert(channel_id, test_channel(channel_id, 42, false, false));
+
+ let channel = &encrypted.channels[&channel_id];
+ let initial_pending = channel.pending_data.len();
+ assert!(initial_pending > 0);
+ let initial_front = channel.pending_data.front().unwrap();
+ let initial_front_data = initial_front.0.to_vec();
+ let initial_front_ext = initial_front.1;
+
+ encrypted
+ .data(channel_id, Bytes::from_static(b"new"), false)
+ .unwrap();
+
+ let channel = &encrypted.channels[&channel_id];
+ assert_eq!(channel.pending_data.len(), initial_pending + 1);
+ assert_eq!(
+ channel.pending_data.front().unwrap().0.as_ref(),
+ initial_front_data.as_slice()
+ );
+ assert_eq!(channel.pending_data.front().unwrap().1, initial_front_ext);
+ assert_eq!(channel.pending_data.back().unwrap().0.as_ref(), b"new");
+ assert_eq!(channel.pending_data.back().unwrap().1, None);
+ assert!(encrypted.write.is_empty());
+ }
+
+ #[test]
+ fn extended_data_queues_behind_existing_pending_data_when_not_rekeying() {
+ let channel_id = ChannelId(6);
+ let ext = 1;
+ let mut encrypted = test_encrypted();
+ encrypted
+ .channels
+ .insert(channel_id, test_channel(channel_id, 42, false, false));
+
+ let channel = &encrypted.channels[&channel_id];
+ let initial_pending = channel.pending_data.len();
+ assert!(initial_pending > 0);
+ let initial_front = channel.pending_data.front().unwrap();
+ let initial_front_data = initial_front.0.to_vec();
+ let initial_front_ext = initial_front.1;
+
+ encrypted
+ .extended_data(channel_id, ext, Bytes::from_static(b"new"), false)
+ .unwrap();
+
+ let channel = &encrypted.channels[&channel_id];
+ assert_eq!(channel.pending_data.len(), initial_pending + 1);
+ assert_eq!(
+ channel.pending_data.front().unwrap().0.as_ref(),
+ initial_front_data.as_slice()
+ );
+ assert_eq!(channel.pending_data.front().unwrap().1, initial_front_ext);
+ assert_eq!(channel.pending_data.back().unwrap().0.as_ref(), b"new");
+ assert_eq!(channel.pending_data.back().unwrap().1, Some(ext));
+ assert!(encrypted.write.is_empty());
+ }
}
194 changes: 192 additions & 2 deletions crates/bssh-russh/patches/handle-data-fix.patch
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
--- /tmp/russh-upstream-compare/russh/src/server/session.rs 2026-04-03 13:17:42
+++ /Users/inureyes/Development/backend.ai/bssh/crates/bssh-russh/src/server/session.rs 2026-04-03 13:20:54
--- a/src/server/session.rs
+++ b/src/server/session.rs
@@ -7,7 +7,7 @@
use log::debug;
use negotiation::parse_kex_algo_list;
Expand Down Expand Up @@ -146,3 +146,193 @@
tokio::select! {
r = &mut reading => {
let (stream_read, mut buffer, mut opening_cipher) = match r {
@@ -659,22 +789,20 @@
// data from it.
self.common.alive_timeouts = 0;
}
- if self.common.received_data || sent_keepalive {
- if let (futures::future::Either::Right(ref mut sleep), Some(d)) = (
+ if (self.common.received_data || sent_keepalive)
+ && let (futures::future::Either::Right(ref mut sleep), Some(d)) = (
keepalive_timer.as_mut().as_pin_mut(),
self.common.config.keepalive_interval,
) {
sleep.as_mut().reset(tokio::time::Instant::now() + d);
}
- }
- if !sent_keepalive {
- if let (futures::future::Either::Right(ref mut sleep), Some(d)) = (
+ if !sent_keepalive
+ && let (futures::future::Either::Right(ref mut sleep), Some(d)) = (
inactivity_timer.as_mut().as_pin_mut(),
self.common.config.inactivity_timeout,
) {
sleep.as_mut().reset(tokio::time::Instant::now() + d);
}
- }
}
debug!("disconnected");
// Shutdown
@@ -703,38 +831,35 @@
}

pub fn writable_packet_size(&self, channel: &ChannelId) -> u32 {
- if let Some(ref enc) = self.common.encrypted {
- if let Some(channel) = enc.channels.get(channel) {
+ if let Some(ref enc) = self.common.encrypted
+ && let Some(channel) = enc.channels.get(channel) {
return channel
.sender_window_size
.min(channel.sender_maximum_packet_size);
}
- }
0
}

pub fn window_size(&self, channel: &ChannelId) -> u32 {
- if let Some(ref enc) = self.common.encrypted {
- if let Some(channel) = enc.channels.get(channel) {
+ if let Some(ref enc) = self.common.encrypted
+ && let Some(channel) = enc.channels.get(channel) {
return channel.sender_window_size;
}
- }
0
}

pub fn max_packet_size(&self, channel: &ChannelId) -> u32 {
- if let Some(ref enc) = self.common.encrypted {
- if let Some(channel) = enc.channels.get(channel) {
+ if let Some(ref enc) = self.common.encrypted
+ && let Some(channel) = enc.channels.get(channel) {
return channel.sender_maximum_packet_size;
}
- }
0
}

/// Flush the session, i.e. encrypt the pending buffer.
pub fn flush(&mut self) -> Result<(), Error> {
- if let Some(ref mut enc) = self.common.encrypted {
- if enc.flush(
+ if let Some(ref mut enc) = self.common.encrypted
+ && enc.flush(
&self.common.config.as_ref().limits,
&mut self.common.packet_writer,
)? && self.kex == SessionKexState::Idle
@@ -744,7 +869,6 @@
self.begin_rekey()?;
}
}
- }
Ok(())
}

@@ -819,12 +943,11 @@
/// cancelling). Always call this function if the request was
/// successful (it checks whether the client expects an answer).
pub fn request_success(&mut self) {
- if self.common.wants_reply {
- if let Some(ref mut enc) = self.common.encrypted {
+ if self.common.wants_reply
+ && let Some(ref mut enc) = self.common.encrypted {
self.common.wants_reply = false;
push_packet!(enc.write, enc.write.push(msg::REQUEST_SUCCESS))
}
- }
}

/// Send a "failure" reply to a global request.
@@ -839,8 +962,8 @@
/// function if the request was successful (it checks whether the
/// client expects an answer).
pub fn channel_success(&mut self, channel: ChannelId) -> Result<(), crate::Error> {
- if let Some(ref mut enc) = self.common.encrypted {
- if let Some(channel) = enc.channels.get_mut(&channel) {
+ if let Some(ref mut enc) = self.common.encrypted
+ && let Some(channel) = enc.channels.get_mut(&channel) {
assert!(channel.confirmed);
if channel.wants_reply {
channel.wants_reply = false;
@@ -851,14 +974,13 @@
})
}
}
- }
Ok(())
}

/// Send a "failure" reply to a global request.
pub fn channel_failure(&mut self, channel: ChannelId) -> Result<(), crate::Error> {
- if let Some(ref mut enc) = self.common.encrypted {
- if let Some(channel) = enc.channels.get_mut(&channel) {
+ if let Some(ref mut enc) = self.common.encrypted
+ && let Some(channel) = enc.channels.get_mut(&channel) {
assert!(channel.confirmed);
if channel.wants_reply {
channel.wants_reply = false;
@@ -868,7 +990,6 @@
})
}
}
- }
Ok(())
}

@@ -951,8 +1072,8 @@
channel: ChannelId,
client_can_do: bool,
) -> Result<(), Error> {
- if let Some(ref mut enc) = self.common.encrypted {
- if let Some(channel) = enc.channels.get(&channel) {
+ if let Some(ref mut enc) = self.common.encrypted
+ && let Some(channel) = enc.channels.get(&channel) {
assert!(channel.confirmed);
push_packet!(enc.write, {
msg::CHANNEL_REQUEST.encode(&mut enc.write)?;
@@ -963,7 +1084,6 @@
(client_can_do as u8).encode(&mut enc.write)?;
})
}
- }
Ok(())
}

@@ -1003,8 +1123,8 @@
channel: ChannelId,
exit_status: u32,
) -> Result<(), Error> {
- if let Some(ref mut enc) = self.common.encrypted {
- if let Some(channel) = enc.channels.get(&channel) {
+ if let Some(ref mut enc) = self.common.encrypted
+ && let Some(channel) = enc.channels.get(&channel) {
assert!(channel.confirmed);
push_packet!(enc.write, {
msg::CHANNEL_REQUEST.encode(&mut enc.write)?;
@@ -1015,7 +1135,6 @@
exit_status.encode(&mut enc.write)?;
})
}
- }
Ok(())
}

@@ -1028,8 +1147,8 @@
error_message: &str,
language_tag: &str,
) -> Result<(), Error> {
- if let Some(ref mut enc) = self.common.encrypted {
- if let Some(channel) = enc.channels.get(&channel) {
+ if let Some(ref mut enc) = self.common.encrypted
+ && let Some(channel) = enc.channels.get(&channel) {
assert!(channel.confirmed);
push_packet!(enc.write, {
msg::CHANNEL_REQUEST.encode(&mut enc.write)?;
@@ -1043,7 +1162,6 @@
language_tag.encode(&mut enc.write)?;
})
}
- }
Ok(())
}

Loading
Loading