Skip to content

Commit

Permalink
Better Kusto Tracing for self-hosted runner. (#405)
Browse files Browse the repository at this point in the history
  • Loading branch information
TingluoHuang committed Apr 15, 2020
1 parent 7f092c9 commit 8456f76
Show file tree
Hide file tree
Showing 12 changed files with 95 additions and 44 deletions.
17 changes: 14 additions & 3 deletions src/Runner.Common/HostContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public interface IHostContext : IDisposable
CancellationToken RunnerShutdownToken { get; }
ShutdownReason RunnerShutdownReason { get; }
ISecretMasker SecretMasker { get; }
ProductInfoHeaderValue UserAgent { get; }
List<ProductInfoHeaderValue> UserAgents { get; }
RunnerWebProxy WebProxy { get; }
string GetDirectory(WellKnownDirectory directory);
string GetConfigFile(WellKnownConfigFile configFile);
Expand Down Expand Up @@ -54,7 +54,7 @@ public sealed class HostContext : EventListener, IObserver<DiagnosticListener>,
private readonly ConcurrentDictionary<Type, object> _serviceInstances = new ConcurrentDictionary<Type, object>();
private readonly ConcurrentDictionary<Type, Type> _serviceTypes = new ConcurrentDictionary<Type, Type>();
private readonly ISecretMasker _secretMasker = new SecretMasker();
private readonly ProductInfoHeaderValue _userAgent = new ProductInfoHeaderValue($"GitHubActionsRunner-{BuildConstants.RunnerPackage.PackageName}", BuildConstants.RunnerPackage.Version);
private readonly List<ProductInfoHeaderValue> _userAgents = new List<ProductInfoHeaderValue>() { new ProductInfoHeaderValue($"GitHubActionsRunner-{BuildConstants.RunnerPackage.PackageName}", BuildConstants.RunnerPackage.Version) };
private CancellationTokenSource _runnerShutdownTokenSource = new CancellationTokenSource();
private object _perfLock = new object();
private Tracing _trace;
Expand All @@ -72,7 +72,7 @@ public sealed class HostContext : EventListener, IObserver<DiagnosticListener>,
public CancellationToken RunnerShutdownToken => _runnerShutdownTokenSource.Token;
public ShutdownReason RunnerShutdownReason { get; private set; }
public ISecretMasker SecretMasker => _secretMasker;
public ProductInfoHeaderValue UserAgent => _userAgent;
public List<ProductInfoHeaderValue> UserAgents => _userAgents;
public RunnerWebProxy WebProxy => _webProxy;
public HostContext(string hostType, string logFile = null)
{
Expand Down Expand Up @@ -189,6 +189,17 @@ public HostContext(string hostType, string logFile = null)
{
_trace.Info($"No proxy settings were found based on environmental variables (http_proxy/https_proxy/HTTP_PROXY/HTTPS_PROXY)");
}

var credFile = GetConfigFile(WellKnownConfigFile.Credentials);
if (File.Exists(credFile))
{
var credData = IOUtil.LoadObject<CredentialData>(credFile);
if (credData != null &&
credData.Data.TryGetValue("clientId", out var clientId))
{
_userAgents.Add(new ProductInfoHeaderValue($"RunnerId", clientId));
}
}
}

public string GetDirectory(WellKnownDirectory directory)
Expand Down
6 changes: 3 additions & 3 deletions src/Runner.Common/RunnerServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public interface IRunnerServer : IRunnerService

// job request
Task<TaskAgentJobRequest> GetAgentRequestAsync(int poolId, long requestId, CancellationToken cancellationToken);
Task<TaskAgentJobRequest> RenewAgentRequestAsync(int poolId, long requestId, Guid lockToken, CancellationToken cancellationToken);
Task<TaskAgentJobRequest> RenewAgentRequestAsync(int poolId, long requestId, Guid lockToken, string orchestrationId, CancellationToken cancellationToken);
Task<TaskAgentJobRequest> FinishAgentRequestAsync(int poolId, long requestId, Guid lockToken, DateTime finishTime, TaskResult result, CancellationToken cancellationToken);

// agent package
Expand Down Expand Up @@ -300,10 +300,10 @@ public Task<TaskAgentMessage> GetAgentMessageAsync(Int32 poolId, Guid sessionId,
// JobRequest
//-----------------------------------------------------------------

public Task<TaskAgentJobRequest> RenewAgentRequestAsync(int poolId, long requestId, Guid lockToken, CancellationToken cancellationToken = default(CancellationToken))
public Task<TaskAgentJobRequest> RenewAgentRequestAsync(int poolId, long requestId, Guid lockToken, string orchestrationId = null, CancellationToken cancellationToken = default(CancellationToken))
{
CheckConnection(RunnerConnectionType.JobRequest);
return _requestTaskAgentClient.RenewAgentRequestAsync(poolId, requestId, lockToken, cancellationToken: cancellationToken);
return _requestTaskAgentClient.RenewAgentRequestAsync(poolId, requestId, lockToken, orchestrationId: orchestrationId, cancellationToken: cancellationToken);
}

public Task<TaskAgentJobRequest> FinishAgentRequestAsync(int poolId, long requestId, Guid lockToken, DateTime finishTime, TaskResult result, CancellationToken cancellationToken = default(CancellationToken))
Expand Down
2 changes: 1 addition & 1 deletion src/Runner.Listener/Configuration/ConfigurationManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ private async Task<GitHubAuthResult> GetTenantCredential(string githubUrl, strin
using (var httpClient = new HttpClient(httpClientHandler))
{
httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("RemoteAuth", githubToken);
httpClient.DefaultRequestHeaders.UserAgent.Add(HostContext.UserAgent);
httpClient.DefaultRequestHeaders.UserAgent.AddRange(HostContext.UserAgents);

var bodyObject = new Dictionary<string, string>()
{
Expand Down
32 changes: 24 additions & 8 deletions src/Runner.Listener/JobDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using GitHub.Services.Common;
using GitHub.Runner.Common;
using GitHub.Runner.Sdk;
using GitHub.Services.WebApi.Jwt;

namespace GitHub.Runner.Listener
{
Expand Down Expand Up @@ -86,15 +87,30 @@ public void Run(Pipelines.AgentJobRequestMessage jobRequestMessage, bool runOnce
}
}

var orchestrationId = string.Empty;
var systemConnection = jobRequestMessage.Resources.Endpoints.SingleOrDefault(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase));
if (systemConnection?.Authorization != null &&
systemConnection.Authorization.Parameters.TryGetValue("AccessToken", out var accessToken) &&
!string.IsNullOrEmpty(accessToken))
{
var jwt = JsonWebToken.Create(accessToken);
var claims = jwt.ExtractClaims();
orchestrationId = claims.FirstOrDefault(x => string.Equals(x.Type, "orchid", StringComparison.OrdinalIgnoreCase))?.Value;
if (!string.IsNullOrEmpty(orchestrationId))
{
Trace.Info($"Pull OrchestrationId {orchestrationId} from JWT claims");
}
}

WorkerDispatcher newDispatch = new WorkerDispatcher(jobRequestMessage.JobId, jobRequestMessage.RequestId);
if (runOnce)
{
Trace.Info("Start dispatcher for one time used runner.");
newDispatch.WorkerDispatch = RunOnceAsync(jobRequestMessage, currentDispatch, newDispatch.WorkerCancellationTokenSource.Token, newDispatch.WorkerCancelTimeoutKillTokenSource.Token);
newDispatch.WorkerDispatch = RunOnceAsync(jobRequestMessage, orchestrationId, currentDispatch, newDispatch.WorkerCancellationTokenSource.Token, newDispatch.WorkerCancelTimeoutKillTokenSource.Token);
}
else
{
newDispatch.WorkerDispatch = RunAsync(jobRequestMessage, currentDispatch, newDispatch.WorkerCancellationTokenSource.Token, newDispatch.WorkerCancelTimeoutKillTokenSource.Token);
newDispatch.WorkerDispatch = RunAsync(jobRequestMessage, orchestrationId, currentDispatch, newDispatch.WorkerCancellationTokenSource.Token, newDispatch.WorkerCancelTimeoutKillTokenSource.Token);
}

_jobInfos.TryAdd(newDispatch.JobId, newDispatch);
Expand Down Expand Up @@ -284,11 +300,11 @@ private async Task EnsureDispatchFinished(WorkerDispatcher jobDispatch, bool can
}
}

private async Task RunOnceAsync(Pipelines.AgentJobRequestMessage message, WorkerDispatcher previousJobDispatch, CancellationToken jobRequestCancellationToken, CancellationToken workerCancelTimeoutKillToken)
private async Task RunOnceAsync(Pipelines.AgentJobRequestMessage message, string orchestrationId, WorkerDispatcher previousJobDispatch, CancellationToken jobRequestCancellationToken, CancellationToken workerCancelTimeoutKillToken)
{
try
{
await RunAsync(message, previousJobDispatch, jobRequestCancellationToken, workerCancelTimeoutKillToken);
await RunAsync(message, orchestrationId, previousJobDispatch, jobRequestCancellationToken, workerCancelTimeoutKillToken);
}
finally
{
Expand All @@ -297,7 +313,7 @@ private async Task RunOnceAsync(Pipelines.AgentJobRequestMessage message, Worker
}
}

private async Task RunAsync(Pipelines.AgentJobRequestMessage message, WorkerDispatcher previousJobDispatch, CancellationToken jobRequestCancellationToken, CancellationToken workerCancelTimeoutKillToken)
private async Task RunAsync(Pipelines.AgentJobRequestMessage message, string orchestrationId, WorkerDispatcher previousJobDispatch, CancellationToken jobRequestCancellationToken, CancellationToken workerCancelTimeoutKillToken)
{
Busy = true;
try
Expand Down Expand Up @@ -328,7 +344,7 @@ private async Task RunAsync(Pipelines.AgentJobRequestMessage message, WorkerDisp

// start renew job request
Trace.Info($"Start renew job request {requestId} for job {message.JobId}.");
Task renewJobRequest = RenewJobRequestAsync(_poolId, requestId, lockToken, firstJobRequestRenewed, lockRenewalTokenSource.Token);
Task renewJobRequest = RenewJobRequestAsync(_poolId, requestId, lockToken, orchestrationId, firstJobRequestRenewed, lockRenewalTokenSource.Token);

// wait till first renew succeed or job request is canceled
// not even start worker if the first renew fail
Expand Down Expand Up @@ -607,7 +623,7 @@ await processChannel.SendAsync(
}
}

public async Task RenewJobRequestAsync(int poolId, long requestId, Guid lockToken, TaskCompletionSource<int> firstJobRequestRenewed, CancellationToken token)
public async Task RenewJobRequestAsync(int poolId, long requestId, Guid lockToken, string orchestrationId, TaskCompletionSource<int> firstJobRequestRenewed, CancellationToken token)
{
var runnerServer = HostContext.GetService<IRunnerServer>();
TaskAgentJobRequest request = null;
Expand All @@ -620,7 +636,7 @@ public async Task RenewJobRequestAsync(int poolId, long requestId, Guid lockToke
{
try
{
request = await runnerServer.RenewAgentRequestAsync(poolId, requestId, lockToken, token);
request = await runnerServer.RenewAgentRequestAsync(poolId, requestId, lockToken, orchestrationId, token);

Trace.Info($"Successfully renew job request {requestId}, job is valid till {request.LockedUntil.Value}");

Expand Down
2 changes: 1 addition & 1 deletion src/Runner.Listener/Runner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public async Task<int> ExecuteCommand(CommandSettings command)
{
try
{
VssUtil.InitializeVssClientSettings(HostContext.UserAgent, HostContext.WebProxy);
VssUtil.InitializeVssClientSettings(HostContext.UserAgents, HostContext.WebProxy);

_inConfigStage = true;
_completedCommand.Reset();
Expand Down
4 changes: 2 additions & 2 deletions src/Runner.Sdk/Util/VssUtil.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ namespace GitHub.Runner.Sdk
{
public static class VssUtil
{
public static void InitializeVssClientSettings(ProductInfoHeaderValue additionalUserAgent, IWebProxy proxy)
public static void InitializeVssClientSettings(List<ProductInfoHeaderValue> additionalUserAgents, IWebProxy proxy)
{
var headerValues = new List<ProductInfoHeaderValue>();
headerValues.Add(additionalUserAgent);
headerValues.AddRange(additionalUserAgents);
headerValues.Add(new ProductInfoHeaderValue($"({RuntimeInformation.OSDescription.Trim()})"));

if (VssClientHttpRequestSettings.Default.UserAgent != null && VssClientHttpRequestSettings.Default.UserAgent.Count > 0)
Expand Down
2 changes: 1 addition & 1 deletion src/Runner.Worker/ActionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ private async Task DownloadRepositoryActionAsync(IExecutionContext executionCont
// Intentionally empty. Temporary for GHES alpha release, download from dotcom unauthenticated.
}

httpClient.DefaultRequestHeaders.UserAgent.Add(HostContext.UserAgent);
httpClient.DefaultRequestHeaders.UserAgent.AddRange(HostContext.UserAgents);
using (var result = await httpClient.GetStreamAsync(archiveLink))
{
await result.CopyToAsync(fs, _defaultCopyBufferSize, actionDownloadCancellation.Token);
Expand Down
2 changes: 1 addition & 1 deletion src/Runner.Worker/Worker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public async Task<int> RunAsync(string pipeIn, string pipeOut)
// Validate args.
ArgUtil.NotNullOrEmpty(pipeIn, nameof(pipeIn));
ArgUtil.NotNullOrEmpty(pipeOut, nameof(pipeOut));
VssUtil.InitializeVssClientSettings(HostContext.UserAgent, HostContext.WebProxy);
VssUtil.InitializeVssClientSettings(HostContext.UserAgents, HostContext.WebProxy);
var jobRunner = HostContext.CreateService<IJobRunner>();

using (var channel = HostContext.CreateService<IProcessChannel>())
Expand Down
28 changes: 26 additions & 2 deletions src/Sdk/DTWebApi/WebApi/TaskAgentHttpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public Task<TaskAgentJobRequest> RenewAgentRequestAsync(
Int64 requestId,
Guid lockToken,
DateTime? expiresOn = null,
string orchestrationId = null,
Object userState = null,
CancellationToken cancellationToken = default(CancellationToken))
{
Expand All @@ -104,7 +105,30 @@ public Task<TaskAgentJobRequest> RenewAgentRequestAsync(
LockedUntil = expiresOn,
};

return UpdateAgentRequestAsync(poolId, requestId, lockToken, request, userState, cancellationToken);
var additionalHeaders = new Dictionary<string, string>();
if (!string.IsNullOrEmpty(orchestrationId))
{
additionalHeaders["X-VSS-OrchestrationId"] = orchestrationId;
}

HttpMethod httpMethod = new HttpMethod("PATCH");
Guid locationId = new Guid("fc825784-c92a-4299-9221-998a02d1b54f");
object routeValues = new { poolId = poolId, requestId = requestId };
HttpContent content = new ObjectContent<TaskAgentJobRequest>(request, new VssJsonMediaTypeFormatter(true));

List<KeyValuePair<string, string>> queryParams = new List<KeyValuePair<string, string>>();
queryParams.Add("lockToken", lockToken.ToString());

return SendAsync<TaskAgentJobRequest>(
httpMethod,
additionalHeaders,
locationId,
routeValues: routeValues,
version: new ApiResourceVersion(5.1, 1),
queryParameters: queryParams,
userState: userState,
cancellationToken: cancellationToken,
content: content);
}

public Task<TaskAgent> ReplaceAgentAsync(
Expand Down Expand Up @@ -171,5 +195,5 @@ protected async Task<T> SendAsync<T>(
}

private readonly ApiResourceVersion m_currentApiVersion = new ApiResourceVersion(3.0, 1);
}
}
}
2 changes: 1 addition & 1 deletion src/Sdk/WebApi/WebApi/Jwt/JsonWebTokenUtilities.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ internal static IEnumerable<Claim> TranslateFromJwtClaims(IDictionary<string, ob
return ret;
}

internal static IEnumerable<Claim> ExtractClaims(this JsonWebToken token)
public static IEnumerable<Claim> ExtractClaims(this JsonWebToken token)
{
ArgumentUtility.CheckForNull(token, nameof(token));

Expand Down
Loading

0 comments on commit 8456f76

Please sign in to comment.