Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/commands/channels/annotations/get.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ export default class ChannelsAnnotationsGet extends AblyBaseCommand {
}),
};

static override description = "Get annotations for a channel message";
static override description =
"List individual annotation events for a channel message (paginated event stream, not the rolled-up summary)";

static override examples = [
'$ ably channels annotations get my-channel "01234567890:0"',
Expand Down
134 changes: 134 additions & 0 deletions src/commands/channels/get-message.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import { Args, Flags } from "@oclif/core";
import * as Ably from "ably";

import { AblyBaseCommand } from "../../base-command.js";
import { productApiFlags } from "../../flags.js";
import {
formatMessageTimestamp,
formatMessagesOutput,
formatResource,
} from "../../utils/output.js";
import type { MessageDisplayFields } from "../../utils/output.js";

export default class ChannelsGetMessage extends AblyBaseCommand {
static override args = {
channelName: Args.string({
description: "The channel name",
required: true,
}),
messageSerial: Args.string({
description: "The serial of the message to retrieve",
required: true,
}),
};

static override description =
"Get the latest version of a message on an Ably channel";

static override examples = [
'$ ably channels get-message my-channel "01234567890:0"',
'$ ably channels get-message my-channel "01234567890:0" --json',
'$ ably channels get-message my-channel "01234567890:0" --pretty-json',
'$ ably channels get-message my-channel "01234567890:0" --cipher YOUR_CIPHER_KEY',
];

static override flags = {
...productApiFlags,
cipher: Flags.string({
description:
"Decryption key for encrypted messages (base64-encoded or hex-encoded, supports AES-128-CBC and AES-256-CBC)",
}),
};

async run(): Promise<void> {
const { args, flags } = await this.parse(ChannelsGetMessage);
const channelName = args.channelName;
const serial = args.messageSerial;

try {
const rest = await this.createAblyRestClient(flags);
if (!rest) return;

const channelOptions: Ably.ChannelOptions = {};
if (flags.cipher) {
channelOptions.cipher = { key: flags.cipher };
}

const channel = rest.channels.get(channelName, channelOptions);

this.logProgress(
`Fetching message ${formatResource(serial)} on channel ${formatResource(channelName)}`,
flags,
);

const message = await channel.getMessage(serial);

const tracePayload = {
id: message.id,
timestamp: formatMessageTimestamp(message.timestamp),
channel: channelName,
event: message.name || undefined,
clientId: message.clientId,
connectionId: message.connectionId,
data: message.data as unknown,
encoding: message.encoding,
extras: message.extras as unknown,
action:
message.action === undefined ? undefined : String(message.action),
serial: message.serial,
version: message.version,
annotations: message.annotations,
};
this.logCliEvent(
flags,
"channelGetMessage",
"messageRetrieved",
`Retrieved message ${message.serial ?? serial} on channel ${channelName}`,
tracePayload,
);

if (this.shouldOutputJson(flags)) {
this.logJsonResult(
{
message: {
...message,
// Stringify action for predictable JSON typing across commands
// (matches `channels subscribe`'s explicit normalisation).
action:
message.action === undefined
? undefined
: String(message.action),
// Nullish-aware: a legitimate epoch-zero timestamp must not be
// dropped to undefined.
timestamp:
message.timestamp == null
? undefined
: new Date(message.timestamp).toISOString(),
},
},
flags,
);
Comment thread
sacOO7 marked this conversation as resolved.
Comment thread
sacOO7 marked this conversation as resolved.
} else {
const display: MessageDisplayFields = {
action:
message.action === undefined ? undefined : String(message.action),
channel: channelName,
clientId: message.clientId,
data: message.data,
event: message.name || undefined,
id: message.id,
serial: message.serial,
timestamp: message.timestamp ?? Date.now(),
version: message.version,
annotations: message.annotations,
};
this.log(formatMessagesOutput([display]));
}
} catch (error) {
this.fail(error, flags, "channelGetMessage", {
channel: channelName,
serial,
});
}
}
}
9 changes: 6 additions & 3 deletions src/utils/output.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,16 +202,19 @@ export function formatMessagesOutput(messages: MessageDisplayFields[]): string {
}

if (msg.annotations && Object.keys(msg.annotations.summary).length > 0) {
lines.push(`${formatLabel("Annotations")}`);
lines.push(
`${formatLabel("Annotations")}`,
` ${formatLabel("Summary")}`,
);
for (const [annotationType, value] of Object.entries(
msg.annotations.summary,
)) {
const formattedValue = formatMessageData(value)
.split("\n")
.map((line) => ` ${line}`)
.map((line) => ` ${line}`)
.join("\n");

lines.push(` ${formatLabel(annotationType)}`, formattedValue);
lines.push(` ${formatLabel(annotationType)}`, formattedValue);
}
}

Expand Down
158 changes: 158 additions & 0 deletions test/e2e/channels/channel-message-ops-e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,164 @@ describe.skipIf(SHOULD_SKIP_E2E || SHOULD_SKIP_MUTABLE_TESTS)(
},
);

