From 0e6276c4c9852381011ce9977f4bd91649f8446e Mon Sep 17 00:00:00 2001 From: Jim Clark Date: Thu, 6 Mar 2025 14:08:50 -0800 Subject: [PATCH] Python servers require the initialized notification --- runbook.md | 15 +--- src/docker.clj | 75 +++++++++++------ src/docker/main.clj | 2 +- src/extension/docker-compose.yaml | 2 +- src/jsonrpc/extras.clj | 45 +++++++++++ src/jsonrpc/logger.clj | 49 ++++++++++- src/jsonrpc/producer.clj | 39 ++++++++- src/jsonrpc/server.clj | 130 ++++-------------------------- src/mcp/client.clj | 78 ++++++++++++------ 9 files changed, 253 insertions(+), 182 deletions(-) create mode 100644 src/jsonrpc/extras.clj diff --git a/runbook.md b/runbook.md index 33040eb..fae8a69 100644 --- a/runbook.md +++ b/runbook.md @@ -1,19 +1,10 @@ -```sh -docker buildx build \ - --builder hydrobuild \ - --platform linux/amd64,linux/arm64 \ - --tag mcp/docker:0.0.1 \ - --file Dockerfile \ - --push . -docker pull mcp/docker:0.0.1 -``` ```sh cd src/extension && make build-extension ``` ```sh -docker extension update docker/labs-ai-tools-for-devs:0.2.7 +docker extension update docker/labs-ai-tools-for-devs:0.2.8 ``` ```sh @@ -32,10 +23,10 @@ docker pull mcp/docker:prerelease docker buildx build \ --builder hydrobuild \ --platform linux/amd64,linux/arm64 \ - --tag mcp/docker:0.0.1 \ + --tag mcp/docker:0.0.4 \ --file Dockerfile \ --push . -docker pull mcp/docker:0.0.1 +docker pull mcp/docker:0.0.4 ``` ```sh diff --git a/src/docker.clj b/src/docker.clj index 68212f6..d24468c 100644 --- a/src/docker.clj +++ b/src/docker.clj @@ -283,26 +283,6 @@ (def pull (comp (status? 200 "pull-image") pull-image)) (def images (comp ->json list-images)) -(defn injected-entrypoint [secrets s] - (format "%s ; %s" - (->> secrets - (map (fn [[k v]] - (format "export %s=$(cat /secret/%s | sed -e \"s/^[[:space:]]*//\")" v (name k)))) - (string/join " ; ")) - s)) - -(defn inject-secret-transform [container-definition] - (let [{:keys [Entrypoint Cmd]} - (-> - (image-inspect - (-> (images {"reference" [(:image container-definition)]}) - first)) - :Config) - real-entrypoint (string/join " " (concat Entrypoint (or (:command container-definition) Cmd)))] - (-> container-definition - (assoc :entrypoint ["/bin/sh" "-c" (injected-entrypoint (:secrets container-definition) real-entrypoint)]) - (dissoc :command)))) - (defn add-latest [image] (let [[_ tag] (re-find #".*(:.*)$" image)] (if tag @@ -331,11 +311,56 @@ (and digest (= digest Id)))) (images {})))) +(defn check-then-pull [container-definition] + (when (not (has-image? (:image container-definition))) + (-pull container-definition))) + +(defn injected-entrypoint [secrets environment s] + (->> (concat + (let [s (->> secrets + (map (fn [[k v]] + (format "export %s=$(cat /secret/%s | sed -e \"s/^[[:space:]]*//\")" v (name k)))) + (string/join " ; "))] + (when (and s (not (= "" s))) + [s])) + (let [env (->> environment + (map (fn [s] (when-let [[_ k v] (and s (re-find #"(.*)=(.*)" s))] + [k v]))) + (filter identity) + (map (fn [[k v]] + (format "export %s=%s" k v))) + (string/join " ; "))] + (when (and env (not (= "" env))) + [env])) + [s]) + (string/join " ; "))) + +(comment + (injected-entrypoint {:a "A"} ["BLAH=whatever"] "my command") + (injected-entrypoint nil nil "my command") + (injected-entrypoint {:a "A"} nil "my command") + (injected-entrypoint nil nil nil) + ) + +(defn inject-secret-transform [container-definition] + (check-then-pull container-definition) + (let [{:keys [Entrypoint Cmd Env]} + (-> + (image-inspect + (-> (images {"reference" [(:image container-definition)]}) + first)) + :Config) + real-entrypoint (string/join " " (concat + (or (:entrypoint container-definition) Entrypoint) + (or (:command container-definition) Cmd)))] + (-> container-definition + (assoc :entrypoint ["/bin/sh" "-c" (injected-entrypoint (:secrets container-definition) Env real-entrypoint)]) + (dissoc :command)))) + (defn run-streaming-function-with-no-stdin "run container function with no stdin, and no timeout, but streaming stdout" [m cb] - (when (not (has-image? (:image m))) - (-pull m)) + (check-then-pull m) (let [x (-> m (update :opts (fnil merge {}) @@ -377,8 +402,7 @@ (defn run-background-function "run container function with no stdin, and no streaming output" [m] - (when (not (has-image? (:image m))) - (-pull m)) + (check-then-pull m) (let [x (create m)] (start x) (shutdown/schedule-container-shutdown @@ -391,8 +415,7 @@ (defn run-function "run container function with no stdin, and no streaming output" [{:keys [timeout] :or {timeout 600000} :as m}] - (when (not (has-image? (:image m))) - (-pull m)) + (check-then-pull m) (let [x (create m) finished-channel (async/promise-chan)] (start x) diff --git a/src/docker/main.clj b/src/docker/main.clj index 9d45fa5..735431e 100644 --- a/src/docker/main.clj +++ b/src/docker/main.clj @@ -147,7 +147,7 @@ (fn [m] (update-in m [:prompts] (fn [coll] (remove (fn [{:keys [type]}] (= type (second args))) coll)))))) "run" (fn [] - (logger/setup (jsonrpc.server/->TimbreLogger)) + (logger/setup (jsonrpc.logger/->TimbreLogger)) (let [[in send] (let [[[w c] in] (user-loop/create-pipe)] diff --git a/src/extension/docker-compose.yaml b/src/extension/docker-compose.yaml index d23ffec..526be84 100644 --- a/src/extension/docker-compose.yaml +++ b/src/extension/docker-compose.yaml @@ -1,6 +1,6 @@ services: mcp_docker: - image: mcp/docker:0.0.3 + image: mcp/docker:0.0.4 ports: - 8811:8811 volumes: diff --git a/src/jsonrpc/extras.clj b/src/jsonrpc/extras.clj new file mode 100644 index 0000000..89d2825 --- /dev/null +++ b/src/jsonrpc/extras.clj @@ -0,0 +1,45 @@ +(ns jsonrpc.extras + (:require + [clojure.core.async :as async] + git + graph + [jsonrpc.logger :as logger] + [lsp4clj.server :as lsp.server] + [promesa.core :as p] + state + volumes)) + +(defmethod lsp.server/receive-request "docker/prompts/register" [_ {:keys [db* id]} params] + (logger/info "docker/prompts/register")) + +(defmethod lsp.server/receive-request "docker/prompts/run" + [_ {:keys [db* id] :as components} {:keys [thread-id] {:keys [file content uri]} :prompts :as params}] + (lsp.server/discarding-stdout + (let [conversation-id (str (java.util.UUID/randomUUID)) + prompt-string (cond + file (slurp file) + content content + uri (slurp (git/prompt-file uri)))] + (swap! db* update-in [:mcp/conversations] (fnil assoc {}) conversation-id + {:state-promise + (p/create + (fn [resolve reject] + (resolve + (async/ {} + (assoc-in [:opts :conversation-id] conversation-id) + (assoc-in [:opts :thread-id] thread-id) + (assoc-in [:opts :prompt-content] prompt-string) + (state/construct-initial-state-from-prompts))] + (graph/stream + (if (-> m :metadata :agent) + ((graph/require-graph (-> m :metadata :agent)) m) + (graph/chat-with-tools m)) + m))) + (if thread-id + {:thread-id thread-id :save-thread-volume false} + {}))))))}) + {:conversation-id conversation-id}))) + diff --git a/src/jsonrpc/logger.clj b/src/jsonrpc/logger.clj index 90fc2dc..32a6009 100644 --- a/src/jsonrpc/logger.clj +++ b/src/jsonrpc/logger.clj @@ -1,4 +1,8 @@ -(ns jsonrpc.logger) +(ns jsonrpc.logger + (:require + [babashka.fs :as fs] + [taoensso.timbre :as timbre] + [taoensso.timbre.appenders.core :as appenders])) (defprotocol ILogger (setup [this]) @@ -53,3 +57,46 @@ (defn trace [x] (info "trace " x) x) + +(defn log! [level args fmeta] + (timbre/log! level :p args {:?line (:line fmeta) + :?file (:file fmeta) + :?ns-str (:ns-str fmeta)})) + +(defn decide-log-path [] + (let [prompts-dir (fs/file "/prompts")] + (if (fs/exists? prompts-dir) + (do + (fs/create-dirs (fs/file prompts-dir "log")) + (fs/file prompts-dir "log/docker-mcp-server.out")) + (do + (fs/create-dirs (fs/file "./log")) + (fs/file "./log/docker-mcp-server.out"))))) + +(defrecord TimbreLogger [] + ILogger + (setup [this] + (let [log-path (str (decide-log-path))] + (timbre/merge-config! {:middleware [#(assoc % :hostname_ "")] + :appenders {:println {:enabled? false} + :spit (appenders/spit-appender {:fname log-path})}}) + (timbre/handle-uncaught-jvm-exceptions!) + (set-logger! this) + log-path)) + + (set-log-path [_this log-path] + (timbre/merge-config! {:appenders {:spit (appenders/spit-appender {:fname log-path})}})) + + (-info [_this fmeta arg1] (log! :info [arg1] fmeta)) + (-info [_this fmeta arg1 arg2] (log! :info [arg1 arg2] fmeta)) + (-info [_this fmeta arg1 arg2 arg3] (log! :info [arg1 arg2 arg3] fmeta)) + (-warn [_this fmeta arg1] (log! :warn [arg1] fmeta)) + (-warn [_this fmeta arg1 arg2] (log! :warn [arg1 arg2] fmeta)) + (-warn [_this fmeta arg1 arg2 arg3] (log! :warn [arg1 arg2 arg3] fmeta)) + (-error [_this fmeta arg1] (log! :error [arg1] fmeta)) + (-error [_this fmeta arg1 arg2] (log! :error [arg1 arg2] fmeta)) + (-error [_this fmeta arg1 arg2 arg3] (log! :error [arg1 arg2 arg3] fmeta)) + (-debug [_this fmeta arg1] (log! :debug [arg1] fmeta)) + (-debug [_this fmeta arg1 arg2] (log! :debug [arg1 arg2] fmeta)) + (-debug [_this fmeta arg1 arg2 arg3] (log! :debug [arg1 arg2 arg3] fmeta))) + diff --git a/src/jsonrpc/producer.clj b/src/jsonrpc/producer.clj index 30065db..6304fd9 100644 --- a/src/jsonrpc/producer.clj +++ b/src/jsonrpc/producer.clj @@ -1,4 +1,7 @@ -(ns jsonrpc.producer) +(ns jsonrpc.producer + (:require + [jsonrpc.logger :as logger] + [lsp4clj.server :as lsp.server])) (defprotocol IProducer (publish-exit [this params]) @@ -10,3 +13,37 @@ (publish-tool-list-changed [this params]) (publish-docker-notify [this method params])) +(defrecord McpProducer + [server db*] + IProducer + + (publish-exit [_this p] + (logger/info "publish-exit " p) + (lsp.server/discarding-stdout + (->> p (lsp.server/send-notification server "$/exit")))) + ; params is a map of progressToken, progress, and total + (publish-progress [_this params] + (lsp.server/discarding-stdout + (->> params (lsp.server/send-notification server "notifications/progress")))) + ; params is a map of level, logger, data + ; level is debug info notice warning error critical alert emergency + (publish-log [_this params] + (->> params (lsp.server/send-notification server "notifications/message"))) + + (publish-prompt-list-changed [_ params] + (logger/info "send prompt list changed") + (->> params (lsp.server/send-notification server "notifications/prompts/list_changed"))) + + (publish-resource-list-changed [_ params] + (logger/info "send resource list changed") + (->> params (lsp.server/send-notification server "notifications/resources/list_changed"))) + + (publish-resource-updated [_ params] + (->> params (lsp.server/send-notification server "notifications/resources/updated"))) + + (publish-tool-list-changed [_ params] + (logger/info "send tool list changed") + (->> params (lsp.server/send-notification server "notifications/tools/list_changed"))) + (publish-docker-notify [_ method params] + (logger/info (format "%s - %s" method params)) + (lsp.server/send-notification server method params))) diff --git a/src/jsonrpc/server.clj b/src/jsonrpc/server.clj index f14ec4e..f6560a8 100644 --- a/src/jsonrpc/server.clj +++ b/src/jsonrpc/server.clj @@ -7,6 +7,7 @@ [clojure.core.async :as async] [clojure.pprint :as pprint] [clojure.string :as string] + [jsonrpc.extras] docker git graph @@ -23,8 +24,6 @@ [promesa.core :as p] shutdown state - [taoensso.timbre :as timbre] - [taoensso.timbre.appenders.core :as appenders] tools user-loop volumes) @@ -54,11 +53,6 @@ (logger/info (str level (apply str args))) #_(timbre/log! level :p args)) -(defn log! [level args fmeta] - (timbre/log! level :p args {:?line (:line fmeta) - :?file (:file fmeta) - :?ns-str (:ns-str fmeta)})) - (defn ^:private exit-server [server] (logger/info "Exiting...") (lsp.server/shutdown server) ;; blocks, waiting up to 10s for previously received messages to be processed @@ -105,6 +99,10 @@ (when arguments {:arguments arguments})))) +;; ----------------- +;; MCP prompts +;; ----------------- + (defmethod lsp.server/receive-request "prompts/list" [_ {:keys [db*]} params] ;; TODO might contain a cursor (logger/info "prompts/list" params) @@ -128,6 +126,10 @@ {:description description :messages (prompt-function (or arguments {}))})) +;; ----------------- +;; MCP resources +;; ----------------- + (defmethod lsp.server/receive-request "resources/list" [_ {:keys [db*]} _] (logger/info "resources/list") (let [resources @@ -156,6 +158,10 @@ (logger/info "resources/subscribe" params) {:resource-templates []}) +;; ----------------- +;; MCP Tools +;; ----------------- + (defmethod lsp.server/receive-request "tools/list" [_ {:keys [db*]} _] ;; TODO cursors (logger/info "tools/list " (->> (:mcp.prompts/registry @db*) @@ -253,40 +259,6 @@ (logger/info "content " (with-out-str (pprint/pprint response))) response))) -(defmethod lsp.server/receive-request "docker/prompts/register" [_ {:keys [db* id]} params] - (logger/info "docker/prompts/register")) - -(defmethod lsp.server/receive-request "docker/prompts/run" - [_ {:keys [db* id] :as components} {:keys [thread-id] {:keys [file content uri]} :prompts :as params}] - (lsp.server/discarding-stdout - (let [conversation-id (str (java.util.UUID/randomUUID)) - prompt-string (cond - file (slurp file) - content content - uri (slurp (git/prompt-file uri)))] - (swap! db* update-in [:mcp/conversations] (fnil assoc {}) conversation-id - {:state-promise - (p/create - (fn [resolve reject] - (resolve - (async/ {} - (assoc-in [:opts :conversation-id] conversation-id) - (assoc-in [:opts :thread-id] thread-id) - (assoc-in [:opts :prompt-content] prompt-string) - (state/construct-initial-state-from-prompts))] - (graph/stream - (if (-> m :metadata :agent) - ((graph/require-graph (-> m :metadata :agent)) m) - (graph/chat-with-tools m)) - m))) - (if thread-id - {:thread-id thread-id :save-thread-volume false} - {}))))))}) - {:conversation-id conversation-id}))) - (defn ^:private monitor-server-logs [log-ch] ;; NOTE: if this were moved to `initialize`, after timbre has been configured, ;; the server's startup logs and traces would appear in the regular log file @@ -297,80 +269,8 @@ (apply log-wrapper-fn log-args) (recur)))) -(defn decide-log-path [] - (let [prompts-dir (fs/file "/prompts")] - (if (fs/exists? prompts-dir) - (do - (fs/create-dirs (fs/file prompts-dir "log")) - (fs/file prompts-dir "log/docker-mcp-server.out")) - (do - (fs/create-dirs (fs/file "./log")) - (fs/file "./log/docker-mcp-server.out"))))) - -(defrecord TimbreLogger [] - logger/ILogger - (setup [this] - (let [log-path (str (decide-log-path))] - (timbre/merge-config! {:middleware [#(assoc % :hostname_ "")] - :appenders {:println {:enabled? false} - :spit (appenders/spit-appender {:fname log-path})}}) - (timbre/handle-uncaught-jvm-exceptions!) - (logger/set-logger! this) - log-path)) - - (set-log-path [_this log-path] - (timbre/merge-config! {:appenders {:spit (appenders/spit-appender {:fname log-path})}})) - - (-info [_this fmeta arg1] (log! :info [arg1] fmeta)) - (-info [_this fmeta arg1 arg2] (log! :info [arg1 arg2] fmeta)) - (-info [_this fmeta arg1 arg2 arg3] (log! :info [arg1 arg2 arg3] fmeta)) - (-warn [_this fmeta arg1] (log! :warn [arg1] fmeta)) - (-warn [_this fmeta arg1 arg2] (log! :warn [arg1 arg2] fmeta)) - (-warn [_this fmeta arg1 arg2 arg3] (log! :warn [arg1 arg2 arg3] fmeta)) - (-error [_this fmeta arg1] (log! :error [arg1] fmeta)) - (-error [_this fmeta arg1 arg2] (log! :error [arg1 arg2] fmeta)) - (-error [_this fmeta arg1 arg2 arg3] (log! :error [arg1 arg2 arg3] fmeta)) - (-debug [_this fmeta arg1] (log! :debug [arg1] fmeta)) - (-debug [_this fmeta arg1 arg2] (log! :debug [arg1 arg2] fmeta)) - (-debug [_this fmeta arg1 arg2 arg3] (log! :debug [arg1 arg2 arg3] fmeta))) - (def producers (atom [])) -(defrecord ^:private McpProducer - [server db*] - producer/IProducer - - (publish-exit [_this p] - (logger/info "publish-exit " p) - (lsp.server/discarding-stdout - (->> p (lsp.server/send-notification server "$/exit")))) - ; params is a map of progressToken, progress, and total - (publish-progress [_this params] - (lsp.server/discarding-stdout - (->> params (lsp.server/send-notification server "notifications/progress")))) - ; params is a map of level, logger, data - ; level is debug info notice warning error critical alert emergency - (publish-log [_this params] - (->> params (lsp.server/send-notification server "notifications/message"))) - - (publish-prompt-list-changed [_ params] - (logger/info "send prompt list changed") - (->> params (lsp.server/send-notification server "notifications/prompts/list_changed"))) - - (publish-resource-list-changed [_ params] - (logger/info "send resource list changed") - (->> params (lsp.server/send-notification server "notifications/resources/list_changed"))) - - (publish-resource-updated [_ params] - (->> params (lsp.server/send-notification server "notifications/resources/updated"))) - - (publish-tool-list-changed [_ params] - (logger/info "send tool list changed") - (->> params (lsp.server/send-notification server "notifications/tools/list_changed"))) - (publish-docker-notify [_ method params] - (logger/info (format "%s - %s" method params)) - (lsp.server/send-notification server method params))) - (defn get-prompts-dir [] (if (fs/exists? (fs/file "/prompts")) "/prompts" @@ -433,7 +333,7 @@ "create chan server options for any io chan server that we build" [{:keys [trace-level] :or {trace-level "off"} :as opts}] (lsp.server/discarding-stdout - (let [timbre-logger (->TimbreLogger) + (let [timbre-logger (logger/->TimbreLogger) log-path (logger/setup timbre-logger) db* db/db* log-ch (async/chan (async/sliding-buffer 20))] @@ -457,7 +357,7 @@ :keyword-function keyword :server-context-factory (fn [server] - (let [producer (McpProducer. server db*)] + (let [producer (producer/->McpProducer server db*)] (swap! producers conj producer) {:db* db* :logger timbre-logger diff --git a/src/mcp/client.clj b/src/mcp/client.clj index 1bb7fee..53a2d61 100644 --- a/src/mcp/client.clj +++ b/src/mcp/client.clj @@ -8,8 +8,7 @@ (def counter (atom 0)) (defn- mcp-stdio-stateless-server [container] - (when (not (docker/has-image? (:image container))) - (docker/-pull container)) + (docker/check-then-pull container) (let [x (docker/create (assoc container :opts {:StdinOnce true :OpenStdin true @@ -42,14 +41,16 @@ ;; real stdout message (and block (:stdout block)) - (let [message (json/parse-string (:stdout block) keyword)] - (when-let [p (get @response-promises (:id message))] - (async/put! p message)) + + (let [message (try (json/parse-string (:stdout block) keyword) (catch Throwable _))] + (if-let [p (get @response-promises (:id message))] + (async/put! p message) + (logger/debug "no promise found: " block)) (recur (async/alt! c ([v _] v) (async/timeout 15000) :timeout))) - ;; channel is closed +;; channel is closed (nil? block) (do (logger/info "channel closed") @@ -58,7 +59,7 @@ ;; non-stdout message probably :else (do - (logger/debug "socket read loop " block) + (logger/debug "socket read loop " (:stderr block)) (recur (async/alt! c ([v _] v) (async/timeout 15000) :timeout))))) @@ -70,10 +71,16 @@ c (async/promise-chan)] (swap! response-promises assoc id c) (try - (docker/write-to-stdin socket-channel (str (json/generate-string (assoc message :id id :jsonrpc "2.0")) "\n")) + (docker/write-to-stdin socket-channel (str (json/generate-string (assoc message :id id :jsonrpc "2.0")) "\n\n")) (catch Throwable t (println "error closing " t))) c)) + :notification + (fn [message] + (try + (docker/write-to-stdin socket-channel (str (json/generate-string (assoc message :jsonrpc "2.0")) "\n\n")) + (catch Throwable t + (println "error closing " t)))) :dead-channel dead-channel))))) (defn with-running-mcp @@ -86,21 +93,26 @@ a channel with the response the channel will emit [] if there's an error" [container-definition f f1] (try - (let [{:keys [request dead-channel] :as container} (mcp-stdio-stateless-server container-definition)] + (let [{:keys [request notification dead-channel] :as container} (mcp-stdio-stateless-server container-definition)] (Thread/sleep 2000) (async/go (try (if (= :initialized (async/alt! - (request {:method "initialize" :params {}}) :initialized + (request {:method "initialize" :params {:protocolVersion "2024-11-05" + :capabilities {:tools {}} + :clientInfo {:name "docker" + :version "0.1.0"}}}) :initialized dead-channel ([v _] v) (async/timeout 15000) :timeout)) - (let [response (async/alt! - (request (f)) ([v _] v) - dead-channel ([v _] v) - (async/timeout 15000) :timeout)] - (logger/debug (format "%s response %s" (:image container-definition) response)) - (f1 response)) + (do + (notification {:method "notifications/initialized" :params {}}) + (let [response (async/alt! + (request (f)) ([v _] v) + dead-channel ([v _] v) + (async/timeout 15000) :timeout)] + (logger/debug (format "%s response %s" (:image container-definition) response)) + (f1 response))) (do (logger/error (format "mcp server channel did not initialize for %s" @@ -126,13 +138,13 @@ (comment (docker/run-container - {:image "vonwig/stripe:latest" - :secrets {:stripe.api_key "API_KEY"} - :entrypoint ["/bin/sh" "-c" "cat /secret/stripe.api_key"]}) + {:image "vonwig/stripe:latest" + :secrets {:stripe.api_key "API_KEY"} + :entrypoint ["/bin/sh" "-c" "cat /secret/stripe.api_key"]}) (docker/run-container - {:image "vonwig/stripe:latest" - :secrets {:stripe.api_key "API_KEY"} - :entrypoint ["/bin/sh" "-c" "cat /secret/stripe.api_key"]}) + {:image "vonwig/stripe:latest" + :secrets {:stripe.api_key "API_KEY"} + :entrypoint ["/bin/sh" "-c" "cat /secret/stripe.api_key"]}) (async/