|
49 | 49 | except ImportError:
|
50 | 50 | _ToolLike = Any
|
51 | 51 |
|
| 52 | + try: |
| 53 | + from opentelemetry.sdk import trace |
| 54 | + |
| 55 | + TracerProvider = trace.TracerProvider |
| 56 | + SpanProcessor = trace.SpanProcessor |
| 57 | + SynchronousMultiSpanProcessor = trace.SynchronousMultiSpanProcessor |
| 58 | + except ImportError: |
| 59 | + TracerProvider = Any |
| 60 | + SpanProcessor = Any |
| 61 | + SynchronousMultiSpanProcessor = Any |
| 62 | + |
52 | 63 |
|
53 | 64 | def _default_runnable_kwargs(has_history: bool) -> Mapping[str, Any]:
|
54 | 65 | # https://github.com/langchain-ai/langchain/blob/5784dfed001730530637793bea1795d9d5a7c244/libs/core/langchain_core/runnables/history.py#L237-L241
|
@@ -214,6 +225,34 @@ def _validate_tools(tools: Sequence["_ToolLike"]):
|
214 | 225 | _validate_callable_parameters_are_annotated(tool)
|
215 | 226 |
|
216 | 227 |
|
| 228 | +def _override_active_span_processor( |
| 229 | + tracer_provider: "TracerProvider", |
| 230 | + active_span_processor: "SynchronousMultiSpanProcessor", |
| 231 | +): |
| 232 | + """Overrides the active span processor. |
| 233 | +
|
| 234 | + When working with multiple LangchainAgents in the same environment, |
| 235 | + it's crucial to manage trace exports carefully. |
| 236 | + Each agent needs its own span processor tied to a unique project ID. |
| 237 | + While we add a new span processor for each agent, this can lead to |
| 238 | + unexpected behavior. |
| 239 | + For instance, with two agents linked to different projects, traces from the |
| 240 | + second agent might be sent to both projects. |
| 241 | + To prevent this and guarantee traces go to the correct project, we overwrite |
| 242 | + the active span processor whenever a new LangchainAgent is created. |
| 243 | +
|
| 244 | + Args: |
| 245 | + tracer_provider (TracerProvider): |
| 246 | + The tracer provider to use for the project. |
| 247 | + active_span_processor (SynchronousMultiSpanProcessor): |
| 248 | + The active span processor overrides the tracer provider's |
| 249 | + active span processor. |
| 250 | + """ |
| 251 | + if tracer_provider._active_span_processor: |
| 252 | + tracer_provider._active_span_processor.shutdown() |
| 253 | + tracer_provider._active_span_processor = active_span_processor |
| 254 | + |
| 255 | + |
217 | 256 | class LangchainAgent:
|
218 | 257 | """A Langchain Agent.
|
219 | 258 |
|
@@ -419,17 +458,55 @@ def set_up(self):
|
419 | 458 | credentials=credentials.with_quota_project(self._project),
|
420 | 459 | ),
|
421 | 460 | )
|
422 |
| - span_processor = opentelemetry_sdk_trace.export.SimpleSpanProcessor( |
423 |
| - span_exporter=span_exporter, |
| 461 | + span_processor: SpanProcessor = ( |
| 462 | + opentelemetry_sdk_trace.export.SimpleSpanProcessor( |
| 463 | + span_exporter=span_exporter, |
| 464 | + ) |
| 465 | + ) |
| 466 | + tracer_provider: TracerProvider = ( |
| 467 | + opentelemetry.trace.get_tracer_provider() |
424 | 468 | )
|
425 |
| - tracer_provider = opentelemetry.trace.get_tracer_provider() |
426 |
| - if tracer_provider and _utils._is_noop_tracer_provider(tracer_provider): |
427 |
| - # Avoids AttributeError: 'ProxyTracerProvider' object has no |
428 |
| - # attribute 'add_span_processor' |
| 469 | + # Get the appropriate tracer provider: |
| 470 | + # 1. If _TRACER_PROVIDER is already set, use that. |
| 471 | + # 2. Otherwise, if the OTEL_PYTHON_TRACER_PROVIDER environment |
| 472 | + # variable is set, use that. |
| 473 | + # 3. As a final fallback, use _PROXY_TRACER_PROVIDER. |
| 474 | + # If none of the above is set, we log a warning, and |
| 475 | + # create a tracer provider. |
| 476 | + if not tracer_provider: |
| 477 | + from google.cloud.aiplatform import base |
| 478 | + |
| 479 | + _LOGGER = base.Logger(__name__) |
| 480 | + _LOGGER.warning( |
| 481 | + "No tracer provider. By default, " |
| 482 | + "we should get one of the following providers: " |
| 483 | + "OTEL_PYTHON_TRACER_PROVIDER, _TRACER_PROVIDER, " |
| 484 | + "or _PROXY_TRACER_PROVIDER." |
| 485 | + ) |
429 | 486 | tracer_provider = opentelemetry_sdk_trace.TracerProvider()
|
430 |
| - opentelemetry.trace.set_tracer_provider(tracer_provider) |
| 487 | + opentelemetry.trace.set_tracer_provider(tracer_provider) |
| 488 | + # Avoids AttributeError: |
| 489 | + # 'ProxyTracerProvider' and 'NoOpTracerProvider' objects has no |
| 490 | + # attribute 'add_span_processor'. |
| 491 | + if _utils.is_noop_or_proxy_tracer_provider(tracer_provider): |
| 492 | + tracer_provider = opentelemetry_sdk_trace.TracerProvider() |
| 493 | + opentelemetry.trace.set_tracer_provider(tracer_provider) |
| 494 | + # Avoids OpenTelemetry client already exists error. |
| 495 | + _override_active_span_processor( |
| 496 | + tracer_provider, |
| 497 | + opentelemetry_sdk_trace.SynchronousMultiSpanProcessor(), |
| 498 | + ) |
431 | 499 | tracer_provider.add_span_processor(span_processor)
|
| 500 | + # Keep the instrumentation up-to-date. |
| 501 | + # When creating multiple LangchainAgents, |
| 502 | + # we need to keep the instrumentation up-to-date. |
| 503 | + # We deliberately override the instrument each time, |
| 504 | + # so that if different agents end up using different |
| 505 | + # instrumentations, we guarantee that the user is always |
| 506 | + # working with the most recent agent's instrumentation. |
432 | 507 | self._instrumentor = openinference_langchain.LangChainInstrumentor()
|
| 508 | + if self._instrumentor.is_instrumented_by_opentelemetry: |
| 509 | + self._instrumentor.uninstrument() |
433 | 510 | self._instrumentor.instrument()
|
434 | 511 | else:
|
435 | 512 | from google.cloud.aiplatform import base
|
|
0 commit comments