it(
"should retrieve a message via channels get-message",
{ timeout: 60000 },
async () => {
setupTestFailureHandler(
"should retrieve a message via channels get-message",
);

// Use a fresh channel/serial so we don't see updates from other tests
const getChannel = getMutableChannelName("msg-get");
const serial = await publishAndGetSerial(getChannel, "fresh-message");

const result = await runCommand(
["channels", "get-message", getChannel, serial, "--json"],
{
env: { ABLY_API_KEY: E2E_API_KEY || "" },
timeoutMs: 30000,
},
);

expect(result.exitCode).toBe(0);

const records = parseNdjsonLines(result.stdout);
const parsed = records.find((r) => r.type === "result") ?? records[0];
expect(parsed.success).toBe(true);
expect(parsed.message).toBeDefined();
const message = parsed.message as Record<string, unknown>;
expect(message.serial).toBe(serial);
expect(message.data).toBe("fresh-message");
// Timestamp must be ISO 8601 (history-style normalisation)
expect(message.timestamp).toMatch(
/^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z$/,
);
},
);

it(
"should return the latest version after an update via channels get-message",
{ timeout: 60000 },
async () => {
setupTestFailureHandler(
"should return the latest version after an update via channels get-message",
);

// Publish, update, then verify get-message returns the updated payload
const updateChannel = getMutableChannelName("msg-get-after-update");
const serial = await publishAndGetSerial(updateChannel, "original");

const updateResult = await runCommand(
[
"channels",
"update",
updateChannel,
serial,
"edited-text",
"--json",
],
{
env: { ABLY_API_KEY: E2E_API_KEY || "" },
timeoutMs: 30000,
},
);
expect(updateResult.exitCode).toBe(0);

// Retry get-message — update is eventually consistent
let latestMessage: Record<string, unknown> | undefined;
for (let attempt = 0; attempt < 10; attempt++) {
const getResult = await runCommand(
["channels", "get-message", updateChannel, serial, "--json"],
{
env: { ABLY_API_KEY: E2E_API_KEY || "" },
timeoutMs: 30000,
},
);
if (getResult.exitCode === 0) {
const records = parseNdjsonLines(getResult.stdout);
const parsed =
records.find((r) => r.type === "result") ?? records[0];
latestMessage = parsed.message as
| Record<string, unknown>
| undefined;
if (latestMessage?.data === "edited-text") break;
}
await new Promise((resolve) => setTimeout(resolve, 1000));
}

expect(latestMessage).toBeDefined();
expect(latestMessage!.data).toBe("edited-text");
// The action must reflect that this is an update, not the original create
expect(latestMessage!.action).toBe("message.update");
// The version block must be populated and differ from the message serial
expect(latestMessage!.version).toBeDefined();
const version = latestMessage!.version as Record<string, unknown>;
expect(version.serial).toBeDefined();
expect(version.serial).not.toBe(serial);
},
);

it(
"should render human-readable output without --json",
{ timeout: 60000 },
async () => {
setupTestFailureHandler(
"should render human-readable output without --json",
);

const humanChannel = getMutableChannelName("msg-get-human");
const serial = await publishAndGetSerial(humanChannel, "human-text");

const result = await runCommand(
["channels", "get-message", humanChannel, serial],
{
env: { ABLY_API_KEY: E2E_API_KEY || "" },
timeoutMs: 30000,
},
);

expect(result.exitCode).toBe(0);
// Field labels rendered by formatMessagesOutput must appear
expect(result.stdout).toContain("Channel");
expect(result.stdout).toContain("Serial");
expect(result.stdout).toContain(serial);
expect(result.stdout).toContain("Data");
expect(result.stdout).toContain("human-text");
},
);

it(
"should fail with a non-zero exit code for an unknown serial",
{ timeout: 60000 },
async () => {
setupTestFailureHandler(
"should fail with a non-zero exit code for an unknown serial",
);

const result = await runCommand(
[
"channels",
"get-message",
channelName,
"0000000000-000@deadbeef:000",
"--json",
],
{
env: { ABLY_API_KEY: E2E_API_KEY || "" },
timeoutMs: 30000,
},
);

expect(result.exitCode).not.toBe(0);

const records = parseNdjsonLines(result.stdout);
const errorRecord = records.find((r) => r.type === "error");
expect(errorRecord).toBeDefined();
expect(errorRecord!.success).toBe(false);
},
);

it(
"should delete a message via channels delete",
{ timeout: 60000 },
Expand Down
24 changes: 24 additions & 0 deletions test/helpers/mock-ably-rest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ export interface MockRestChannel {
publish: Mock;
history: Mock;
status: Mock;
getMessage: Mock;
updateMessage: Mock;
deleteMessage: Mock;
appendMessage: Mock;
Expand Down Expand Up @@ -142,6 +143,29 @@ function createMockRestChannel(name: string): MockRestChannel {
name,
publish: vi.fn().mockResolvedValue({ serials: ["mock-serial-001"] }),
history: vi.fn().mockResolvedValue(createMockPaginatedResult([])),
getMessage: vi.fn().mockResolvedValue({
id: "mock-message-id",
name: "mock-event",
data: { hello: "world" },
encoding: "json",
extras: { headers: { foo: "bar" } },
serial: "mock-serial-001",
timestamp: 1_700_000_000_000,
clientId: "mock-client",
connectionId: "mock-connection",
action: "message.update",
version: {
serial: "mock-serial-001@v2",
timestamp: 1_700_000_001_000,
clientId: "mock-editor",
description: "Fixed typo",
},
annotations: {
summary: {
"reaction:distinct.v1": { unique: 3 },
},
},
}),
updateMessage: vi
.fn()
.mockResolvedValue({ versionSerial: "mock-version-serial-update" }),
Expand Down
Loading
Loading